Coverage Report

Created: 2025-10-17 16:23

/root/doris/cloud/src/recycler/recycler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/cloud.pb.h>
21
#include <glog/logging.h>
22
23
#include <atomic>
24
#include <condition_variable>
25
#include <cstdint>
26
#include <deque>
27
#include <functional>
28
#include <memory>
29
#include <string>
30
#include <string_view>
31
#include <thread>
32
#include <utility>
33
34
#include "common/bvars.h"
35
#include "meta-service/txn_lazy_committer.h"
36
#include "meta-store/versionstamp.h"
37
#include "recycler/snapshot_chain_compactor.h"
38
#include "recycler/snapshot_data_migrator.h"
39
#include "recycler/storage_vault_accessor.h"
40
#include "recycler/white_black_list.h"
41
#include "snapshot/snapshot_manager.h"
42
43
namespace brpc {
44
class Server;
45
} // namespace brpc
46
47
namespace doris::cloud {
48
class TxnKv;
49
class InstanceRecycler;
50
class StorageVaultAccessor;
51
class Checker;
52
class SimpleThreadPool;
53
class RecyclerMetricsContext;
54
class TabletRecyclerMetricsContext;
55
class SegmentRecyclerMetricsContext;
56
struct RecyclerThreadPoolGroup {
57
8
    RecyclerThreadPoolGroup() = default;
58
    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
59
                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
60
                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
61
            : s3_producer_pool(std::move(s3_producer_pool)),
62
              recycle_tablet_pool(std::move(recycle_tablet_pool)),
63
10
              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
64
220
    ~RecyclerThreadPoolGroup() = default;
65
101
    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
66
    // Copy assignment takes a const reference (matching the copy constructor) so that
    // const sources can be assigned from as well; non-const callers are unaffected.
    RecyclerThreadPoolGroup& operator=(const RecyclerThreadPoolGroup& other) = default;
67
10
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
68
101
    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
69
    // used for accessor.delete_files, accessor.delete_directory
70
    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
71
    // used for InstanceRecycler::recycle_tablet
72
    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
73
    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
74
};
75
76
class Recycler {
77
public:
78
    explicit Recycler(std::shared_ptr<TxnKv> txn_kv);
79
    ~Recycler();
80
81
    // returns 0 for success otherwise error
82
    int start(brpc::Server* server);
83
84
    void stop();
85
86
288
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
87
88
private:
89
    void recycle_callback();
90
91
    void instance_scanner_callback();
92
93
    void lease_recycle_jobs();
94
95
    void check_recycle_tasks();
96
97
private:
98
    friend class RecyclerServiceImpl;
99
100
    std::shared_ptr<TxnKv> txn_kv_;
101
    std::atomic_bool stopped_ {false};
102
103
    std::vector<std::thread> workers_;
104
105
    std::mutex mtx_;
106
    // notify recycle workers
107
    std::condition_variable pending_instance_cond_;
108
    std::deque<InstanceInfoPB> pending_instance_queue_;
109
    std::unordered_set<std::string> pending_instance_set_;
110
    std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map_;
111
    // notify instance scanner and lease thread
112
    std::condition_variable notifier_;
113
114
    std::string ip_port_;
115
116
    WhiteBlackList instance_filter_;
117
    std::unique_ptr<Checker> checker_;
118
119
    RecyclerThreadPoolGroup _thread_pool_group;
120
121
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
122
    std::shared_ptr<SnapshotManager> snapshot_manager_;
123
    std::shared_ptr<SnapshotDataMigrator> snapshot_data_migrator_;
124
    std::shared_ptr<SnapshotChainCompactor> snapshot_chain_compactor_;
125
};
126
127
enum class RowsetRecyclingState {
128
    FORMAL_ROWSET,
129
    TMP_ROWSET,
130
};
131
132
class RecyclerMetricsContext {
133
public:
134
4
    RecyclerMetricsContext() = default;
135
136
    RecyclerMetricsContext(std::string instance_id, std::string operation_type)
137
375
            : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) {
138
375
        start();
139
375
    }
140
141
378
    ~RecyclerMetricsContext() = default;
142
143
    std::atomic_ullong total_need_recycle_data_size = 0;
144
    std::atomic_ullong total_need_recycle_num = 0;
145
146
    std::atomic_ullong total_recycled_data_size = 0;
147
    std::atomic_ullong total_recycled_num = 0;
148
149
    std::string operation_type;
150
    std::string instance_id;
151
152
    double start_time = 0;
153
154
375
    // Stores the current system_clock time, in milliseconds since its epoch, in start_time.
    // duration_cast is spelled with its namespace so the call does not depend on
    // argument-dependent lookup of a template-id, which is only available since C++20.
    void start() {
155
375
        start_time = std::chrono::duration_cast<std::chrono::milliseconds>(
156
375
                             std::chrono::system_clock::now().time_since_epoch())
157
375
                             .count();
158
375
    }
159
160
191
    // Returns the milliseconds elapsed between start_time and the current system_clock time.
    // duration_cast is spelled with its namespace so the call does not depend on
    // argument-dependent lookup of a template-id, which is only available since C++20.
    double duration() const {
161
191
        return std::chrono::duration_cast<std::chrono::milliseconds>(
162
191
                       std::chrono::system_clock::now().time_since_epoch())
163
191
                       .count() -
164
191
               start_time;
165
191
    }
166
167
20
    // Zeroes the four recycle counters and sets start_time to the current system_clock
    // time in milliseconds since its epoch.
    // duration_cast is spelled with its namespace so the call does not depend on
    // argument-dependent lookup of a template-id, which is only available since C++20.
    void reset() {
168
20
        total_need_recycle_data_size = 0;
169
20
        total_need_recycle_num = 0;
170
20
        total_recycled_data_size = 0;
171
20
        total_recycled_num = 0;
172
20
        start_time = std::chrono::duration_cast<std::chrono::milliseconds>(
173
20
                             std::chrono::system_clock::now().time_since_epoch())
174
20
                             .count();
175
20
    }
176
177
0
    void finish_report() {
178
0
        if (!operation_type.empty()) {
179
0
            double cost = duration();
180
0
            g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
181
0
                    {instance_id, operation_type}, cost);
182
0
            g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1);
183
0
            LOG(INFO) << "recycle instance: " << instance_id
184
0
                      << ", operation type: " << operation_type << ", cost: " << cost
185
0
                      << " ms, total recycled num: " << total_recycled_num.load()
186
0
                      << ", total recycled data size: " << total_recycled_data_size.load()
187
0
                      << " bytes";
188
0
            if (cost != 0) {
189
0
                if (total_recycled_num.load() != 0) {
190
0
                    g_bvar_recycler_instance_recycle_time_per_resource.put(
191
0
                            {instance_id, operation_type}, cost / total_recycled_num.load());
192
0
                }
193
0
                g_bvar_recycler_instance_recycle_bytes_per_ms.put(
194
0
                        {instance_id, operation_type}, total_recycled_data_size.load() / cost);
195
0
            }
196
0
        }
197
0
    }
198
199
    // `is_begin` is used to initialize the total number of items that need to be recycled
200
24.5k
    void report(bool is_begin = false) {
201
24.5k
        if (!operation_type.empty()) {
202
            // `is_begin`: publish the to-be-recycled totals (bytes and count)
203
24.5k
            if (is_begin) {
204
0
                auto value = total_need_recycle_num.load();
205
206
0
                g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
207
0
                        {instance_id, operation_type}, total_need_recycle_data_size.load());
208
0
                g_bvar_recycler_instance_last_round_to_recycle_num.put(
209
0
                        {instance_id, operation_type}, value);
210
24.5k
            } else {
211
24.5k
                g_bvar_recycler_instance_last_round_recycled_bytes.put(
212
24.5k
                        {instance_id, operation_type}, total_recycled_data_size.load());
213
24.5k
                g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
214
24.5k
                        {instance_id, operation_type}, total_recycled_data_size.load());
215
24.5k
                g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type},
216
24.5k
                                                                     total_recycled_num.load());
217
24.5k
                g_bvar_recycler_instance_recycle_total_num_since_started.put(
218
24.5k
                        {instance_id, operation_type}, total_recycled_num.load());
219
24.5k
            }
220
24.5k
        }
221
24.5k
    }
222
};
223
224
class TabletRecyclerMetricsContext : public RecyclerMetricsContext {
225
public:
226
101
    TabletRecyclerMetricsContext() : RecyclerMetricsContext("global_recycler", "recycle_tablet") {}
227
};
228
229
class SegmentRecyclerMetricsContext : public RecyclerMetricsContext {
230
public:
231
    SegmentRecyclerMetricsContext()
232
101
            : RecyclerMetricsContext("global_recycler", "recycle_segment") {}
233
};
234
235
class InstanceRecycler {
236
public:
237
    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
238
                              RecyclerThreadPoolGroup thread_pool_group,
239
                              std::shared_ptr<TxnLazyCommitter> txn_lazy_committer);
240
    ~InstanceRecycler();
241
242
0
    std::string_view instance_id() const { return instance_id_; }
243
0
    const InstanceInfoPB& instance_info() const { return instance_info_; }
244
245
    // returns 0 for success otherwise error
246
    int init();
247
248
0
    void stop() { stopped_.store(true, std::memory_order_release); }
249
51
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
250
251
    // returns 0 for success otherwise error
252
    int do_recycle();
253
254
    // remove all kv and data in this instance; must ONLY be called when the instance has been deleted
255
    // returns 0 for success otherwise error
256
    int recycle_deleted_instance();
257
258
    // scan and recycle expired indexes:
259
    // 1. dropped table, dropped mv
260
    // 2. half-success table/index when create
261
    // returns 0 for success otherwise error
262
    int recycle_indexes();
263
264
    // scan and recycle expired partitions:
265
    // 1. dropped partition
266
    // 2. half-success partition when create
267
    // returns 0 for success otherwise error
268
    int recycle_partitions();
269
270
    // scan and recycle expired rowsets:
271
    // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo)
272
    // 2. compaction will change the input rowsets to recycle_rowset
273
    // returns 0 for success otherwise error
274
    int recycle_rowsets();
275
276
    // like `recycle_rowsets`, but for versioned rowsets.
277
    int recycle_versioned_rowsets();
278
279
    // scan and recycle expired tmp rowsets:
280
    // 1. commit_rowset will produce tmp_rowset when finish upload data (load or compaction) to remote storage
281
    // returns 0 for success otherwise error
282
    int recycle_tmp_rowsets();
283
284
    /**
285
     * recycle all tablets belonging to the index specified by `index_id`
286
     *
287
     * @param partition_id if positive, only recycle tablets in this partition belonging to the specified index
288
     * @return 0 for success otherwise error
289
     */
290
    int recycle_tablets(int64_t table_id, int64_t index_id, RecyclerMetricsContext& ctx,
291
                        int64_t partition_id = -1);
292
293
    /**
294
     * recycle all rowsets belonging to the tablet specified by `tablet_id`
295
     *
296
     * @return 0 for success otherwise error
297
     */
298
    int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
299
300
    /**
301
     * like `recycle_tablet`, but for versioned tablet
302
     */
303
    int recycle_versioned_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
304
305
    // scan and recycle useless partition version kv
306
    int recycle_versions();
307
308
    // scan and recycle the orphan partitions
309
    int recycle_orphan_partitions();
310
311
    // scan and abort timeout txn label
312
    // returns 0 for success otherwise error
313
    int abort_timeout_txn();
314
315
    // scan and recycle expired txn label
316
    // returns 0 for success otherwise error
317
    int recycle_expired_txn_label();
318
319
    // scan and recycle finished or timeout copy jobs
320
    // returns 0 for success otherwise error
321
    int recycle_copy_jobs();
322
323
    // scan and recycle dropped internal stage
324
    // returns 0 for success otherwise error
325
    int recycle_stage();
326
327
    // scan and recycle expired stage objects
328
    // returns 0 for success otherwise error
329
    int recycle_expired_stage_objects();
330
331
    // scan and recycle operation logs
332
    // returns 0 for success otherwise error
333
    int recycle_operation_logs();
334
335
    // scan and recycle expired restore jobs
336
    // returns 0 for success otherwise error
337
    int recycle_restore_jobs();
338
339
    // scan and recycle snapshots
340
    // returns 0 for success otherwise error
341
    int recycle_cluster_snapshots();
342
343
    bool check_recycle_tasks();
344
345
    int scan_and_statistics_indexes();
346
347
    int scan_and_statistics_partitions();
348
349
    int scan_and_statistics_rowsets();
350
351
    int scan_and_statistics_tmp_rowsets();
352
353
    int scan_and_statistics_abort_timeout_txn();
354
355
    int scan_and_statistics_expired_txn_label();
356
357
    int scan_and_statistics_copy_jobs();
358
359
    int scan_and_statistics_stage();
360
361
    int scan_and_statistics_expired_stage_objects();
362
363
    int scan_and_statistics_versions();
364
365
    int scan_and_statistics_restore_jobs();
366
367
12
    // Test hook (going by the TEST_ prefix): registers `accessor` under `id` in accessor_map_.
    // unordered_map::insert keeps an already-present entry, so calling this again with an
    // id that is already registered leaves the earlier accessor in place.
    void TEST_add_accessor(std::string_view id, std::shared_ptr<StorageVaultAccessor> accessor) {
368
12
        accessor_map_.insert({std::string(id), std::move(accessor)});
369
12
    }
370
371
    // Recycle snapshot meta and data, return 0 for success otherwise error.
372
    int recycle_snapshot_meta_and_data(const std::string& resource_id,
373
                                       Versionstamp snapshot_version,
374
                                       const SnapshotPB& snapshot_pb);
375
376
private:
377
    // returns 0 for success otherwise error
378
    int init_obj_store_accessors();
379
380
    // returns 0 for success otherwise error
381
    int init_storage_vault_accessors();
382
383
    /**
384
     * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair.
385
     *
386
     * @param recycle_func defines how to recycle resources corresponding to a key-value pair. Returns 0 if the recycling is successful.
387
     * @param loop_done is called after `RangeGetIterator` has no next kv. Usually used to perform a batch recycling. Returns 0 if success. 
388
     * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero
389
     */
390
    int scan_and_recycle(std::string begin, std::string_view end,
391
                         std::function<int(std::string_view k, std::string_view v)> recycle_func,
392
                         std::function<int()> loop_done = nullptr);
393
394
    // return 0 for success otherwise error
395
    int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
396
397
    // return 0 for success otherwise error
398
    // NOTE: this function should ONLY be called when the file paths cannot be calculated
399
    int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
400
                           const std::string& rowset_id);
401
402
    // return 0 for success otherwise error
403
    int delete_rowset_data(const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets,
404
                           RowsetRecyclingState type, RecyclerMetricsContext& metrics_context);
405
406
    /**
407
     * Get stage storage info from instance and init StorageVaultAccessor
408
     * @return 0 if accessor is successfully inited, 1 if stage not found, negative for error
409
     */
410
    int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type,
411
                               std::shared_ptr<StorageVaultAccessor>* accessor);
412
413
    void register_recycle_task(const std::string& task_name, int64_t start_time);
414
415
    void unregister_recycle_task(const std::string& task_name);
416
417
    // for scan all tablets and statistics metrics
418
    int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
419
                                    RecyclerMetricsContext& metrics_context,
420
                                    int64_t partition_id = -1, bool is_empty_tablet = false);
421
422
    // for scan all rs of tablet and statistics metrics
423
    int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
424
425
    // Recycle operation log and the log key.
426
    //
427
    // The log_key is constructed from the log_version and instance_id.
428
    // Both `operation_log` and `log_key` will be removed in the same transaction, to ensure atomicity.
429
    int recycle_operation_log(Versionstamp log_version, OperationLogPB operation_log);
430
431
    // Recycle rowset meta and data, return 0 for success otherwise error
432
    //
433
    // This function will decrease the rowset ref count and remove the rowset meta and data if the ref count is 1.
434
    int recycle_rowset_meta_and_data(std::string_view recycle_rowset_key,
435
                                     const RowsetMetaCloudPB& rowset_meta);
436
437
    // Whether the instance has any snapshots, return 0 for success otherwise error.
438
    int has_cluster_snapshots(bool* any);
439
440
private:
441
    std::atomic_bool stopped_ {false};
442
    std::shared_ptr<TxnKv> txn_kv_;
443
    std::string instance_id_;
444
    InstanceInfoPB instance_info_;
445
446
    // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
447
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
448
    using InvertedIndexInfo =
449
            std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;
450
451
    class InvertedIndexIdCache;
452
    std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
453
454
    std::mutex recycled_tablets_mtx_;
455
    // Stores recycled tablets; we can skip deleting rowset data of these tablets because that data has already been deleted.
456
    std::unordered_set<int64_t> recycled_tablets_;
457
458
    std::mutex recycle_tasks_mutex;
459
    // <task_name, start_time>
460
    std::map<std::string, int64_t> running_recycle_tasks;
461
462
    RecyclerThreadPoolGroup _thread_pool_group;
463
464
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
465
    std::shared_ptr<SnapshotManager> snapshot_manager_;
466
467
    TabletRecyclerMetricsContext tablet_metrics_context_;
468
    SegmentRecyclerMetricsContext segment_metrics_context_;
469
};
470
471
// Helper class to check if operation logs can be recycled based on snapshots and versionstamps
472
class OperationLogRecycleChecker {
473
public:
474
    OperationLogRecycleChecker(std::string_view instance_id, TxnKv* txn_kv)
475
24
            : instance_id_(instance_id), txn_kv_(txn_kv) {}
476
477
    // Initialize the checker by loading snapshots and setting max version stamp
478
    int init();
479
480
    // Check if an operation log can be recycled
481
    bool can_recycle(const Versionstamp& log_versionstamp, int64_t log_min_timestamp) const;
482
483
0
    Versionstamp max_versionstamp() const { return max_versionstamp_; }
484
485
private:
486
    // Non-owning view of the instance id handed to the constructor.
    // NOTE(review): the string it refers to must outlive this checker -- confirm at call sites.
    std::string_view instance_id_;
487
    // Raw pointer stored as passed to the constructor; this class declares no destructor,
    // so it never frees it.
    TxnKv* txn_kv_;
488
    Versionstamp max_versionstamp_;
489
    std::map<Versionstamp, size_t> snapshot_indexes_;
490
    std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
491
};
492
493
} // namespace doris::cloud