Coverage Report

Created: 2026-06-10 18:56

/root/doris/cloud/src/recycler/recycler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/cloud.pb.h>
21
#include <glog/logging.h>
22
23
#include <atomic>
24
#include <condition_variable>
25
#include <cstdint>
26
#include <deque>
27
#include <functional>
28
#include <map>
29
#include <memory>
30
#include <string>
31
#include <string_view>
32
#include <thread>
33
#include <unordered_map>
34
#include <unordered_set>
35
#include <utility>
36
#include <vector>
37
38
#include "common/bvars.h"
39
#include "meta-service/delete_bitmap_lock_white_list.h"
40
#include "meta-service/txn_lazy_committer.h"
41
#include "meta-store/versionstamp.h"
42
#include "recycler/snapshot_chain_compactor.h"
43
#include "recycler/snapshot_data_migrator.h"
44
#include "recycler/storage_vault_accessor.h"
45
#include "snapshot/snapshot_manager.h"
46
47
namespace brpc {
48
class Server;
49
} // namespace brpc
50
51
namespace doris::cloud {
52
class TxnKv;
53
class InstanceRecycler;
54
class StorageVaultAccessor;
55
class Checker;
56
class SimpleThreadPool;
57
class RecyclerMetricsContext;
58
class TabletRecyclerMetricsContext;
59
class SegmentRecyclerMetricsContext;
60
struct RecyclerThreadPoolGroup {
61
10
    RecyclerThreadPoolGroup() = default;
62
    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
63
                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
64
                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
65
            : s3_producer_pool(std::move(s3_producer_pool)),
66
              recycle_tablet_pool(std::move(recycle_tablet_pool)),
67
13
              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
68
303
    ~RecyclerThreadPoolGroup() = default;
69
140
    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
70
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = default;
71
13
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
72
140
    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
73
    // used for accessor.delete_files, accessor.delete_directory
74
    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
75
    // used for InstanceRecycler::recycle_tablet
76
    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
77
    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
78
};
79
80
class Recycler {
81
public:
82
    explicit Recycler(std::shared_ptr<TxnKv> txn_kv);
83
    ~Recycler();
84
85
    // returns 0 for success otherwise error
86
    int start(brpc::Server* server);
87
88
    void stop();
89
90
3.44k
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
91
92
0
    RecyclerThreadPoolGroup& thread_pool_group() { return _thread_pool_group; }
93
94
private:
95
    void recycle_callback();
96
97
    void instance_scanner_callback();
98
99
    void lease_recycle_jobs();
100
101
    void check_recycle_tasks();
102
103
private:
104
    friend class RecyclerServiceImpl;
105
106
    std::shared_ptr<TxnKv> txn_kv_;
107
    std::atomic_bool stopped_ {false};
108
109
    std::vector<std::thread> workers_;
110
111
    std::mutex mtx_;
112
    // notify recycle workers
113
    std::condition_variable pending_instance_cond_;
114
    std::deque<InstanceInfoPB> pending_instance_queue_;
115
    std::unordered_set<std::string> pending_instance_set_;
116
    std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map_;
117
    // notify instance scanner and lease thread
118
    std::condition_variable notifier_;
119
120
    std::string ip_port_;
121
122
    std::unique_ptr<Checker> checker_;
123
124
    RecyclerThreadPoolGroup _thread_pool_group;
125
126
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
127
    std::shared_ptr<SnapshotManager> snapshot_manager_;
128
    std::shared_ptr<SnapshotDataMigrator> snapshot_data_migrator_;
129
    std::shared_ptr<SnapshotChainCompactor> snapshot_chain_compactor_;
130
};
131
132
// Kind of rowset being recycled; passed as `type` to the batch `delete_rowset_data` overload.
enum class RowsetRecyclingState {
    FORMAL_ROWSET = 0,
    TMP_ROWSET = 1,
};
136
137
// Represents a single rowset deletion task for batch delete
138
struct RowsetDeleteTask {
139
    RowsetMetaCloudPB rowset_meta;
140
    std::string recycle_rowset_key;       // Primary key marking "pending recycle"
141
    std::string non_versioned_rowset_key; // Legacy non-versioned rowset meta key
142
    std::string versioned_rowset_key;     // Versioned meta rowset key
143
    Versionstamp versionstamp;
144
    std::string rowset_ref_count_key;
145
};
146
147
class RecyclerMetricsContext {
148
public:
149
9
    RecyclerMetricsContext() = default;
150
151
    RecyclerMetricsContext(std::string instance_id, std::string operation_type)
152
569
            : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) {
153
569
        start();
154
569
    }
155
156
577
    ~RecyclerMetricsContext() = default;
157
158
    std::atomic_ullong total_need_recycle_data_size = 0;
159
    std::atomic_ullong total_need_recycle_num = 0;
160
161
    std::atomic_ullong total_recycled_data_size = 0;
162
    std::atomic_ullong total_recycled_num = 0;
163
164
    std::string operation_type;
165
    std::string instance_id;
166
167
    double start_time = 0;
168
169
569
    void start() {
170
569
        start_time = duration_cast<std::chrono::milliseconds>(
171
569
                             std::chrono::system_clock::now().time_since_epoch())
172
569
                             .count();
173
569
    }
174
175
294
    double duration() const {
176
294
        return duration_cast<std::chrono::milliseconds>(
177
294
                       std::chrono::system_clock::now().time_since_epoch())
178
294
                       .count() -
179
294
               start_time;
180
294
    }
181
182
18
    void reset() {
183
18
        total_need_recycle_data_size = 0;
184
18
        total_need_recycle_num = 0;
185
18
        total_recycled_data_size = 0;
186
18
        total_recycled_num = 0;
187
18
        start_time = duration_cast<std::chrono::milliseconds>(
188
18
                             std::chrono::system_clock::now().time_since_epoch())
189
18
                             .count();
190
18
    }
191
192
294
    void finish_report() {
193
294
        if (!operation_type.empty()) {
194
294
            double cost = duration();
195
294
            g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
196
294
                    {instance_id, operation_type}, cost);
197
294
            g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1);
198
294
            g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
199
294
                    {instance_id, operation_type}, total_recycled_data_size.load());
200
294
            g_bvar_recycler_instance_recycle_total_num_since_started.put(
201
294
                    {instance_id, operation_type}, total_recycled_num.load());
202
294
            LOG(INFO) << "recycle instance: " << instance_id
203
294
                      << ", operation type: " << operation_type << ", cost: " << cost
204
294
                      << " ms, total recycled num: " << total_recycled_num.load()
205
294
                      << ", total recycled data size: " << total_recycled_data_size.load()
206
294
                      << " bytes";
207
294
            if (cost != 0) {
208
284
                if (total_recycled_num.load() != 0) {
209
76
                    g_bvar_recycler_instance_recycle_time_per_resource.put(
210
76
                            {instance_id, operation_type}, cost / total_recycled_num.load());
211
76
                }
212
284
                g_bvar_recycler_instance_recycle_bytes_per_ms.put(
213
284
                        {instance_id, operation_type}, total_recycled_data_size.load() / cost);
214
284
            }
215
294
        }
216
294
    }
217
218
    // `is_begin` is used to initialize total num of items need to be recycled
219
1.16k
    void report(bool is_begin = false) {
220
1.16k
        if (!operation_type.empty()) {
221
            // is init
222
1.13k
            if (is_begin) {
223
3
                auto value = total_need_recycle_num.load();
224
225
3
                g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
226
3
                        {instance_id, operation_type}, total_need_recycle_data_size.load());
227
3
                g_bvar_recycler_instance_last_round_to_recycle_num.put(
228
3
                        {instance_id, operation_type}, value);
229
1.13k
            } else {
230
1.13k
                g_bvar_recycler_instance_last_round_recycled_bytes.put(
231
1.13k
                        {instance_id, operation_type}, total_recycled_data_size.load());
232
1.13k
                g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type},
233
1.13k
                                                                     total_recycled_num.load());
234
1.13k
            }
235
1.13k
        }
236
1.16k
    }
237
};
238
239
class TabletRecyclerMetricsContext : public RecyclerMetricsContext {
240
public:
241
140
    TabletRecyclerMetricsContext() : RecyclerMetricsContext("global_recycler", "recycle_tablet") {}
242
};
243
244
class SegmentRecyclerMetricsContext : public RecyclerMetricsContext {
245
public:
246
    SegmentRecyclerMetricsContext()
247
140
            : RecyclerMetricsContext("global_recycler", "recycle_segment") {}
248
};
249
250
struct OplogRecycleStats;
251
252
class InstanceRecycler {
253
public:
254
    // Counters gathered while recycling packed files; every counter starts at zero.
    struct PackedFileRecycleStats {
        // packed-file kv scanned
        int64_t num_scanned {0};
        // packed-file kv corrected
        int64_t num_corrected {0};
        // packed-file kv deleted
        int64_t num_deleted {0};
        // packed-file kv failed
        int64_t num_failed {0};
        // packed-file kv bytes deleted from txn-kv
        int64_t bytes_deleted {0};
        // packed-file objects deleted from storage (vault/HDFS)
        int64_t num_object_deleted {0};
        // bytes deleted from storage objects
        int64_t bytes_object_deleted {0};
        // rowset metas scanned during correction
        int64_t rowset_scan_count {0};
    };
264
265
    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
266
                              RecyclerThreadPoolGroup thread_pool_group,
267
                              std::shared_ptr<TxnLazyCommitter> txn_lazy_committer);
268
    ~InstanceRecycler();
269
270
0
    // Returns a non-owning view of `instance_id_`; the view must not outlive this recycler.
    std::string_view instance_id() const { return instance_id_; }
271
5
    // Returns the stored `instance_info_` by const reference (no copy is made).
    const InstanceInfoPB& instance_info() const { return instance_info_; }
272
273
    // returns 0 for success otherwise error
274
    int init();
275
276
0
    // Sets the stop flag with a release store; the flag is read back through `stopped()`.
    void stop() { stopped_.store(true, std::memory_order_release); }
277
79
    // Reads the stop flag with an acquire load.
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
278
279
    // returns 0 for success otherwise error
280
    int do_recycle();
281
282
    // remove all kv and data in this instance, ONLY be called when instance has been deleted
283
    // returns 0 for success otherwise error
284
    int recycle_deleted_instance();
285
286
    // scan and recycle expired indexes:
287
    // 1. dropped table, dropped mv
288
    // 2. half-success table/index when create
289
    // returns 0 for success otherwise error
290
    int recycle_indexes();
291
292
    // scan and recycle expired partitions:
293
    // 1. dropped partition
294
    // 2. half-success partition when create
295
    // returns 0 for success otherwise error
296
    int recycle_partitions();
297
298
    // scan and recycle expired rowsets:
299
    // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo)
300
    // 2. compaction will change the input rowsets to recycle_rowset
301
    // returns 0 for success otherwise error
302
    int recycle_rowsets();
303
304
    // like `recycle_rowsets`, but for versioned rowsets.
305
    int recycle_versioned_rowsets();
306
307
    // scan and recycle expired tmp rowsets:
308
    // 1. commit_rowset will produce tmp_rowset when finish upload data (load or compaction) to remote storage
309
    // returns 0 for success otherwise error
310
    int recycle_tmp_rowsets();
311
312
    /**
313
     * recycle all tablets belonging to the index specified by `index_id`
314
     *
315
     * @param partition_id if positive, only recycle tablets in this partition belonging to the specified index
316
     * @return 0 for success otherwise error
317
     */
318
    int recycle_tablets(int64_t table_id, int64_t index_id, RecyclerMetricsContext& ctx,
319
                        int64_t partition_id = -1);
320
321
    /**
322
     * recycle all rowsets belonging to the tablet specified by `tablet_id`
323
     *
324
     * @return 0 for success otherwise error
325
     */
326
    int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
327
328
    /**
329
     * like `recycle_tablet`, but for versioned tablet
330
     */
331
    int recycle_versioned_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
332
333
    // scan and recycle useless partition version kv
334
    int recycle_versions();
335
336
    // scan and recycle the orphan partitions
337
    int recycle_orphan_partitions();
338
339
    // scan and abort timeout txn label
340
    // returns 0 for success otherwise error
341
    int abort_timeout_txn();
342
343
    // scan and recycle expired txn label
344
    // returns 0 for success otherwise error
345
    int recycle_expired_txn_label();
346
347
    // scan and recycle finished or timeout copy jobs
348
    // returns 0 for success otherwise error
349
    int recycle_copy_jobs();
350
351
    // scan and recycle dropped internal stage
352
    // returns 0 for success otherwise error
353
    int recycle_stage();
354
355
    // scan and recycle expired stage objects
356
    // returns 0 for success otherwise error
357
    int recycle_expired_stage_objects();
358
359
    // scan and recycle operation logs
360
    // returns 0 for success otherwise error
361
    int recycle_operation_logs();
362
363
    // scan and recycle expired restore jobs
364
    // returns 0 for success otherwise error
365
    int recycle_restore_jobs();
366
367
    /**
368
     * Scan packed-file metadata, correct reference counters, and recycle unused packed files.
369
     *
370
     * @return 0 on success, non-zero error code otherwise
371
     */
372
    int recycle_packed_files();
373
374
    // scan and recycle snapshots
375
    // returns 0 for success otherwise error
376
    int recycle_cluster_snapshots();
377
378
    // scan and recycle ref rowsets for deleted instance
379
    // returns 0 for success otherwise error
380
    int recycle_ref_rowsets(bool* has_unrecycled_rowsets);
381
382
    bool check_recycle_tasks();
383
384
    int scan_and_statistics_indexes();
385
386
    int scan_and_statistics_partitions();
387
388
    int scan_and_statistics_rowsets();
389
390
    int scan_and_statistics_tmp_rowsets();
391
392
    int scan_and_statistics_abort_timeout_txn();
393
394
    int scan_and_statistics_expired_txn_label();
395
396
    int scan_and_statistics_copy_jobs();
397
398
    int scan_and_statistics_stage();
399
400
    int scan_and_statistics_expired_stage_objects();
401
402
    int scan_and_statistics_versions();
403
404
    int scan_and_statistics_restore_jobs();
405
406
    void scan_and_statistics_operation_logs();
407
408
    /**
409
     * Decode the key of a packed-file metadata record into the persisted object path.
410
     *
411
     * @param key raw key persisted in txn-kv
412
     * @param packed_path output object storage path referenced by the key
413
     * @return true if decoding succeeds, false otherwise
414
     */
415
    static bool decode_packed_file_key(std::string_view key, std::string* packed_path);
416
417
29
    void TEST_add_accessor(std::string_view id, std::shared_ptr<StorageVaultAccessor> accessor) {
418
29
        accessor_map_.insert({std::string(id), std::move(accessor)});
419
29
    }
420
421
    // Recycle snapshot meta and data, return 0 for success otherwise error.
422
    int recycle_snapshot_meta_and_data(const std::string& instance_id,
423
                                       const std::string& resource_id,
424
                                       Versionstamp snapshot_version,
425
                                       const SnapshotPB& snapshot_pb);
426
427
private:
428
    // returns 0 for success otherwise error
429
    int init_obj_store_accessors();
430
431
    // returns 0 for success otherwise error
432
    int init_storage_vault_accessors();
433
434
    /**
435
     * Scan key-value pairs between [`begin`, `end`) with multiple rounds of range get(`RangeGetIterator`),
436
     * and perform `recycle_func` on each key-value pair.
437
     *
438
     * @param recycle_func defines how to recycle resources corresponding to a key-value pair.
439
     *                     The scan will stop if recycle_func() returns non-zero.
440
     *                     recycle_func() returns 0 if the recycling is successful or the scan can continue with ignorable errors.
441
     * @param loop_done is called after a round (`RangeGetIterator`) in the scan has no next kv. Usually used to perform a batch recycling.
442
     *                  The scan will stop if loop_done() returns non-zero.
443
     *                  loop_done() returns 0 if the recycling is successful or the scan can continue with ignorable errors.
444
     * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero
445
     */
446
    int scan_and_recycle(std::string begin, std::string_view end,
447
                         std::function<int(std::string_view k, std::string_view v)> recycle_func,
448
                         std::function<int()> loop_done = nullptr,
449
                         std::function<bool(std::string*)> next_begin_getter = nullptr);
450
451
    static int next_recycle_rowset_tablet_key(const std::string& instance_id, int64_t tablet_id,
452
                                              std::string* next_key);
453
454
    int scan_recycle_rowsets_by_tablet(
455
            std::string begin, std::string_view end,
456
            std::function<int(std::string_view k, std::string_view v)> recycle_func,
457
            std::function<int()> loop_done = nullptr);
458
459
    // return 0 for success otherwise error
460
    int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
461
462
    // return 0 for success otherwise error
463
    // NOTE: this function ONLY be called when the file paths cannot be calculated
464
    int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
465
                           const std::string& rowset_id);
466
467
    // return 0 for success otherwise error
468
    int delete_rowset_data(const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets,
469
                           RowsetRecyclingState type, RecyclerMetricsContext& metrics_context);
470
471
    // Decrement packed file ref counts for rowset segments.
472
    // Returns 0 for success, -1 for error.
473
    int decrement_packed_file_ref_counts(const doris::RowsetMetaCloudPB& rs_meta_pb);
474
475
    enum class DeleteBitmapStorageType {
476
        NOT_FOUND,
477
        IN_FDB,
478
        STANDALONE_FILE,
479
        PACKED_FILE,
480
    };
481
482
    // Process delete bitmap storage and decrement packed file ref count when needed.
483
    // Returns 0 for success, -1 for error.
484
    // out_storage_type: if not null, will be set to the delete bitmap storage type.
485
    int decrement_delete_bitmap_packed_file_ref_counts(int64_t tablet_id,
486
                                                       const std::string& rowset_id,
487
                                                       DeleteBitmapStorageType* out_storage_type);
488
489
    int delete_packed_file_and_kv(const std::string& packed_file_path,
490
                                  const std::string& packed_key,
491
                                  const cloud::PackedFileInfoPB& packed_info);
492
493
    /**
494
     * Get stage storage info from instance and init StorageVaultAccessor
495
     * @return 0 if accessor is successfully inited, 1 if stage not found, negative for error
496
     */
497
    int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type,
498
                               std::shared_ptr<StorageVaultAccessor>* accessor);
499
500
    void register_recycle_task(const std::string& task_name, int64_t start_time);
501
502
    void unregister_recycle_task(const std::string& task_name);
503
504
    // for scan all tablets and statistics metrics
505
    int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
506
                                    RecyclerMetricsContext& metrics_context,
507
                                    int64_t partition_id = -1, bool is_empty_tablet = false);
508
509
    // for scan all rs of tablet and statistics metrics
510
    int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
511
512
    // Recycle operation log and the log keys. The log keys are specified by `raw_keys`.
513
    //
514
    // Both `operation_log` and `raw_keys` will be removed in the same transaction, to ensure atomicity.
515
    int recycle_operation_log(Versionstamp log_version, const std::vector<std::string>& raw_keys,
516
                              OperationLogPB operation_log,
517
                              OplogRecycleStats* oplog_stats = nullptr);
518
519
    // Recycle rowset meta and data, return 0 for success otherwise error
520
    //
521
    // This function will decrease the rowset ref count and remove the rowset meta and data if the ref count is 1.
522
    int recycle_rowset_meta_and_data(const RowsetDeleteTask& task);
523
524
    // Classify rowset task by ref_count, return 0 to add to batch delete, 1 if handled (ref>1), -1 on error
525
    int classify_rowset_task_by_ref_count(RowsetDeleteTask& task,
526
                                          std::vector<RowsetDeleteTask>& batch_delete_tasks);
527
528
    // Cleanup metadata for deleted rowsets, return 0 for success otherwise error
529
    int cleanup_rowset_metadata(const std::vector<RowsetDeleteTask>& tasks);
530
531
    // Whether the instance has any snapshots, return 0 for success otherwise error.
532
    int has_cluster_snapshots(bool* any);
533
534
    // Whether need to recycle versioned keys
535
    bool should_recycle_versioned_keys() const;
536
537
    /**
538
     * Parse the path of a packed-file fragment and output the owning tablet and rowset identifiers.
539
     *
540
     * @param path packed-file fragment path to decode
541
     * @param tablet_id output tablet identifier extracted from the path
542
     * @param rowset_id output rowset identifier extracted from the path
543
     * @return true if both identifiers are successfully parsed, false otherwise
544
     */
545
    static bool parse_packed_slice_path(std::string_view path, int64_t* tablet_id,
546
                                        std::string* rowset_id);
547
    // Check whether a rowset referenced by a packed file still exists in metadata.
548
    // @param stats optional recycle statistics collector.
549
    int check_rowset_exists(int64_t tablet_id, const std::string& rowset_id, bool* exists,
550
                            PackedFileRecycleStats* stats = nullptr);
551
    int check_recycle_and_tmp_rowset_exists(int64_t tablet_id, const std::string& rowset_id,
552
                                            int64_t txn_id, bool* recycle_exists, bool* tmp_exists);
553
    /**
554
     * Resolve which storage accessor should be used for a packed file.
555
     *
556
     * @param hint preferred storage resource identifier persisted with the file
557
     * @return pair of the resolved resource identifier and accessor; the accessor can be null if unavailable
558
     */
559
    std::pair<std::string, std::shared_ptr<StorageVaultAccessor>> resolve_packed_file_accessor(
560
            const std::string& hint);
561
    // Recompute packed-file counters and lifecycle state after validating contained fragments.
562
    // @param stats optional recycle statistics collector.
563
    int correct_packed_file_info(cloud::PackedFileInfoPB* packed_info, bool* changed,
564
                                 const std::string& packed_file_path,
565
                                 PackedFileRecycleStats* stats = nullptr);
566
    // Correct and recycle a single packed-file record, updating metadata and accounting statistics.
567
    // @param stats optional recycle statistics collector.
568
    int process_single_packed_file(const std::string& packed_key,
569
                                   const std::string& packed_file_path,
570
                                   PackedFileRecycleStats* stats);
571
    // Process a packed-file KV while scanning and aggregate recycling statistics.
572
    int handle_packed_file_kv(std::string_view key, std::string_view value,
573
                              PackedFileRecycleStats* stats, int* ret);
574
575
    // Abort the transaction/job associated with a rowset that is about to be recycled.
576
    // This function is called during rowset recycling to prevent data loss by ensuring that
577
    // the transaction/job cannot be committed after its rowset data has been deleted.
578
    //
579
    // Scenario:
580
    // When recycler detects an expired prepared rowset (e.g., from a failed load transaction/job),
581
    // it needs to recycle the rowset data. However, if the transaction/job is still active and gets
582
    // committed after the data is deleted, it would lead to data loss - the transaction/job would
583
    // reference non-existent data.
584
    //
585
    // Solution:
586
    // Before recycling the rowset data, this function aborts the associated transaction/job to ensure
587
    // it cannot be committed. This guarantees that:
588
    // 1. The transaction/job state is marked as ABORTED
589
    // 2. Any subsequent commit_rowset/commit_txn attempts will fail
590
    // 3. The rowset data can be safely deleted without risk of data loss
591
    //
592
    // Parameters:
593
    //   txn_id: The transaction/job ID associated with the rowset to be recycled
594
    //
595
    // Returns:
596
    //   0 on success, -1 on failure
597
    int abort_txn_for_related_rowset(int64_t txn_id);
598
    int abort_job_for_related_rowset(const RowsetMetaCloudPB& rowset_meta);
599
600
    template <typename T>
601
    int batch_abort_txn_or_job_for_recycle(const std::vector<std::string>& keys,
602
                                           bool skip_base_version);
603
604
private:
605
    std::atomic_bool stopped_ {false};
606
    std::shared_ptr<TxnKv> txn_kv_;
607
    std::string instance_id_;
608
    InstanceInfoPB instance_info_;
609
610
    // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
611
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
612
    using InvertedIndexInfo =
613
            std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;
614
615
    class InvertedIndexIdCache;
616
    std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
617
618
    std::mutex recycled_tablets_mtx_;
619
    // Store recycled tablets; we can skip deleting the rowset data of these tablets because their data has already been deleted.
620
    std::unordered_set<int64_t> recycled_tablets_;
621
622
    std::mutex recycle_tasks_mutex;
623
    // <task_name, start_time>
624
    std::map<std::string, int64_t> running_recycle_tasks;
625
626
    RecyclerThreadPoolGroup _thread_pool_group;
627
628
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
629
    std::shared_ptr<SnapshotManager> snapshot_manager_;
630
    std::shared_ptr<DeleteBitmapLockWhiteList> delete_bitmap_lock_white_list_;
631
    std::shared_ptr<ResourceManager> resource_mgr_;
632
633
    TabletRecyclerMetricsContext tablet_metrics_context_;
634
    SegmentRecyclerMetricsContext segment_metrics_context_;
635
};
636
637
struct OperationLogReferenceInfo {
638
    bool referenced_by_instance = false;
639
    bool referenced_by_snapshot = false;
640
    Versionstamp referenced_snapshot_timestamp;
641
};
642
643
// Atomic counters describing one round of operation-log recycling; all start at zero.
struct OplogRecycleStats {
    // Total oplog count scanned per round
    std::atomic<int64_t> total_num = 0;
    // Oplogs not recycled this round (per round, written to mBvarStatus)
    std::atomic<int64_t> not_recycled_num = 0;
    // Recycle failures (per round, accumulated to mBvarIntAdder at end)
    std::atomic<int64_t> failed_num = 0;

    // Per-oplog-type recycled counts (incremented after successful commit)
    std::atomic<int64_t> recycled_commit_partition = 0;
    std::atomic<int64_t> recycled_drop_partition = 0;
    std::atomic<int64_t> recycled_commit_index = 0;
    std::atomic<int64_t> recycled_drop_index = 0;
    std::atomic<int64_t> recycled_update_tablet = 0;
    std::atomic<int64_t> recycled_compaction = 0;
    std::atomic<int64_t> recycled_schema_change = 0;
    std::atomic<int64_t> recycled_commit_txn = 0;
};
660
661
// Helper class to check if operation logs can be recycled based on snapshots and versionstamps
662
class OperationLogRecycleChecker {
663
public:
664
    OperationLogRecycleChecker(std::string_view instance_id, TxnKv* txn_kv,
665
                               const InstanceInfoPB& instance_info)
666
35
            : instance_id_(instance_id), txn_kv_(txn_kv), instance_info_(instance_info) {}
667
668
    // Initialize the checker by loading snapshots and setting max version stamp
669
    int init();
670
671
    // Check if an operation log can be recycled
672
    bool can_recycle(const Versionstamp& log_versionstamp, int64_t log_min_timestamp,
673
                     OperationLogReferenceInfo* reference_info) const;
674
675
0
    Versionstamp max_versionstamp() const { return max_versionstamp_; }
676
677
28
    const std::vector<std::pair<SnapshotPB, Versionstamp>>& get_snapshots() const {
678
28
        return snapshots_;
679
28
    }
680
681
private:
682
    std::string_view instance_id_;
683
    TxnKv* txn_kv_;
684
    const InstanceInfoPB& instance_info_;
685
    Versionstamp max_versionstamp_;
686
    Versionstamp source_snapshot_versionstamp_;
687
    std::map<Versionstamp, size_t> snapshot_indexes_;
688
    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
689
};
690
691
class SnapshotDataSizeCalculator {
692
public:
693
    SnapshotDataSizeCalculator(std::string_view instance_id, std::shared_ptr<TxnKv> txn_kv)
694
29
            : instance_id_(instance_id), txn_kv_(std::move(txn_kv)) {}
695
696
    void init(const std::vector<std::pair<SnapshotPB, Versionstamp>>& snapshots);
697
698
    int calculate_operation_log_data_size(const std::string_view& log_key,
699
                                          OperationLogPB& operation_log,
700
                                          OperationLogReferenceInfo& reference_info);
701
702
    int save_snapshot_data_size_with_retry();
703
704
private:
705
    int get_all_index_partitions(int64_t db_id, int64_t table_id, int64_t index_id,
706
                                 std::vector<int64_t>* partition_ids);
707
    int get_index_partition_data_size(int64_t db_id, int64_t table_id, int64_t index_id,
708
                                      int64_t partition_id, int64_t* data_size);
709
    int save_operation_log(const std::string_view& log_key, OperationLogPB& operation_log);
710
    int save_snapshot_data_size();
711
712
    std::string_view instance_id_;
713
    std::shared_ptr<TxnKv> txn_kv_;
714
715
    int64_t instance_retained_data_size_ = 0;
716
    std::map<Versionstamp, int64_t> retained_data_size_;
717
    std::set<std::string> calculated_partitions_;
718
};
719
720
} // namespace doris::cloud