Coverage Report

Created: 2025-12-26 19:40

/root/doris/cloud/src/recycler/recycler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/cloud.pb.h>
21
#include <glog/logging.h>
22
23
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
35
36
#include "common/bvars.h"
37
#include "meta-service/txn_lazy_committer.h"
38
#include "meta-store/versionstamp.h"
39
#include "recycler/snapshot_chain_compactor.h"
40
#include "recycler/snapshot_data_migrator.h"
41
#include "recycler/storage_vault_accessor.h"
42
#include "recycler/white_black_list.h"
43
#include "snapshot/snapshot_manager.h"
44
45
namespace brpc {
46
class Server;
47
} // namespace brpc
48
49
namespace doris::cloud {
50
class TxnKv;
51
class InstanceRecycler;
52
class StorageVaultAccessor;
53
class Checker;
54
class SimpleThreadPool;
55
class RecyclerMetricsContext;
56
class TabletRecyclerMetricsContext;
57
class SegmentRecyclerMetricsContext;
58
struct RecyclerThreadPoolGroup {
59
8
    RecyclerThreadPoolGroup() = default;
60
    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
61
                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
62
                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
63
            : s3_producer_pool(std::move(s3_producer_pool)),
64
              recycle_tablet_pool(std::move(recycle_tablet_pool)),
65
10
              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
66
248
    ~RecyclerThreadPoolGroup() = default;
67
115
    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
68
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = default;
69
10
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
70
115
    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
71
    // used for accessor.delete_files, accessor.delete_directory
72
    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
73
    // used for InstanceRecycler::recycle_tablet
74
    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
75
    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
76
};
77
78
class Recycler {
79
public:
80
    explicit Recycler(std::shared_ptr<TxnKv> txn_kv);
81
    ~Recycler();
82
83
    // returns 0 for success otherwise error
84
    int start(brpc::Server* server);
85
86
    void stop();
87
88
285
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
89
90
private:
91
    void recycle_callback();
92
93
    void instance_scanner_callback();
94
95
    void lease_recycle_jobs();
96
97
    void check_recycle_tasks();
98
99
private:
100
    friend class RecyclerServiceImpl;
101
102
    std::shared_ptr<TxnKv> txn_kv_;
103
    std::atomic_bool stopped_ {false};
104
105
    std::vector<std::thread> workers_;
106
107
    std::mutex mtx_;
108
    // notify recycle workers
109
    std::condition_variable pending_instance_cond_;
110
    std::deque<InstanceInfoPB> pending_instance_queue_;
111
    std::unordered_set<std::string> pending_instance_set_;
112
    std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map_;
113
    // notify instance scanner and lease thread
114
    std::condition_variable notifier_;
115
116
    std::string ip_port_;
117
118
    WhiteBlackList instance_filter_;
119
    std::unique_ptr<Checker> checker_;
120
121
    RecyclerThreadPoolGroup _thread_pool_group;
122
123
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
124
    std::shared_ptr<SnapshotManager> snapshot_manager_;
125
    std::shared_ptr<SnapshotDataMigrator> snapshot_data_migrator_;
126
    std::shared_ptr<SnapshotChainCompactor> snapshot_chain_compactor_;
127
};
128
129
// Kind of rowset being recycled. It is passed as `type` to the batch
// `InstanceRecycler::delete_rowset_data` overload. FORMAL_ROWSET presumably
// denotes committed rowsets and TMP_ROWSET tmp rowsets (confirm at call sites).
enum class RowsetRecyclingState { FORMAL_ROWSET, TMP_ROWSET };
133
134
// Represents a single rowset deletion task for batch delete
135
struct RowsetDeleteTask {
136
    RowsetMetaCloudPB rowset_meta;
137
    std::string recycle_rowset_key;       // Primary key marking "pending recycle"
138
    std::string non_versioned_rowset_key; // Legacy non-versioned rowset meta key
139
    std::string versioned_rowset_key;     // Versioned meta rowset key
140
    std::string rowset_ref_count_key;
141
};
142
143
class RecyclerMetricsContext {
144
public:
145
9
    RecyclerMetricsContext() = default;
146
147
    RecyclerMetricsContext(std::string instance_id, std::string operation_type)
148
416
            : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) {
149
416
        start();
150
416
    }
151
152
424
    ~RecyclerMetricsContext() = default;
153
154
    std::atomic_ullong total_need_recycle_data_size = 0;
155
    std::atomic_ullong total_need_recycle_num = 0;
156
157
    std::atomic_ullong total_recycled_data_size = 0;
158
    std::atomic_ullong total_recycled_num = 0;
159
160
    std::string operation_type;
161
    std::string instance_id;
162
163
    double start_time = 0;
164
165
416
    void start() {
166
416
        start_time = duration_cast<std::chrono::milliseconds>(
167
416
                             std::chrono::system_clock::now().time_since_epoch())
168
416
                             .count();
169
416
    }
170
171
194
    double duration() const {
172
194
        return duration_cast<std::chrono::milliseconds>(
173
194
                       std::chrono::system_clock::now().time_since_epoch())
174
194
                       .count() -
175
194
               start_time;
176
194
    }
177
178
20
    void reset() {
179
20
        total_need_recycle_data_size = 0;
180
20
        total_need_recycle_num = 0;
181
20
        total_recycled_data_size = 0;
182
20
        total_recycled_num = 0;
183
20
        start_time = duration_cast<std::chrono::milliseconds>(
184
20
                             std::chrono::system_clock::now().time_since_epoch())
185
20
                             .count();
186
20
    }
187
188
194
    void finish_report() {
189
194
        if (!operation_type.empty()) {
190
194
            double cost = duration();
191
194
            g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
192
194
                    {instance_id, operation_type}, cost);
193
194
            g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1);
194
194
            LOG(INFO) << "recycle instance: " << instance_id
195
194
                      << ", operation type: " << operation_type << ", cost: " << cost
196
194
                      << " ms, total recycled num: " << total_recycled_num.load()
197
194
                      << ", total recycled data size: " << total_recycled_data_size.load()
198
194
                      << " bytes";
199
194
            if (cost != 0) {
200
181
                if (total_recycled_num.load() != 0) {
201
32
                    g_bvar_recycler_instance_recycle_time_per_resource.put(
202
32
                            {instance_id, operation_type}, cost / total_recycled_num.load());
203
32
                }
204
181
                g_bvar_recycler_instance_recycle_bytes_per_ms.put(
205
181
                        {instance_id, operation_type}, total_recycled_data_size.load() / cost);
206
181
            }
207
194
        }
208
194
    }
209
210
    // `is_begin` is used to initialize total num of items need to be recycled
211
24.6k
    void report(bool is_begin = false) {
212
24.6k
        if (!operation_type.empty()) {
213
            // is init
214
24.6k
            if (is_begin) {
215
0
                auto value = total_need_recycle_num.load();
216
217
0
                g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
218
0
                        {instance_id, operation_type}, total_need_recycle_data_size.load());
219
0
                g_bvar_recycler_instance_last_round_to_recycle_num.put(
220
0
                        {instance_id, operation_type}, value);
221
24.6k
            } else {
222
24.6k
                g_bvar_recycler_instance_last_round_recycled_bytes.put(
223
24.6k
                        {instance_id, operation_type}, total_recycled_data_size.load());
224
24.6k
                g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
225
24.6k
                        {instance_id, operation_type}, total_recycled_data_size.load());
226
24.6k
                g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type},
227
24.6k
                                                                     total_recycled_num.load());
228
24.6k
                g_bvar_recycler_instance_recycle_total_num_since_started.put(
229
24.6k
                        {instance_id, operation_type}, total_recycled_num.load());
230
24.6k
            }
231
24.6k
        }
232
24.6k
    }
233
};
234
235
class TabletRecyclerMetricsContext : public RecyclerMetricsContext {
236
public:
237
115
    TabletRecyclerMetricsContext() : RecyclerMetricsContext("global_recycler", "recycle_tablet") {}
238
};
239
240
class SegmentRecyclerMetricsContext : public RecyclerMetricsContext {
241
public:
242
    SegmentRecyclerMetricsContext()
243
115
            : RecyclerMetricsContext("global_recycler", "recycle_segment") {}
244
};
245
246
class InstanceRecycler {
247
public:
248
    struct PackedFileRecycleStats {
249
        int64_t num_scanned = 0;          // packed-file kv scanned
250
        int64_t num_corrected = 0;        // packed-file kv corrected
251
        int64_t num_deleted = 0;          // packed-file kv deleted
252
        int64_t num_failed = 0;           // packed-file kv failed
253
        int64_t bytes_deleted = 0;        // packed-file kv bytes deleted from txn-kv
254
        int64_t num_object_deleted = 0;   // packed-file objects deleted from storage (vault/HDFS)
255
        int64_t bytes_object_deleted = 0; // bytes deleted from storage objects
256
        int64_t rowset_scan_count = 0;    // rowset metas scanned during correction
257
    };
258
259
    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
260
                              RecyclerThreadPoolGroup thread_pool_group,
261
                              std::shared_ptr<TxnLazyCommitter> txn_lazy_committer);
262
    ~InstanceRecycler();
263
264
0
    std::string_view instance_id() const { return instance_id_; }
265
3
    const InstanceInfoPB& instance_info() const { return instance_info_; }
266
267
    // returns 0 for success otherwise error
268
    int init();
269
270
0
    void stop() { stopped_.store(true, std::memory_order_release); }
271
55
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
272
273
    // returns 0 for success otherwise error
274
    int do_recycle();
275
276
    // remove all kv and data in this instance, ONLY be called when instance has been deleted
277
    // returns 0 for success otherwise error
278
    int recycle_deleted_instance();
279
280
    // scan and recycle expired indexes:
281
    // 1. dropped table, dropped mv
282
    // 2. half-successtable/index when create
283
    // returns 0 for success otherwise error
284
    int recycle_indexes();
285
286
    // scan and recycle expired partitions:
287
    // 1. dropped parttion
288
    // 2. half-success partition when create
289
    // returns 0 for success otherwise error
290
    int recycle_partitions();
291
292
    // scan and recycle expired rowsets:
293
    // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo)
294
    // 2. compaction will change the input rowsets to recycle_rowset
295
    // returns 0 for success otherwise error
296
    int recycle_rowsets();
297
298
    // like `recycle_rowsets`, but for versioned rowsets.
299
    int recycle_versioned_rowsets();
300
301
    // scan and recycle expired tmp rowsets:
302
    // 1. commit_rowset will produce tmp_rowset when finish upload data (load or compaction) to remote storage
303
    // returns 0 for success otherwise error
304
    int recycle_tmp_rowsets();
305
306
    /**
307
     * recycle all tablets belonging to the index specified by `index_id`
308
     *
309
     * @param partition_id if positive, only recycle tablets in this partition belonging to the specified index
310
     * @return 0 for success otherwise error
311
     */
312
    int recycle_tablets(int64_t table_id, int64_t index_id, RecyclerMetricsContext& ctx,
313
                        int64_t partition_id = -1);
314
315
    /**
316
     * recycle all rowsets belonging to the tablet specified by `tablet_id`
317
     *
318
     * @return 0 for success otherwise error
319
     */
320
    int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
321
322
    /**
323
     * like `recycle_tablet`, but for versioned tablet
324
     */
325
    int recycle_versioned_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
326
327
    // scan and recycle useless partition version kv
328
    int recycle_versions();
329
330
    // scan and recycle the orphan partitions
331
    int recycle_orphan_partitions();
332
333
    // scan and abort timeout txn label
334
    // returns 0 for success otherwise error
335
    int abort_timeout_txn();
336
337
    //scan and recycle expire txn label
338
    // returns 0 for success otherwise error
339
    int recycle_expired_txn_label();
340
341
    // scan and recycle finished or timeout copy jobs
342
    // returns 0 for success otherwise error
343
    int recycle_copy_jobs();
344
345
    // scan and recycle dropped internal stage
346
    // returns 0 for success otherwise error
347
    int recycle_stage();
348
349
    // scan and recycle expired stage objects
350
    // returns 0 for success otherwise error
351
    int recycle_expired_stage_objects();
352
353
    // scan and recycle operation logs
354
    // returns 0 for success otherwise error
355
    int recycle_operation_logs();
356
357
    // scan and recycle expired restore jobs
358
    // returns 0 for success otherwise error
359
    int recycle_restore_jobs();
360
361
    /**
362
     * Scan packed-file metadata, correct reference counters, and recycle unused packed files.
363
     *
364
     * @return 0 on success, non-zero error code otherwise
365
     */
366
    int recycle_packed_files();
367
368
    // scan and recycle snapshots
369
    // returns 0 for success otherwise error
370
    int recycle_cluster_snapshots();
371
372
    // scan and recycle ref rowsets for deleted instance
373
    // returns 0 for success otherwise error
374
    int recycle_ref_rowsets(bool* has_unrecycled_rowsets);
375
376
    bool check_recycle_tasks();
377
378
    int scan_and_statistics_indexes();
379
380
    int scan_and_statistics_partitions();
381
382
    int scan_and_statistics_rowsets();
383
384
    int scan_and_statistics_tmp_rowsets();
385
386
    int scan_and_statistics_abort_timeout_txn();
387
388
    int scan_and_statistics_expired_txn_label();
389
390
    int scan_and_statistics_copy_jobs();
391
392
    int scan_and_statistics_stage();
393
394
    int scan_and_statistics_expired_stage_objects();
395
396
    int scan_and_statistics_versions();
397
398
    int scan_and_statistics_restore_jobs();
399
400
    /**
401
     * Decode the key of a packed-file metadata record into the persisted object path.
402
     *
403
     * @param key raw key persisted in txn-kv
404
     * @param packed_path output object storage path referenced by the key
405
     * @return true if decoding succeeds, false otherwise
406
     */
407
    static bool decode_packed_file_key(std::string_view key, std::string* packed_path);
408
409
17
    void TEST_add_accessor(std::string_view id, std::shared_ptr<StorageVaultAccessor> accessor) {
410
17
        accessor_map_.insert({std::string(id), std::move(accessor)});
411
17
    }
412
413
    // Recycle snapshot meta and data, return 0 for success otherwise error.
414
    int recycle_snapshot_meta_and_data(const std::string& resource_id,
415
                                       Versionstamp snapshot_version,
416
                                       const SnapshotPB& snapshot_pb);
417
418
private:
419
    // returns 0 for success otherwise error
420
    int init_obj_store_accessors();
421
422
    // returns 0 for success otherwise error
423
    int init_storage_vault_accessors();
424
425
    /**
426
     * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair.
427
     *
428
     * @param recycle_func defines how to recycle resources corresponding to a key-value pair. Returns 0 if the recycling is successful.
429
     * @param loop_done is called after `RangeGetIterator` has no next kv. Usually used to perform a batch recycling. Returns 0 if success. 
430
     * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero
431
     */
432
    int scan_and_recycle(std::string begin, std::string_view end,
433
                         std::function<int(std::string_view k, std::string_view v)> recycle_func,
434
                         std::function<int()> loop_done = nullptr);
435
436
    // return 0 for success otherwise error
437
    int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
438
439
    // return 0 for success otherwise error
440
    // NOTE: this function ONLY be called when the file paths cannot be calculated
441
    int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
442
                           const std::string& rowset_id);
443
444
    // return 0 for success otherwise error
445
    int delete_rowset_data(const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets,
446
                           RowsetRecyclingState type, RecyclerMetricsContext& metrics_context);
447
448
    // return 0 for success otherwise error
449
    int decrement_packed_file_ref_counts(const doris::RowsetMetaCloudPB& rs_meta_pb);
450
451
    int delete_packed_file_and_kv(const std::string& packed_file_path,
452
                                  const std::string& packed_key,
453
                                  const cloud::PackedFileInfoPB& packed_info);
454
455
    /**
456
     * Get stage storage info from instance and init StorageVaultAccessor
457
     * @return 0 if accessor is successfully inited, 1 if stage not found, negative for error
458
     */
459
    int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type,
460
                               std::shared_ptr<StorageVaultAccessor>* accessor);
461
462
    void register_recycle_task(const std::string& task_name, int64_t start_time);
463
464
    void unregister_recycle_task(const std::string& task_name);
465
466
    // for scan all tablets and statistics metrics
467
    int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
468
                                    RecyclerMetricsContext& metrics_context,
469
                                    int64_t partition_id = -1, bool is_empty_tablet = false);
470
471
    // for scan all rs of tablet and statistics metrics
472
    int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
473
474
    // Recycle operation log and the log keys. The log keys are specified by `raw_keys`.
475
    //
476
    // Both `operation_log` and `raw_keys` will be removed in the same transaction, to ensure atomicity.
477
    int recycle_operation_log(Versionstamp log_version, const std::vector<std::string>& raw_keys,
478
                              OperationLogPB operation_log);
479
480
    // Recycle rowset meta and data, return 0 for success otherwise error
481
    //
482
    // Both recycle_rowset_key and non_versioned_rowset_key will be removed in the same transaction.
483
    //
484
    // This function will decrease the rowset ref count and remove the rowset meta and data if the ref count is 1.
485
    int recycle_rowset_meta_and_data(std::string_view recycle_rowset_key,
486
                                     const RowsetMetaCloudPB& rowset_meta,
487
                                     std::string_view non_versioned_rowset_key = "");
488
489
    // Classify rowset task by ref_count, return 0 to add to batch delete, 1 if handled (ref>1), -1 on error
490
    int classify_rowset_task_by_ref_count(RowsetDeleteTask& task,
491
                                          std::vector<RowsetDeleteTask>& batch_delete_tasks);
492
493
    // Cleanup metadata for deleted rowsets, return 0 for success otherwise error
494
    int cleanup_rowset_metadata(const std::vector<RowsetDeleteTask>& tasks);
495
496
    // Whether the instance has any snapshots, return 0 for success otherwise error.
497
    int has_cluster_snapshots(bool* any);
498
499
    // Whether need to recycle versioned keys
500
    bool should_recycle_versioned_keys() const;
501
502
    /**
503
     * Parse the path of a packed-file fragment and output the owning tablet and rowset identifiers.
504
     *
505
     * @param path packed-file fragment path to decode
506
     * @param tablet_id output tablet identifier extracted from the path
507
     * @param rowset_id output rowset identifier extracted from the path
508
     * @return true if both identifiers are successfully parsed, false otherwise
509
     */
510
    static bool parse_packed_slice_path(std::string_view path, int64_t* tablet_id,
511
                                        std::string* rowset_id);
512
    // Check whether a rowset referenced by a packed file still exists in metadata.
513
    // @param stats optional recycle statistics collector.
514
    int check_rowset_exists(int64_t tablet_id, const std::string& rowset_id, bool* exists,
515
                            PackedFileRecycleStats* stats = nullptr);
516
    int check_recycle_and_tmp_rowset_exists(int64_t tablet_id, const std::string& rowset_id,
517
                                            int64_t txn_id, bool* recycle_exists, bool* tmp_exists);
518
    /**
519
     * Resolve which storage accessor should be used for a packed file.
520
     *
521
     * @param hint preferred storage resource identifier persisted with the file
522
     * @return pair of the resolved resource identifier and accessor; the accessor can be null if unavailable
523
     */
524
    std::pair<std::string, std::shared_ptr<StorageVaultAccessor>> resolve_packed_file_accessor(
525
            const std::string& hint);
526
    // Recompute packed-file counters and lifecycle state after validating contained fragments.
527
    // @param stats optional recycle statistics collector.
528
    int correct_packed_file_info(cloud::PackedFileInfoPB* packed_info, bool* changed,
529
                                 const std::string& packed_file_path,
530
                                 PackedFileRecycleStats* stats = nullptr);
531
    // Correct and recycle a single packed-file record, updating metadata and accounting statistics.
532
    // @param stats optional recycle statistics collector.
533
    int process_single_packed_file(const std::string& packed_key,
534
                                   const std::string& packed_file_path,
535
                                   PackedFileRecycleStats* stats);
536
    // Process a packed-file KV while scanning and aggregate recycling statistics.
537
    int handle_packed_file_kv(std::string_view key, std::string_view value,
538
                              PackedFileRecycleStats* stats, int* ret);
539
540
private:
541
    std::atomic_bool stopped_ {false};
542
    std::shared_ptr<TxnKv> txn_kv_;
543
    std::string instance_id_;
544
    InstanceInfoPB instance_info_;
545
546
    // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
547
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
548
    using InvertedIndexInfo =
549
            std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;
550
551
    class InvertedIndexIdCache;
552
    std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
553
554
    std::mutex recycled_tablets_mtx_;
555
    // Store recycled tablets, we can skip deleting rowset data of these tablets because these data has already been deleted.
556
    std::unordered_set<int64_t> recycled_tablets_;
557
558
    std::mutex recycle_tasks_mutex;
559
    // <task_name, start_time>>
560
    std::map<std::string, int64_t> running_recycle_tasks;
561
562
    RecyclerThreadPoolGroup _thread_pool_group;
563
564
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
565
    std::shared_ptr<SnapshotManager> snapshot_manager_;
566
567
    TabletRecyclerMetricsContext tablet_metrics_context_;
568
    SegmentRecyclerMetricsContext segment_metrics_context_;
569
};
570
571
struct OperationLogReferenceInfo {
572
    bool referenced_by_instance = false;
573
    bool referenced_by_snapshot = false;
574
    Versionstamp referenced_snapshot_timestamp;
575
};
576
577
// Helper class to check if operation logs can be recycled based on snapshots and versionstamps
578
class OperationLogRecycleChecker {
579
public:
580
    OperationLogRecycleChecker(std::string_view instance_id, TxnKv* txn_kv,
581
                               const InstanceInfoPB& instance_info)
582
27
            : instance_id_(instance_id), txn_kv_(txn_kv), instance_info_(instance_info) {}
583
584
    // Initialize the checker by loading snapshots and setting max version stamp
585
    int init();
586
587
    // Check if an operation log can be recycled
588
    bool can_recycle(const Versionstamp& log_versionstamp, int64_t log_min_timestamp,
589
                     OperationLogReferenceInfo* reference_info) const;
590
591
0
    Versionstamp max_versionstamp() const { return max_versionstamp_; }
592
593
23
    const std::vector<std::pair<SnapshotPB, Versionstamp>>& get_snapshots() const {
594
23
        return snapshots_;
595
23
    }
596
597
private:
598
    std::string_view instance_id_;
599
    TxnKv* txn_kv_;
600
    const InstanceInfoPB& instance_info_;
601
    Versionstamp max_versionstamp_;
602
    Versionstamp source_snapshot_versionstamp_;
603
    std::map<Versionstamp, size_t> snapshot_indexes_;
604
    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
605
};
606
607
class SnapshotDataSizeCalculator {
608
public:
609
    SnapshotDataSizeCalculator(std::string_view instance_id, std::shared_ptr<TxnKv> txn_kv)
610
24
            : instance_id_(instance_id), txn_kv_(std::move(txn_kv)) {}
611
612
    void init(const std::vector<std::pair<SnapshotPB, Versionstamp>>& snapshots);
613
614
    int calculate_operation_log_data_size(const std::string_view& log_key,
615
                                          OperationLogPB& operation_log,
616
                                          OperationLogReferenceInfo& reference_info);
617
618
    int save_snapshot_data_size_with_retry();
619
620
private:
621
    int get_all_index_partitions(int64_t db_id, int64_t table_id, int64_t index_id,
622
                                 std::vector<int64_t>* partition_ids);
623
    int get_index_partition_data_size(int64_t db_id, int64_t table_id, int64_t index_id,
624
                                      int64_t partition_id, int64_t* data_size);
625
    int save_operation_log(const std::string_view& log_key, OperationLogPB& operation_log);
626
    int save_snapshot_data_size();
627
628
    std::string_view instance_id_;
629
    std::shared_ptr<TxnKv> txn_kv_;
630
631
    int64_t instance_retained_data_size_ = 0;
632
    std::map<Versionstamp, int64_t> retained_data_size_;
633
    std::set<std::string> calculated_partitions_;
634
};
635
636
} // namespace doris::cloud