Coverage Report

Created: 2025-07-24 13:02

/root/doris/cloud/src/recycler/recycler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/cloud.pb.h>
21
22
#include <atomic>
23
#include <condition_variable>
24
#include <cstdint>
25
#include <deque>
26
#include <functional>
27
#include <memory>
28
#include <string>
29
#include <string_view>
30
#include <thread>
31
#include <utility>
32
33
#include "common/bvars.h"
34
#include "meta-service/txn_lazy_committer.h"
35
#include "meta-store/versionstamp.h"
36
#include "recycler/storage_vault_accessor.h"
37
#include "recycler/white_black_list.h"
38
39
namespace brpc {
40
class Server;
41
} // namespace brpc
42
43
namespace doris::cloud {
44
class TxnKv;
45
class InstanceRecycler;
46
class StorageVaultAccessor;
47
class Checker;
48
class SimpleThreadPool;
49
class RecyclerMetricsContext;
50
struct RecyclerThreadPoolGroup {
51
6
    RecyclerThreadPoolGroup() = default;
52
    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
53
                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
54
                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
55
            : s3_producer_pool(std::move(s3_producer_pool)),
56
              recycle_tablet_pool(std::move(recycle_tablet_pool)),
57
8
              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
58
166
    ~RecyclerThreadPoolGroup() = default;
59
76
    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
60
    RecyclerThreadPoolGroup& operator=(const RecyclerThreadPoolGroup& other) = default; // copy assignment; const& so const sources can be assigned, matching the copy constructor
61
8
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
62
76
    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
63
    // used for accessor.delete_files, accessor.delete_directory
64
    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
65
    // used for InstanceRecycler::recycle_tablet
66
    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
67
    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
68
};
69
70
class Recycler {
71
public:
72
    explicit Recycler(std::shared_ptr<TxnKv> txn_kv);
73
    ~Recycler();
74
75
    // returns 0 for success otherwise error
76
    int start(brpc::Server* server);
77
78
    void stop();
79
80
291
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
81
82
private:
83
    void recycle_callback();
84
85
    void instance_scanner_callback();
86
87
    void lease_recycle_jobs();
88
89
    void check_recycle_tasks();
90
91
private:
92
    friend class RecyclerServiceImpl;
93
94
    std::shared_ptr<TxnKv> txn_kv_;
95
    std::atomic_bool stopped_ {false};
96
97
    std::vector<std::thread> workers_;
98
99
    std::mutex mtx_;
100
    // notify recycle workers
101
    std::condition_variable pending_instance_cond_;
102
    std::deque<InstanceInfoPB> pending_instance_queue_;
103
    std::unordered_set<std::string> pending_instance_set_;
104
    std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map_;
105
    // notify instance scanner and lease thread
106
    std::condition_variable notifier_;
107
108
    std::string ip_port_;
109
110
    WhiteBlackList instance_filter_;
111
    std::unique_ptr<Checker> checker_;
112
113
    RecyclerThreadPoolGroup _thread_pool_group;
114
115
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
116
};
117
118
enum class RowsetRecyclingState {
119
    FORMAL_ROWSET,
120
    TMP_ROWSET,
121
};
122
123
class InstanceRecycler {
124
public:
125
    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
126
                              RecyclerThreadPoolGroup thread_pool_group,
127
                              std::shared_ptr<TxnLazyCommitter> txn_lazy_committer);
128
    ~InstanceRecycler();
129
130
    // returns 0 for success otherwise error
131
    int init();
132
133
0
    void stop() { stopped_.store(true, std::memory_order_release); }
134
35
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
135
136
    // returns 0 for success otherwise error
137
    int do_recycle();
138
139
    // remove all kv and data in this instance, MUST ONLY be called when instance has been deleted
140
    // returns 0 for success otherwise error
141
    int recycle_deleted_instance();
142
143
    // scan and recycle expired indexes:
144
    // 1. dropped table, dropped mv
145
    // 2. half-success table/index when create
146
    // returns 0 for success otherwise error
147
    int recycle_indexes();
148
149
    // scan and recycle expired partitions:
150
    // 1. dropped partition
151
    // 2. half-success partition when create
152
    // returns 0 for success otherwise error
153
    int recycle_partitions();
154
155
    // scan and recycle expired rowsets:
156
    // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo)
157
    // 2. compaction will change the input rowsets to recycle_rowset
158
    // returns 0 for success otherwise error
159
    int recycle_rowsets();
160
161
    // scan and recycle expired tmp rowsets:
162
    // 1. commit_rowset will produce tmp_rowset after finishing uploading data (load or compaction) to remote storage
163
    // returns 0 for success otherwise error
164
    int recycle_tmp_rowsets();
165
166
    /**
167
     * recycle all tablets belonging to the index specified by `index_id`
168
     *
169
     * @param partition_id if positive, only recycle tablets in this partition belonging to the specified index
170
     * @param is_empty_tablet indicates whether the tablet has object files, can skip delete objects if tablet is empty
171
     * @return 0 for success otherwise error
172
     */
173
    int recycle_tablets(int64_t table_id, int64_t index_id, RecyclerMetricsContext& ctx,
174
                        int64_t partition_id = -1, bool is_empty_tablet = false);
175
176
    /**
177
     * recycle all rowsets belonging to the tablet specified by `tablet_id`
178
     *
179
     * @return 0 for success otherwise error
180
     */
181
    int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
182
183
    // scan and recycle useless partition version kv
184
    int recycle_versions();
185
186
    // scan and abort timeout txn label
187
    // returns 0 for success otherwise error
188
    int abort_timeout_txn();
189
190
    // scan and recycle expired txn label
191
    // returns 0 for success otherwise error
192
    int recycle_expired_txn_label();
193
194
    // scan and recycle finished or timeout copy jobs
195
    // returns 0 for success otherwise error
196
    int recycle_copy_jobs();
197
198
    // scan and recycle dropped internal stage
199
    // returns 0 for success otherwise error
200
    int recycle_stage();
201
202
    // scan and recycle expired stage objects
203
    // returns 0 for success otherwise error
204
    int recycle_expired_stage_objects();
205
206
    // scan and recycle operation logs
207
    // returns 0 for success otherwise error
208
    int recycle_operation_logs();
209
210
    bool check_recycle_tasks();
211
212
    int scan_and_statistics_indexes();
213
214
    int scan_and_statistics_partitions();
215
216
    int scan_and_statistics_rowsets();
217
218
    int scan_and_statistics_tmp_rowsets();
219
220
    int scan_and_statistics_abort_timeout_txn();
221
222
    int scan_and_statistics_expired_txn_label();
223
224
    int scan_and_statistics_copy_jobs();
225
226
    int scan_and_statistics_stage();
227
228
    int scan_and_statistics_expired_stage_objects();
229
230
    int scan_and_statistics_versions();
231
232
private:
233
    // returns 0 for success otherwise error
234
    int init_obj_store_accessors();
235
236
    // returns 0 for success otherwise error
237
    int init_storage_vault_accessors();
238
239
    /**
240
     * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair.
241
     *
242
     * @param recycle_func defines how to recycle resources corresponding to a key-value pair. Returns 0 if the recycling is successful.
243
     * @param loop_done is called after `RangeGetIterator` has no next kv. Usually used to perform a batch recycling. Returns 0 on success.
244
     * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero
245
     */
246
    int scan_and_recycle(std::string begin, std::string_view end,
247
                         std::function<int(std::string_view k, std::string_view v)> recycle_func,
248
                         std::function<int()> loop_done = nullptr);
249
250
    // return 0 for success otherwise error
251
    int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
252
253
    // return 0 for success otherwise error
254
    // NOTE: this function should ONLY be called when the file paths cannot be calculated
255
    int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
256
                           const std::string& rowset_id);
257
258
    // return 0 for success otherwise error
259
    int delete_rowset_data(const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets,
260
                           RowsetRecyclingState type, RecyclerMetricsContext& metrics_context);
261
262
    /**
263
     * Get stage storage info from instance and init StorageVaultAccessor
264
     * @return 0 if accessor is successfully inited, 1 if stage not found, negative for error
265
     */
266
    int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type,
267
                               std::shared_ptr<StorageVaultAccessor>* accessor);
268
269
    void register_recycle_task(const std::string& task_name, int64_t start_time);
270
271
    void unregister_recycle_task(const std::string& task_name);
272
273
    // for scan all tablets and statistics metrics
274
    int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
275
                                    RecyclerMetricsContext& metrics_context,
276
                                    int64_t partition_id = -1, bool is_empty_tablet = false);
277
278
    // for scan all rs of tablet and statistics metrics
279
    int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
280
281
    // Recycle operation log and the log key.
282
    //
283
    // The log_key is constructed from the log_version and instance_id.
284
    // Both `operation_log` and `log_key` will be removed in the same transaction, to ensure atomicity.
285
    int recycle_operation_log(Versionstamp log_version, OperationLogPB operation_log);
286
287
private:
288
    std::atomic_bool stopped_ {false};
289
    std::shared_ptr<TxnKv> txn_kv_;
290
    std::string instance_id_;
291
    InstanceInfoPB instance_info_;
292
293
    // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
294
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
295
    using InvertedIndexInfo =
296
            std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;
297
298
    class InvertedIndexIdCache;
299
    std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
300
301
    std::mutex recycled_tablets_mtx_;
302
    // Store recycled tablets; we can skip deleting rowset data of these tablets because their data has already been deleted.
303
    std::unordered_set<int64_t> recycled_tablets_;
304
305
    std::mutex recycle_tasks_mutex;
306
    // <task_name, start_time>
307
    std::map<std::string, int64_t> running_recycle_tasks;
308
309
    RecyclerThreadPoolGroup _thread_pool_group;
310
311
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
312
};
313
314
class RecyclerMetricsContext {
315
public:
316
1
    RecyclerMetricsContext() = default;
317
318
    RecyclerMetricsContext(std::string instance_id, std::string operation_type)
319
147
            : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) {
320
147
        start();
321
147
    }
322
323
143
    ~RecyclerMetricsContext() = default;
324
325
    int64_t total_need_recycle_data_size = 0;
326
    int64_t total_need_recycle_num = 0;
327
328
    std::atomic_long total_recycled_data_size {0};
329
    std::atomic_long total_recycled_num {0};
330
331
    std::string operation_type {};
332
    std::string instance_id {};
333
334
    double start_time = 0;
335
336
147
    void start() {
337
147
        start_time = duration_cast<std::chrono::milliseconds>(
338
147
                             std::chrono::system_clock::now().time_since_epoch())
339
147
                             .count();
340
147
    }
341
342
163
    double duration() const {
343
163
        return duration_cast<std::chrono::milliseconds>(
344
163
                       std::chrono::system_clock::now().time_since_epoch())
345
163
                       .count() -
346
163
               start_time;
347
163
    }
348
349
20
    void reset() {
350
20
        total_need_recycle_data_size = 0;
351
20
        total_need_recycle_num = 0;
352
20
        total_recycled_data_size.store(0);
353
20
        total_recycled_num.store(0);
354
20
        start_time = duration_cast<std::chrono::milliseconds>(
355
20
                             std::chrono::system_clock::now().time_since_epoch())
356
20
                             .count();
357
20
    }
358
359
163
    void finish_report() {
360
163
        if (!operation_type.empty()) {
361
163
            double cost = duration();
362
163
            g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
363
163
                    {instance_id, operation_type}, cost);
364
163
            g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1);
365
163
            LOG(INFO) << "recycle instance: " << instance_id
366
163
                      << ", operation type: " << operation_type << ", cost: " << cost
367
163
                      << " ms, total recycled num: " << total_recycled_num.load()
368
163
                      << ", total recycled data size: " << total_recycled_data_size.load()
369
163
                      << " bytes";
370
163
            if (total_recycled_num.load()) {
371
23
                g_bvar_recycler_instance_recycle_time_per_resource.put(
372
23
                        {instance_id, operation_type}, cost / total_recycled_num.load());
373
140
            } else {
374
140
                g_bvar_recycler_instance_recycle_time_per_resource.put(
375
140
                        {instance_id, operation_type}, -1);
376
140
            }
377
163
            if (total_recycled_data_size.load()) {
378
0
                g_bvar_recycler_instance_recycle_bytes_per_ms.put(
379
0
                        {instance_id, operation_type}, total_recycled_data_size.load() / cost);
380
163
            } else {
381
163
                g_bvar_recycler_instance_recycle_bytes_per_ms.put({instance_id, operation_type},
382
163
                                                                  -1);
383
163
            }
384
163
        }
385
163
    }
386
387
    // `is_begin` is used to initialize the total num of items that need to be recycled
388
23.9k
    void report(bool is_begin = false) {
389
23.9k
        if (!operation_type.empty()) {
390
            // is init
391
23.9k
            if (is_begin) {
392
0
                if (total_need_recycle_data_size) {
393
0
                    g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
394
0
                            {instance_id, operation_type}, total_need_recycle_data_size);
395
0
                }
396
23.9k
            } else {
397
23.9k
                if (total_recycled_data_size.load()) {
398
0
                    g_bvar_recycler_instance_last_round_recycled_bytes.put(
399
0
                            {instance_id, operation_type}, total_recycled_data_size.load());
400
0
                }
401
23.9k
                g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
402
23.9k
                        {instance_id, operation_type}, total_recycled_data_size.load());
403
23.9k
            }
404
405
            // is init
406
23.9k
            if (is_begin) {
407
0
                if (total_need_recycle_num) {
408
0
                    g_bvar_recycler_instance_last_round_to_recycle_num.put(
409
0
                            {instance_id, operation_type}, total_need_recycle_num);
410
0
                }
411
23.9k
            } else {
412
23.9k
                if (total_recycled_num.load()) {
413
23.5k
                    g_bvar_recycler_instance_last_round_recycled_num.put(
414
23.5k
                            {instance_id, operation_type}, total_recycled_num.load());
415
23.5k
                }
416
23.9k
                g_bvar_recycler_instance_recycle_total_num_since_started.put(
417
23.9k
                        {instance_id, operation_type}, total_recycled_num.load());
418
23.9k
            }
419
23.9k
        }
420
23.9k
    }
421
};
422
423
} // namespace doris::cloud