/root/doris/cloud/src/recycler/checker.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #if defined(USE_LIBCPP) && _LIBCPP_ABI_VERSION <= 1 |
21 | | #define _LIBCPP_ABI_INCOMPLETE_TYPES_IN_DEQUE |
22 | | #endif |
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "recycler/storage_vault_accessor.h"
#include "recycler/white_black_list.h"
33 | | |
// `RowsetMetaCloudPB` lives in the outer `doris` namespace. In this header it is
// only named by const reference (in the callback type of
// `InstanceChecker::collect_tablet_rowsets`), so an incomplete declaration is enough.
namespace doris {

class RowsetMetaCloudPB;

} // namespace doris
37 | | |
38 | | namespace doris::cloud { |
// Forward declarations for types this header names only through pointers,
// references, smart pointers or, for InstanceInfoPB, as a std::deque element type.
// StorageVaultAccessor is also provided by recycler/storage_vault_accessor.h;
// redeclaring it here is harmless.
class InstanceChecker;
class InstanceInfoPB;
class StorageVaultAccessor;
class TxnKv;
43 | | |
44 | | class Checker { |
45 | | public: |
46 | | explicit Checker(std::shared_ptr<TxnKv> txn_kv); |
47 | | ~Checker(); |
48 | | |
49 | | int start(); |
50 | | |
51 | | void stop(); |
52 | 321 | bool stopped() const { return stopped_.load(std::memory_order_acquire); } |
53 | | |
54 | | private: |
55 | | void lease_check_jobs(); |
56 | | void inspect_instance_check_interval(); |
57 | | void do_inspect(const InstanceInfoPB& instance); |
58 | | |
59 | | private: |
60 | | friend class RecyclerServiceImpl; |
61 | | |
62 | | std::shared_ptr<TxnKv> txn_kv_; |
63 | | std::atomic_bool stopped_ {false}; |
64 | | std::string ip_port_; |
65 | | std::vector<std::thread> workers_; |
66 | | |
67 | | std::mutex mtx_; |
68 | | // notify check workers |
69 | | std::condition_variable pending_instance_cond_; |
70 | | std::deque<InstanceInfoPB> pending_instance_queue_; |
71 | | // instance_id -> enqueue_timestamp |
72 | | std::unordered_map<std::string, long> pending_instance_map_; |
73 | | std::unordered_map<std::string, std::shared_ptr<InstanceChecker>> working_instance_map_; |
74 | | // notify instance scanner and lease thread |
75 | | std::condition_variable notifier_; |
76 | | |
77 | | WhiteBlackList instance_filter_; |
78 | | }; |
79 | | |
80 | | class InstanceChecker { |
81 | | public: |
82 | | explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id); |
83 | | // Return 0 if success, otherwise error |
84 | | int init(const InstanceInfoPB& instance); |
85 | | // Return 0 if success. |
86 | | // Return 1 if data leak is identified. |
87 | | // Return negative if a temporary error occurred during the check process. |
88 | | int do_inverted_check(); |
89 | | |
90 | | // Return 0 if success. |
91 | | // Return 1 if data loss is identified. |
92 | | // Return negative if a temporary error occurred during the check process. |
93 | | int do_check(); |
94 | | |
95 | | // Return 0 if success. |
96 | | // Return 1 if delete bitmap leak is identified. |
97 | | // Return negative if a temporary error occurred during the check process. |
98 | | int do_delete_bitmap_inverted_check(); |
99 | | |
100 | | // version = 1 : https://github.com/apache/doris/pull/40204 |
101 | | // checks if https://github.com/apache/doris/pull/40204 works as expected |
102 | | // the stale delete bitmap will be cleared in MS when BE delete expired stale rowsets |
103 | | // NOTE: stale rowsets will be lost after BE restarts, so there may be some stale delete bitmaps |
104 | | // which will not be cleared. |
105 | | // version = 2 : https://github.com/apache/doris/pull/49822 |
106 | | int do_delete_bitmap_storage_optimize_check(int version = 2); |
107 | | |
108 | | int do_mow_job_key_check(); |
109 | | |
110 | | int do_tablet_stats_key_check(); |
111 | | |
112 | | int do_restore_job_check(); |
113 | | |
114 | | int do_txn_key_check(); |
115 | | |
116 | | // If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e. |
117 | | // all accessors are HdfsAccessor), return INT64_MAX. |
118 | | // Return 0 if success, otherwise error |
119 | | int get_bucket_lifecycle(int64_t* lifecycle_days); |
120 | 0 | void stop() { stopped_.store(true, std::memory_order_release); } |
121 | 5.79k | bool stopped() const { return stopped_.load(std::memory_order_acquire); } |
122 | | |
123 | | private: |
124 | | struct RowsetIndexesFormatV1 { |
125 | | std::string rowset_id; |
126 | | std::unordered_set<int64_t> segment_ids; |
127 | | std::unordered_set<std::string> index_ids; |
128 | | }; |
129 | | |
130 | | struct RowsetIndexesFormatV2 { |
131 | | std::string rowset_id; |
132 | | std::unordered_set<int64_t> segment_ids; |
133 | | }; |
134 | | |
135 | | private: |
136 | | // returns 0 for success otherwise error |
137 | | int init_obj_store_accessors(const InstanceInfoPB& instance); |
138 | | |
139 | | // returns 0 for success otherwise error |
140 | | int init_storage_vault_accessors(const InstanceInfoPB& instance); |
141 | | |
142 | | int traverse_mow_tablet(const std::function<int(int64_t, bool)>& check_func); |
143 | | int traverse_rowset_delete_bitmaps( |
144 | | int64_t tablet_id, std::string rowset_id, |
145 | | const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback); |
146 | | int collect_tablet_rowsets( |
147 | | int64_t tablet_id, |
148 | | const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb); |
149 | | int get_pending_delete_bitmap_keys(int64_t tablet_id, |
150 | | std::unordered_set<std::string>& pending_delete_bitmaps); |
151 | | int check_delete_bitmap_storage_optimize_v2(int64_t tablet_id, bool has_sequence_col, |
152 | | int64_t& abnormal_rowsets_num); |
153 | | |
154 | | int check_inverted_index_file_storage_format_v1(int64_t tablet_id, const std::string& file_path, |
155 | | const std::string& rowset_info, |
156 | | RowsetIndexesFormatV1& rowset_index_cache_v1); |
157 | | |
158 | | int check_inverted_index_file_storage_format_v2(int64_t tablet_id, const std::string& file_path, |
159 | | const std::string& rowset_info, |
160 | | RowsetIndexesFormatV2& rowset_index_cache_v2); |
161 | | |
162 | | // Return 0 if success. |
163 | | // Return 1 if key loss is abnormal. |
164 | | // Return negative if a temporary error occurred during the check process. |
165 | | int check_stats_tablet_key(std::string_view key, std::string_view value); |
166 | | |
167 | | // Return 0 if success. |
168 | | // Return 1 if key loss is identified. |
169 | | // Return negative if a temporary error occurred during the check process. |
170 | | int check_stats_tablet_key_exists(std::string_view key, std::string_view value); |
171 | | |
172 | | // Return 0 if success. |
173 | | // Return 1 if key leak is identified. |
174 | | // Return negative if a temporary error occurred during the check process. |
175 | | int check_stats_tablet_key_leaked(std::string_view key, std::string_view value); |
176 | | int check_txn_info_key(std::string_view key, std::string_view value); |
177 | | |
178 | | int check_txn_label_key(std::string_view key, std::string_view value); |
179 | | |
180 | | int check_txn_index_key(std::string_view key, std::string_view value); |
181 | | |
182 | | int check_txn_running_key(std::string_view key, std::string_view value); |
183 | | |
184 | | /** |
185 | | * It is used to scan the key in the range from start_key to end_key |
186 | | * and then perform handle operations on each group of kv |
187 | | * |
188 | | * @param start_key Range begining. Note that this function will modify the `start_key` |
189 | | * @param end_key Range ending |
190 | | * @param handle_kv Operations on kv |
191 | | * @return code int 0 for success to scan and hanle, 1 for success to scan but handle abnormally, -1 for failed to handle |
192 | | */ |
193 | | int scan_and_handle_kv(std::string& start_key, const std::string& end_key, |
194 | | std::function<int(std::string_view, std::string_view)> handle_kv); |
195 | | |
196 | | std::atomic_bool stopped_ {false}; |
197 | | std::shared_ptr<TxnKv> txn_kv_; |
198 | | std::string instance_id_; |
199 | | // id -> accessor |
200 | | std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_; |
201 | | }; |
202 | | |
203 | | } // namespace doris::cloud |