Coverage Report

Created: 2025-05-21 15:28

/root/doris/cloud/src/recycler/recycler.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/cloud.pb.h>
21
22
#include <atomic>
23
#include <condition_variable>
24
#include <deque>
25
#include <functional>
26
#include <memory>
27
#include <string>
28
#include <string_view>
29
#include <thread>
30
31
#include "meta-service/txn_lazy_committer.h"
32
#include "recycler/storage_vault_accessor.h"
33
#include "recycler/white_black_list.h"
34
35
namespace brpc {
36
class Server;
37
} // namespace brpc
38
39
namespace doris::cloud {
40
class TxnKv;
41
class InstanceRecycler;
42
class StorageVaultAccessor;
43
class Checker;
44
class SimpleThreadPool;
45
// Bundle of the thread pools handed to the recyclers. Every member is a
// std::shared_ptr, so copying a group shares the same underlying pools and a
// default-constructed group holds three null pointers.
struct RecyclerThreadPoolGroup {
46
6
    RecyclerThreadPoolGroup() = default;
47
    // Each argument is taken by value and moved into the member of the same name.
    RecyclerThreadPoolGroup(std::shared_ptr<SimpleThreadPool> s3_producer_pool,
48
                            std::shared_ptr<SimpleThreadPool> recycle_tablet_pool,
49
                            std::shared_ptr<SimpleThreadPool> group_recycle_function_pool)
50
            : s3_producer_pool(std::move(s3_producer_pool)),
51
              recycle_tablet_pool(std::move(recycle_tablet_pool)),
52
8
              group_recycle_function_pool(std::move(group_recycle_function_pool)) {}
53
150
    ~RecyclerThreadPoolGroup() = default;
54
68
    RecyclerThreadPoolGroup(const RecyclerThreadPoolGroup&) = default;
55
    // Copy assignment takes a const reference (matching the copy constructor) so
    // that a const group can be used as the source of an assignment.
    RecyclerThreadPoolGroup& operator=(const RecyclerThreadPoolGroup& other) = default;
56
8
    RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) = default;
57
68
    RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
58
    // used for accessor.delete_files, accessor.delete_directory
59
    std::shared_ptr<SimpleThreadPool> s3_producer_pool;
60
    // used for InstanceRecycler::recycle_tablet
61
    std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
62
    // NOTE(review): usage is not documented in this header; presumably runs the
    // grouped recycle functions -- confirm against the users of this pool.
    std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
63
};
64
65
// Top-level recycler object. It holds the worker threads, the queue/set of
// pending instances and the map of instances currently being recycled (see the
// data members below). Every member function without a body here is defined
// outside this header.
class Recycler {
66
public:
67
    explicit Recycler(std::shared_ptr<TxnKv> txn_kv);
68
    ~Recycler();
69
70
    // returns 0 for success otherwise error
    // NOTE(review): how `server` is used is not visible in this header -- confirm in the definition.
71
    int start(brpc::Server* server);
72
73
    void stop();
74
75
292
    // Reads the stop flag with acquire ordering.
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
76
77
private:
78
    void recycle_callback();
79
80
    void instance_scanner_callback();
81
82
    void lease_recycle_jobs();
83
84
    void check_recycle_tasks();
85
86
private:
87
    friend class RecyclerServiceImpl;
88
89
    std::shared_ptr<TxnKv> txn_kv_;
90
    // Stop flag returned by stopped(); starts out false.
    std::atomic_bool stopped_ {false};
91
92
    std::vector<std::thread> workers_;
93
94
    // NOTE(review): presumably guards the pending/recycling containers below and is the
    // mutex used with both condition variables -- confirm in the definitions.
    std::mutex mtx_;
95
    // notify recycle workers
96
    std::condition_variable pending_instance_cond_;
97
    std::deque<InstanceInfoPB> pending_instance_queue_;
98
    std::unordered_set<std::string> pending_instance_set_;
99
    std::unordered_map<std::string, std::shared_ptr<InstanceRecycler>> recycling_instance_map_;
100
    // notify instance scanner and lease thread
101
    std::condition_variable notifier_;
102
103
    std::string ip_port_;
104
105
    WhiteBlackList instance_filter_;
106
    std::unique_ptr<Checker> checker_;
107
108
    RecyclerThreadPoolGroup _thread_pool_group;
109
110
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
111
};
112
113
// Tag passed as `type` to the InstanceRecycler::delete_rowset_data overload that
// takes a vector of rowset metas.
// NOTE(review): presumably tells that overload whether the batch comes from
// recycle_rowsets() or recycle_tmp_rowsets() -- confirm in the definition.
enum class RowsetRecyclingState {
114
    FORMAL_ROWSET,
115
    TMP_ROWSET,
116
};
117
118
// Recycler for a single instance. It is constructed from the instance's
// InstanceInfoPB together with a thread pool group and a lazy committer, and it
// exposes one `recycle_*` entry point per kind of resource. Every member
// function without a body here is defined outside this header.
class InstanceRecycler {
119
public:
120
    explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
121
                              RecyclerThreadPoolGroup thread_pool_group,
122
                              std::shared_ptr<TxnLazyCommitter> txn_lazy_committer);
123
    ~InstanceRecycler();
124
125
    // returns 0 for success otherwise error
126
    int init();
127
128
0
    // Sets the stop flag with release ordering; the flag is read back by stopped().
    void stop() { stopped_.store(true, std::memory_order_release); }
129
32
    // Reads the stop flag with acquire ordering.
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
130
131
    // returns 0 for success otherwise error
132
    int do_recycle();
133
134
    // remove all kv and data in this instance, MUST ONLY be called when the instance has been deleted
135
    // returns 0 for success otherwise error
136
    int recycle_deleted_instance();
137
138
    // scan and recycle expired indexes:
139
    // 1. dropped table, dropped mv
140
    // 2. half-success table/index when create
141
    // returns 0 for success otherwise error
142
    int recycle_indexes();
143
144
    // scan and recycle expired partitions:
145
    // 1. dropped partition
146
    // 2. half-success partition when create
147
    // returns 0 for success otherwise error
148
    int recycle_partitions();
149
150
    // scan and recycle expired rowsets:
151
    // 1. prepare_rowset will produce recycle_rowset before uploading data to remote storage (memo)
152
    // 2. compaction will change the input rowsets to recycle_rowset
153
    // returns 0 for success otherwise error
154
    int recycle_rowsets();
155
156
    // scan and recycle expired tmp rowsets:
157
    // 1. commit_rowset will produce tmp_rowset when finish upload data (load or compaction) to remote storage
158
    // returns 0 for success otherwise error
159
    int recycle_tmp_rowsets();
160
161
    /**
162
     * recycle all tablets belonging to the index specified by `index_id`
163
     *
     * @param table_id presumably the id of the table owning the index -- TODO confirm in the definition
     * @param index_id id of the index whose tablets are recycled
164
     * @param partition_id if positive, only recycle tablets in this partition belonging to the specified index
165
     * @param is_empty_tablet indicates whether the tablet has object files, can skip delete objects if tablet is empty
166
     * @return 0 for success otherwise error
167
     */
168
    int recycle_tablets(int64_t table_id, int64_t index_id, int64_t partition_id = -1,
169
                        bool is_empty_tablet = false);
170
171
    /**
172
     * recycle all rowsets belonging to the tablet specified by `tablet_id`
173
     *
174
     * @return 0 for success otherwise error
175
     */
176
    int recycle_tablet(int64_t tablet_id);
177
178
    // scan and recycle useless partition version kv
179
    int recycle_versions();
180
181
    // scan and abort timeout txn label
182
    // returns 0 for success otherwise error
183
    int abort_timeout_txn();
184
185
    // scan and recycle expired txn label
186
    // returns 0 for success otherwise error
187
    int recycle_expired_txn_label();
188
189
    // scan and recycle finished or timeout copy jobs
190
    // returns 0 for success otherwise error
191
    int recycle_copy_jobs();
192
193
    // scan and recycle dropped internal stage
194
    // returns 0 for success otherwise error
195
    int recycle_stage();
196
197
    // scan and recycle expired stage objects
198
    // returns 0 for success otherwise error
199
    int recycle_expired_stage_objects();
200
201
    // NOTE(review): the meaning of the returned bool is not visible in this header;
    // presumably inspects running_recycle_tasks -- confirm in the definition.
    bool check_recycle_tasks();
202
203
private:
204
    // returns 0 for success otherwise error
205
    int init_obj_store_accessors();
206
207
    // returns 0 for success otherwise error
208
    int init_storage_vault_accessors();
209
210
    /**
211
     * Scan key-value pairs between [`begin`, `end`), and perform `recycle_func` on each key-value pair.
212
     *
213
     * @param recycle_func defines how to recycle resources corresponding to a key-value pair. Returns 0 if the recycling is successful.
214
     * @param loop_done is called after `RangeGetIterator` has no next kv. Usually used to perform a batch recycling. Returns 0 if success.
215
     * @return 0 if all corresponding resources are recycled successfully, otherwise non-zero
216
     */
217
    int scan_and_recycle(std::string begin, std::string_view end,
218
                         std::function<int(std::string_view k, std::string_view v)> recycle_func,
219
                         std::function<int()> loop_done = nullptr);
220
221
    // return 0 for success otherwise error
222
    int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
223
224
    // return 0 for success otherwise error
225
    // NOTE: this function should ONLY be called when the file paths cannot be calculated
226
    int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
227
                           const std::string& rowset_id);
228
229
    // return 0 for success otherwise error
230
    int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets,
231
                           RowsetRecyclingState type);
232
233
    /**
234
     * Get stage storage info from instance and init StorageVaultAccessor
     * @param accessor output parameter that receives the accessor
235
     * @return 0 if accessor is successfully inited, 1 if stage not found, negative for error
236
     */
237
    int init_copy_job_accessor(const std::string& stage_id, const StagePB::StageType& stage_type,
238
                               std::shared_ptr<StorageVaultAccessor>* accessor);
239
240
    // NOTE(review): presumably records `task_name` with `start_time` in running_recycle_tasks -- confirm.
    void register_recycle_task(const std::string& task_name, int64_t start_time);
241
242
    // NOTE(review): presumably removes `task_name` from running_recycle_tasks -- confirm.
    void unregister_recycle_task(const std::string& task_name);
243
244
private:
245
    // Stop flag written by stop() and read by stopped(); starts out false.
    std::atomic_bool stopped_ {false};
246
    std::shared_ptr<TxnKv> txn_kv_;
247
    std::string instance_id_;
248
    InstanceInfoPB instance_info_;
249
250
    // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults
251
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
252
    using InvertedIndexInfo =
253
            std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>;
254
255
    // Only declared here; the class is defined outside this header.
    class InvertedIndexIdCache;
256
    std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_;
257
258
    std::mutex recycled_tablets_mtx_;
259
    // Store recycled tablets, we can skip deleting rowset data of these tablets because their data has already been deleted.
260
    std::unordered_set<int64_t> recycled_tablets_;
261
262
    std::mutex recycle_tasks_mutex;
263
    // <task_name, start_time>
264
    std::map<std::string, int64_t> running_recycle_tasks;
265
266
    RecyclerThreadPoolGroup _thread_pool_group;
267
268
    std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
269
};
270
271
} // namespace doris::cloud