Coverage Report

Created: 2025-10-30 15:10

/root/doris/cloud/src/recycler/recycler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/cloud.pb.h>
21
#include <glog/logging.h>
22
23
#include <atomic>
24
#include <condition_variable>
25
#include <cstdint>
26
#include <deque>
27
#include <functional>
28
#include <memory>
29
#include <string>
30
#include <string_view>
31
#include <thread>
32
#include <utility>
33
34
#include "common/bvars.h"
35
#include "meta-service/txn_lazy_committer.h"
36
#include "recycler/storage_vault_accessor.h"
37
#include "recycler/white_black_list.h"
38
39
namespace brpc {
40
class Server;
41
} // namespace brpc
42
43
namespace doris::cloud {
44
class TxnKv;
45
class InstanceRecycler;
46
class StorageVaultAccessor;
47
class Checker;
48
class SimpleThreadPool;
49
class RecyclerMetricsContext;
50
class TabletRecyclerMetricsContext;
51
class SegmentRecyclerMetricsContext;
52
struct RecyclerThreadPoolGroup {
53
8
    RecyclerThreadPoolGroup() = default;
54
    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
55
                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
56
                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
57
            : s3_producer_pool(std::move(s3_producer_pool)),
58
              recycle_tablet_pool(std::move(recycle_tablet_pool)),
59
10
              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
60
172
    ~RecyclerThreadPoolGroup() = default;
61
77
    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
62
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) = default;
63
10
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
64
77
    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
65
    // used for accessor.delete_files, accessor.delete_directory
66
    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
67
    // used for InstanceRecycler::recycle_tablet
68
    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
69
    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
70
};
71
72
class Recycler {
73
public:
74
    explicit Recycler(std::shared_ptr<TxnKv> txn_kv);
75
    ~Recycler();
76
77
    // returns 0 for success otherwise error
78
    int start(brpc::Server* server);
79
80
    void stop();
81
82
291
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
83
84
private:
85
    void recycle_callback();
86
87
    void instance_scanner_callback();
88
89
    void lease_recycle_jobs();
90
91
    void check_recycle_tasks();
92
93
private:
94
    friend class RecyclerServiceImpl;
95
96
    std::shared_ptr<TxnKv> txn_kv_;
97
    std::atomic_bool stopped_ {false};
98
99
    std::vector<std::thread> workers_;
100
101
    std::mutex mtx_;
102
    // notify recycle workers
103
    std::condition_variable pending_instance_cond_;
104
    std::deque<InstanceInfoPB> pending_instance_queue_;
105
    std::unordered_set<std::string> pending_instance_set_;
106
    std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map_;
107
    // notify instance scanner and lease thread
108
    std::condition_variable notifier_;
109
110
    std::string ip_port_;
111
112
    WhiteBlackList instance_filter_;
113
    std::unique_ptr<Checker> checker_;
114
115
    RecyclerThreadPoolGroup _thread_pool_group;
116
117
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
118
};
119
120
enum class RowsetRecyclingState {
121
    FORMAL_ROWSET,
122
    TMP_ROWSET,
123
};
124
125
class RecyclerMetricsContext {
126
public:
127
1
    RecyclerMetricsContext() = default;
128
129
    RecyclerMetricsContext(std::string instance_id, std::string operation_type)
130
312
            : operation_type(std::move(operation_type)), instance_id(std::move(instance_id)) {
131
312
        start();
132
312
    }
133
134
312
    ~RecyclerMetricsContext() = default;
135
136
    std::atomic_ullong total_need_recycle_data_size = 0;
137
    std::atomic_ullong total_need_recycle_num = 0;
138
139
    std::atomic_ullong total_recycled_data_size = 0;
140
    std::atomic_ullong total_recycled_num = 0;
141
142
    std::string operation_type;
143
    std::string instance_id;
144
145
    double start_time = 0;
146
147
312
    void start() {
148
312
        start_time = duration_cast<std::chrono::milliseconds>(
149
312
                             std::chrono::system_clock::now().time_since_epoch())
150
312
                             .count();
151
312
    }
152
153
178
    double duration() const {
154
178
        return duration_cast<std::chrono::milliseconds>(
155
178
                       std::chrono::system_clock::now().time_since_epoch())
156
178
                       .count() -
157
178
               start_time;
158
178
    }
159
160
20
    void reset() {
161
20
        total_need_recycle_data_size = 0;
162
20
        total_need_recycle_num = 0;
163
20
        total_recycled_data_size = 0;
164
20
        total_recycled_num = 0;
165
20
        start_time = duration_cast<std::chrono::milliseconds>(
166
20
                             std::chrono::system_clock::now().time_since_epoch())
167
20
                             .count();
168
20
    }
169
170
178
    void finish_report() {
171
178
        if (!operation_type.empty()) {
172
178
            double cost = duration();
173
178
            g_bvar_recycler_instance_last_round_recycle_elpased_ts.put(
174
178
                    {instance_id, operation_type}, cost);
175
178
            g_bvar_recycler_instance_recycle_round.put({instance_id, operation_type}, 1);
176
178
            LOG(INFO) << "recycle instance: " << instance_id
177
178
                      << ", operation type: " << operation_type << ", cost: " << cost
178
178
                      << " ms, total recycled num: " << total_recycled_num.load()
179
178
                      << ", total recycled data size: " << total_recycled_data_size.load()
180
178
                      << " bytes";
181
178
            if (cost != 0) {
182
172
                if (total_recycled_num.load() != 0) {
183
29
                    g_bvar_recycler_instance_recycle_time_per_resource.put(
184
29
                            {instance_id, operation_type}, cost / total_recycled_num.load());
185
29
                }
186
172
                g_bvar_recycler_instance_recycle_bytes_per_ms.put(
187
172
                        {instance_id, operation_type}, total_recycled_data_size.load() / cost);
188
172
            }
189
178
        }
190
178
    }
191
192
    // `is_begin` is used to initialize total num of items need to be recycled
193
24.5k
    void report(bool is_begin = false) {
194
24.5k
        if (!operation_type.empty()) {
195
            // is init
196
24.5k
            if (is_begin) {
197
0
                auto value = total_need_recycle_num.load();
198
199
0
                g_bvar_recycler_instance_last_round_to_recycle_bytes.put(
200
0
                        {instance_id, operation_type}, total_need_recycle_data_size.load());
201
0
                g_bvar_recycler_instance_last_round_to_recycle_num.put(
202
0
                        {instance_id, operation_type}, value);
203
24.5k
            } else {
204
24.5k
                g_bvar_recycler_instance_last_round_recycled_bytes.put(
205
24.5k
                        {instance_id, operation_type}, total_recycled_data_size.load());
206
24.5k
                g_bvar_recycler_instance_recycle_total_bytes_since_started.put(
207
24.5k
                        {instance_id, operation_type}, total_recycled_data_size.load());
208
24.5k
                g_bvar_recycler_instance_last_round_recycled_num.put({instance_id, operation_type},
209
24.5k
                                                                     total_recycled_num.load());
210
24.5k
                g_bvar_recycler_instance_recycle_total_num_since_started.put(
211
24.5k
                        {instance_id, operation_type}, total_recycled_num.load());
212
24.5k
            }
213
24.5k
        }
214
24.5k
    }
215
};
216
217
class TabletRecyclerMetricsContext : public RecyclerMetricsContext {
218
public:
219
77
    TabletRecyclerMetricsContext() : RecyclerMetricsContext("global_recycler", "recycle_tablet") {}
220
};
221
222
class SegmentRecyclerMetricsContext : public RecyclerMetricsContext {
223
public:
224
    SegmentRecyclerMetricsContext()
225
77
            : RecyclerMetricsContext("global_recycler", "recycle_segment") {}
226
};
227
228
class InstanceRecycler {
229
public:
230
    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
231
                              RecyclerThreadPoolGroup thread_pool_group,
232
                              std::shared_ptr<TxnLazyCommitter> txn_lazy_committer);
233
    ~InstanceRecycler();
234
235
    // returns 0 for success otherwise error
236
    int init();
237
238
0
    void stop() { stopped_.store(true, std::memory_order_release); }
239
46
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
240
241
    // returns 0 for success otherwise error
242
    int do_recycle();
243
244
    // remove all kv and data in this instance, MUST ONLY be called when the instance has been deleted
245
    // returns 0 for success otherwise error
246
    int recycle_deleted_instance();
247
248
    // scan and recycle expired indexes:
249
    // 1. dropped table, dropped mv
250
    // 2. half-success table/index when create
251
    // returns 0 for success otherwise error
252
    int recycle_indexes();
253
254
    // scan and recycle expired partitions:
255
    // 1. dropped partition
256
    // 2. half-success partition when create
257
    // returns 0 for success otherwise error
258
    int recycle_partitions();
259
260
    // scan and recycle expired rowsets:
261
    // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo)
262
    // 2. compaction will change the input rowsets to recycle_rowset
263
    // returns 0 for success otherwise error
264
    int recycle_rowsets();
265
266
    // scan and recycle expired tmp rowsets:
267
    // 1. commit_rowset will produce tmp_rowset when finish upload data (load or compaction) to remote storage
268
    // returns 0 for success otherwise error
269
    int recycle_tmp_rowsets();
270
271
    /**
272
     * recycle all tablets belonging to the index specified by `index_id`
273
     *
274
     * @param partition_id if positive, only recycle tablets in this partition belonging to the specified index
275
     * @param is_empty_tablet indicates whether the tablet has object files, can skip delete objects if tablet is empty
276
     * @return 0 for success otherwise error
277
     */
278
    int recycle_tablets(int64_t table_id, int64_t index_id, RecyclerMetricsContext& ctx,
279
                        int64_t partition_id = -1, bool is_empty_tablet = false);
280
281
    /**
282
     * recycle all rowsets belonging to the tablet specified by `tablet_id`
283
     *
284
     * @return 0 for success otherwise error
285
     */
286
    int recycle_tablet(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
287
288
    // scan and recycle useless partition version kv
289
    int recycle_versions();
290
291
    // scan and abort timeout txn label
292
    // returns 0 for success otherwise error
293
    int abort_timeout_txn();
294
295
    // scan and recycle expired txn label
296
    // returns 0 for success otherwise error
297
    int recycle_expired_txn_label();
298
299
    // scan and recycle finished or timeout copy jobs
300
    // returns 0 for success otherwise error
301
    int recycle_copy_jobs();
302
303
    // scan and recycle dropped internal stage
304
    // returns 0 for success otherwise error
305
    int recycle_stage();
306
307
    // scan and recycle expired stage objects
308
    // returns 0 for success otherwise error
309
    int recycle_expired_stage_objects();
310
311
    // scan and recycle expired restore jobs
312
    // returns 0 for success otherwise error
313
    int recycle_restore_jobs();
314
315
    bool check_recycle_tasks();
316
317
    int scan_and_statistics_indexes();
318
319
    int scan_and_statistics_partitions();
320
321
    int scan_and_statistics_rowsets();
322
323
    int scan_and_statistics_tmp_rowsets();
324
325
    int scan_and_statistics_abort_timeout_txn();
326
327
    int scan_and_statistics_expired_txn_label();
328
329
    int scan_and_statistics_copy_jobs();
330
331
    int scan_and_statistics_stage();
332
333
    int scan_and_statistics_expired_stage_objects();
334
335
    int scan_and_statistics_versions();
336
337
    int scan_and_statistics_restore_jobs();
338
339
private:
340
    // returns 0 for success otherwise error
341
    int init_obj_store_accessors();
342
343
    // returns 0 for success otherwise error
344
    int init_storage_vault_accessors();
345
346
    /**
347
     * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair.
348
     *
349
     * @param recycle_func defines how to recycle resources corresponding to a key-value pair. Returns 0 if the recycling is successful.
350
     * @param loop_done is called after `RangeGetIterator` has no next kv. Usually used to perform a batch recycling. Returns 0 if success. 
351
     * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero
352
     */
353
    int scan_and_recycle(std::string begin, std::string_view end,
354
                         std::function<int(std::string_view k, std::string_view v)> recycle_func,
355
                         std::function<int()> loop_done = nullptr);
356
357
    // return 0 for success otherwise error
358
    int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
359
360
    // return 0 for success otherwise error
361
    // NOTE: this function should ONLY be called when the file paths cannot be calculated
362
    int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
363
                           const std::string& rowset_id);
364
365
    // return 0 for success otherwise error
366
    int delete_rowset_data(const std::map<std::string, doris::RowsetMetaCloudPB>& rowsets,
367
                           RowsetRecyclingState type, RecyclerMetricsContext& metrics_context);
368
369
    /**
370
     * Get stage storage info from instance and init StorageVaultAccessor
371
     * @return 0 if accessor is successfully inited, 1 if stage not found, negative for error
372
     */
373
    int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type,
374
                               std::shared_ptr<StorageVaultAccessor>* accessor);
375
376
    void register_recycle_task(const std::string& task_name, int64_t start_time);
377
378
    void unregister_recycle_task(const std::string& task_name);
379
380
    // for scan all tablets and statistics metrics
381
    int scan_tablets_and_statistics(int64_t tablet_id, int64_t index_id,
382
                                    RecyclerMetricsContext& metrics_context,
383
                                    int64_t partition_id = -1, bool is_empty_tablet = false);
384
385
    // for scan all rs of tablet and statistics metrics
386
    int scan_tablet_and_statistics(int64_t tablet_id, RecyclerMetricsContext& metrics_context);
387
388
private:
389
    std::atomic_bool stopped_ {false};
390
    std::shared_ptr<TxnKv> txn_kv_;
391
    std::string instance_id_;
392
    InstanceInfoPB instance_info_;
393
394
    // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
395
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
396
    using InvertedIndexInfo =
397
            std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;
398
399
    class InvertedIndexIdCache;
400
    std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
401
402
    std::mutex recycled_tablets_mtx_;
403
    // Store recycled tablets, we can skip deleting rowset data of these tablets because their data has already been deleted.
404
    std::unordered_set<int64_t> recycled_tablets_;
405
406
    std::mutex recycle_tasks_mutex;
407
    // <task_name, start_time>
408
    std::map<std::string, int64_t> running_recycle_tasks;
409
410
    RecyclerThreadPoolGroup _thread_pool_group;
411
412
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
413
414
    TabletRecyclerMetricsContext tablet_metrics_context_;
415
    SegmentRecyclerMetricsContext segment_metrics_context_;
416
};
417
418
} // namespace doris::cloud