Coverage Report

Created: 2025-12-26 15:06

/root/doris/cloud/src/recycler/checker.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "resource-manager/resource_manager.h"
21
#if defined(USE_LIBCPP) && _LIBCPP_ABI_VERSION <= 1
22
#define _LIBCPP_ABI_INCOMPLETE_TYPES_IN_DEQUE
23
#endif
24
#include <atomic>
25
#include <condition_variable>
26
#include <deque>
27
#include <functional>
28
#include <thread>
29
#include <unordered_map>
30
#include <unordered_set>
31
32
#include "recycler/storage_vault_accessor.h"
33
#include "recycler/white_black_list.h"
34
#include "snapshot/snapshot_manager.h"
35
36
namespace doris {
37
class RowsetMetaCloudPB;
38
} // namespace doris
39
40
namespace doris::cloud {
41
class StorageVaultAccessor;
42
class InstanceChecker;
43
class TxnKv;
44
class InstanceInfoPB;
45
46
// Top-level checker service object.
// It holds a TxnKv handle, a set of worker threads, a queue of pending instances and a map of
// instances that currently have an InstanceChecker attached. The scheduling logic itself lives in
// the implementation file, which is not visible from this header.
class Checker {
47
public:
48
    // Takes `txn_kv` by value as a shared_ptr, so the checker can share ownership of the KV store.
    explicit Checker(std::shared_ptr<TxnKv> txn_kv);
49
    ~Checker();
50
51
    // Returns an int status code.
    // NOTE(review): presumably 0 on success and spawns `workers_` -- confirm in checker.cpp.
    int start();
52
53
    // NOTE(review): presumably sets `stopped_` and joins `workers_` -- confirm in checker.cpp.
    void stop();
54
326
    // Reads the stop flag with acquire ordering.
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
55
56
private:
57
    // Thread entry points / helpers; their bodies are defined outside this header.
    void lease_check_jobs();
58
    void inspect_instance_check_interval();
59
    void do_inspect(const InstanceInfoPB& instance);
60
61
private:
62
    // RecyclerServiceImpl is granted access to the private state below.
    friend class RecyclerServiceImpl;
63
64
    std::shared_ptr<TxnKv> txn_kv_;
65
    // Stop flag returned by stopped(); starts as false.
    std::atomic_bool stopped_ {false};
66
    std::string ip_port_;
67
    std::vector<std::thread> workers_;
68
69
    // NOTE(review): presumably guards the pending/working containers below and is the mutex used
    // with both condition variables -- confirm in checker.cpp.
    std::mutex mtx_;
70
    // notify check workers
71
    std::condition_variable pending_instance_cond_;
72
    std::deque<InstanceInfoPB> pending_instance_queue_;
73
    // instance_id -> enqueue_timestamp (the timestamp unit is not visible from this header)
74
    std::unordered_map<std::string, long> pending_instance_map_;
75
    // Keyed by string; presumably instance_id -> checker currently working on it (TODO confirm).
    std::unordered_map<std::string, std::shared_ptr<InstanceChecker>> working_instance_map_;
76
    // notify instance scanner and lease thread
77
    std::condition_variable notifier_;
78
79
    // NOTE(review): presumably selects which instances are eligible for checking -- confirm.
    WhiteBlackList instance_filter_;
80
};
81
82
// Per-instance checker.
// It declares a family of `do_*_check()` entry points plus the private helpers they rely on, and
// holds the storage accessors, snapshot manager and resource manager used by them. All check
// bodies are defined outside this header.
class InstanceChecker {
83
public:
84
    // `txn_kv` is taken by value as a shared_ptr; `instance_id` is taken by const reference.
    explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id);
85
    // Return 0 if success, otherwise error
86
    int init(const InstanceInfoPB& instance);
87
    // Return 0 if success.
88
    // Return 1 if data leak is identified.
89
    // Return negative if a temporary error occurred during the check process.
90
    int do_inverted_check();
91
92
    // Return 0 if success.
93
    // Return 1 if data loss is identified.
94
    // Return negative if a temporary error occurred during the check process.
95
    int do_check();
96
97
    // Return 0 if success.
98
    // Return 1 if delete bitmap leak is identified.
99
    // Return negative if a temporary error occurred during the check process.
100
    int do_delete_bitmap_inverted_check();
101
102
    // version = 1 : https://github.com/apache/doris/pull/40204
103
    // checks if https://github.com/apache/doris/pull/40204 works as expected
104
    // the stale delete bitmap will be cleared in MS when BE deletes expired stale rowsets
105
    // NOTE: stale rowsets will be lost after BE restarts, so there may be some stale delete bitmaps
106
    // which will not be cleared.
107
    // version = 2 : https://github.com/apache/doris/pull/49822
108
    int do_delete_bitmap_storage_optimize_check(int version = 2);
109
110
    // The return convention of the next four checks is not documented in this header.
    // NOTE(review): presumably 0 / 1 / negative like the sibling checks -- confirm in checker.cpp.
    int do_mow_job_key_check();
111
112
    int do_tablet_stats_key_check();
113
114
    int do_restore_job_check();
115
116
    int do_txn_key_check();
117
118
    // check table and partition version key
119
    // table version should be greater than the versions of all its partitions
120
    // Return 0 if success, otherwise error
121
    int do_version_key_check();
122
123
    // Return 0 if success.
124
    // Return 1 if meta rowset key leak or loss is identified.
125
    // Return negative if a temporary error occurred during the check process.
126
    int do_meta_rowset_key_check();
127
128
    // Return 0 if success.
129
    // Return 1 if snapshot key and file leak or loss is identified.
130
    // Return negative if a temporary error occurred during the check process.
131
    int do_snapshots_check();
132
133
    // Return 0 if success.
134
    // Return 1 if mvcc meta key and data leak or loss is identified.
135
    // Return negative if a temporary error occurred during the check process.
136
    int do_mvcc_meta_key_check();
137
138
    // Return 0 if success.
139
    // Return 1 if packed file metadata leak or loss is identified.
140
    // Return negative if a temporary error occurred during the check process.
141
    int do_packed_file_check();
142
143
    // Returns a raw accessor pointer for `id`.
    // NOTE(review): presumably looked up in `accessor_map_`; the result for an unknown id is not
    // visible from this header -- confirm before dereferencing.
    StorageVaultAccessor* get_accessor(const std::string& id);
144
145
0
    // Returns the raw pointer held by `resource_mgr_`; ownership stays with this object.
    ResourceManager* resource_mgr() const { return resource_mgr_.get(); }
146
147
    // Results are delivered through the `accessors` out-parameter.
    // NOTE(review): whether the vector is cleared or appended to is not visible here -- confirm.
    void get_all_accessor(std::vector<StorageVaultAccessor*>* accessors);
148
149
0
    // Returns a view over `instance_id_`; the view refers to this object's member storage.
    std::string_view instance_id() const { return instance_id_; }
150
151
0
    // Test hook: registers `accessor` under `id` in `accessor_map_`.
    // Uses insert(), so an entry that already exists for `id` is left unchanged.
    void TEST_add_accessor(std::string_view id, std::shared_ptr<StorageVaultAccessor> accessor) {
152
0
        accessor_map_.insert({std::string(id), std::move(accessor)});
153
0
    }
154
155
    // If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e.
156
    // all accessors are HdfsAccessor), return INT64_MAX.
157
    // Return 0 if success, otherwise error
    // NOTE(review): the lifecycle value is presumably written through `lifecycle_days` -- confirm.
158
    int get_bucket_lifecycle(int64_t* lifecycle_days);
159
0
    // Sets the stop flag with release ordering; pairs with the acquire load in stopped().
    void stop() { stopped_.store(true, std::memory_order_release); }
160
6.01k
    // Reads the stop flag with acquire ordering.
    bool stopped() const { return stopped_.load(std::memory_order_acquire); }
161
162
private:
163
    // Per-rowset record passed by reference to check_inverted_index_file_storage_format_v1()
    // as `rowset_index_cache_v1`; tracks segment ids and index ids for one rowset id.
    struct RowsetIndexesFormatV1 {
164
        std::string rowset_id;
165
        std::unordered_set<int64_t> segment_ids;
166
        std::unordered_set<std::string> index_ids;
167
    };
168
169
    // Per-rowset record passed by reference to check_inverted_index_file_storage_format_v2()
    // as `rowset_index_cache_v2`; tracks segment ids only.
    struct RowsetIndexesFormatV2 {
170
        std::string rowset_id;
171
        std::unordered_set<int64_t> segment_ids;
172
    };
173
174
private:
175
    // returns 0 for success otherwise error
176
    int init_obj_store_accessors(const InstanceInfoPB& instance);
177
178
    // returns 0 for success otherwise error
179
    int init_storage_vault_accessors(const InstanceInfoPB& instance);
180
181
    // Invokes `check_func(int64_t, bool)` during traversal.
    // NOTE(review): the arguments are presumably (tablet_id, has_sequence_col), judging by
    // check_delete_bitmap_storage_optimize_v2() -- confirm in checker.cpp.
    int traverse_mow_tablet(const std::function<int(int64_t, bool)>& check_func);
182
    // `rowset_id` is taken by value. The meaning of the four callback arguments is not
    // documented in this header.
    int traverse_rowset_delete_bitmaps(
183
            int64_t tablet_id, std::string rowset_id,
184
            const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback);
185
    // `collect_cb` receives rowset metas as `const doris::RowsetMetaCloudPB&`.
    int collect_tablet_rowsets(
186
            int64_t tablet_id,
187
            const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb);
188
    // Output is delivered through the `pending_delete_bitmaps` reference parameter.
    int get_pending_delete_bitmap_keys(int64_t tablet_id,
189
                                       std::unordered_set<std::string>& pending_delete_bitmaps);
190
    // `abnormal_rowsets_num` is an in-out reference parameter.
    int check_delete_bitmap_storage_optimize_v2(int64_t tablet_id, bool has_sequence_col,
191
                                                int64_t& abnormal_rowsets_num);
192
193
    // The return convention of the two format checks below is not documented in this header.
    int check_inverted_index_file_storage_format_v1(int64_t tablet_id, const std::string& file_path,
194
                                                    const std::string& rowset_info,
195
                                                    RowsetIndexesFormatV1& rowset_index_cache_v1);
196
197
    int check_inverted_index_file_storage_format_v2(int64_t tablet_id, const std::string& file_path,
198
                                                    const std::string& rowset_info,
199
                                                    RowsetIndexesFormatV2& rowset_index_cache_v2);
200
201
    // Return 0 if success.
202
    // Return 1 if key loss is abnormal.
203
    // Return negative if a temporary error occurred during the check process.
204
    int check_stats_tablet_key(std::string_view key, std::string_view value);
205
206
    // Return 0 if success.
207
    // Return 1 if key loss is identified.
208
    // Return negative if a temporary error occurred during the check process.
209
    int check_stats_tablet_key_exists(std::string_view key, std::string_view value);
210
211
    // Return 0 if success.
212
    // Return 1 if key leak is identified.
213
    // Return negative if a temporary error occurred during the check process.
214
    int check_stats_tablet_key_leaked(std::string_view key, std::string_view value);
215
    // The four txn key handlers below share the (key, value) -> int shape accepted by
    // scan_and_handle_kv(); their return convention is not documented in this header.
    int check_txn_info_key(std::string_view key, std::string_view value);
216
217
    int check_txn_label_key(std::string_view key, std::string_view value);
218
219
    int check_txn_index_key(std::string_view key, std::string_view value);
220
221
    int check_txn_running_key(std::string_view key, std::string_view value);
222
223
    // Only check whether the meta rowset key is leaked
224
    // in do_inverted_check() function, check whether the key is lost by comparing data file with key
225
    // Return 0 if success.
226
    // Return 1 if meta rowset key leak is identified.
227
    // Return negative if a temporary error occurred during the check process.
228
    int check_meta_rowset_key(std::string_view key, std::string_view value);
229
230
    // if TxnInfoKey's finish time > current time, it should not find tmp rowset
231
    // Return 0 if success.
232
    // Return 1 if meta tmp rowset key is abnormal.
233
    // Return negative if a temporary error occurred during the check process.
234
    int check_meta_tmp_rowset_key(std::string_view key, std::string_view value);
235
236
    /**
237
     * It is used to scan the keys in the range from start_key to end_key
238
     * and then perform handle operations on each group of kv
239
     *
240
     * @param start_key Range beginning. Note that this function will modify the `start_key`
241
     * @param end_key Range ending
242
     * @param handle_kv Operations on kv
243
     * @return code int 0 for success to scan and handle, 1 for success to scan but handle abnormally, -1 for failed to handle
244
     */
245
    int scan_and_handle_kv(std::string& start_key, const std::string& end_key,
246
                           std::function<int(std::string_view, std::string_view)> handle_kv);
247
248
    // Stop flag written by stop() and read by stopped(); starts as false.
    std::atomic_bool stopped_ {false};
249
    std::shared_ptr<TxnKv> txn_kv_;
250
    std::string instance_id_;
251
    // id -> accessor
252
    std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_;
253
    std::shared_ptr<SnapshotManager> snapshot_manager_;
254
    // Exposed to callers as a raw pointer through resource_mgr().
    std::shared_ptr<ResourceManager> resource_mgr_;
255
};
256
257
} // namespace doris::cloud