be/src/cloud/cloud_meta_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include "cloud/cloud_meta_mgr.h" |
18 | | |
19 | | #include <brpc/channel.h> |
20 | | #include <brpc/controller.h> |
21 | | #include <brpc/errno.pb.h> |
22 | | #include <bthread/bthread.h> |
23 | | #include <bthread/condition_variable.h> |
24 | | #include <bthread/mutex.h> |
25 | | #include <gen_cpp/FrontendService.h> |
26 | | #include <gen_cpp/HeartbeatService_types.h> |
27 | | #include <gen_cpp/PlanNodes_types.h> |
28 | | #include <gen_cpp/Types_types.h> |
29 | | #include <gen_cpp/cloud.pb.h> |
30 | | #include <gen_cpp/olap_file.pb.h> |
31 | | #include <glog/logging.h> |
32 | | |
33 | | #include <algorithm> |
34 | | #include <atomic> |
35 | | #include <chrono> |
36 | | #include <cstdint> |
37 | | #include <memory> |
38 | | #include <mutex> |
39 | | #include <random> |
40 | | #include <shared_mutex> |
41 | | #include <string> |
42 | | #include <type_traits> |
43 | | #include <vector> |
44 | | |
45 | | #include "cloud/cloud_ms_backpressure_handler.h" |
46 | | #include "cloud/cloud_ms_rpc_rate_limiters.h" |
47 | | #include "cloud/cloud_storage_engine.h" |
48 | | #include "cloud/cloud_tablet.h" |
49 | | #include "cloud/cloud_warm_up_manager.h" |
50 | | #include "cloud/config.h" |
51 | | #include "cloud/delete_bitmap_file_reader.h" |
52 | | #include "cloud/delete_bitmap_file_writer.h" |
53 | | #include "cloud/pb_convert.h" |
54 | | #include "common/config.h" |
55 | | #include "common/logging.h" |
56 | | #include "common/status.h" |
57 | | #include "cpp/sync_point.h" |
58 | | #include "io/fs/obj_storage_client.h" |
59 | | #include "load/stream_load/stream_load_context.h" |
60 | | #include "runtime/exec_env.h" |
61 | | #include "storage/olap_common.h" |
62 | | #include "storage/rowset/rowset.h" |
63 | | #include "storage/rowset/rowset_factory.h" |
64 | | #include "storage/rowset/rowset_fwd.h" |
65 | | #include "storage/storage_engine.h" |
66 | | #include "storage/tablet/tablet_meta.h" |
67 | | #include "util/client_cache.h" |
68 | | #include "util/network_util.h" |
69 | | #include "util/s3_util.h" |
70 | | #include "util/thrift_rpc_helper.h" |
71 | | |
72 | | namespace doris::cloud { |
73 | | using namespace ErrorCode; |
74 | | |
75 | 22 | void* run_bthread_work(void* arg) { |
76 | 22 | auto* f = reinterpret_cast<std::function<void()>*>(arg); |
77 | 22 | (*f)(); |
78 | 22 | delete f; |
79 | 22 | return nullptr; |
80 | 22 | } |
81 | | |
82 | 4 | Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) { |
83 | 4 | if (tasks.empty()) { |
84 | 0 | return Status::OK(); |
85 | 0 | } |
86 | | |
87 | 4 | bthread::Mutex lock; |
88 | 4 | bthread::ConditionVariable cond; |
89 | 4 | Status status; // Guard by lock |
90 | 4 | int count = 0; // Guard by lock |
91 | | |
92 | 22 | for (const auto& task : tasks) { |
93 | 22 | { |
94 | 22 | std::unique_lock lk(lock); |
95 | | // Wait until there are available slots |
96 | 29 | while (status.ok() && count >= concurrency) { |
97 | 7 | cond.wait(lk); |
98 | 7 | } |
99 | 22 | if (!status.ok()) { |
100 | 2 | break; |
101 | 2 | } |
102 | | |
103 | | // Increase running task count |
104 | 20 | ++count; |
105 | 20 | } |
106 | | |
107 | | // dispatch task into bthreads |
108 | 19 | auto* fn = new std::function<void()>([&, &task = task] { |
109 | 19 | auto st = task(); |
110 | 19 | { |
111 | 19 | std::lock_guard lk(lock); |
112 | 19 | --count; |
113 | 19 | if (!st.ok()) { |
114 | 2 | std::swap(st, status); |
115 | 2 | } |
116 | 19 | cond.notify_one(); |
117 | 19 | } |
118 | 19 | }); |
119 | | |
120 | 20 | bthread_t bthread_id; |
121 | 20 | if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) { |
122 | 0 | run_bthread_work(fn); |
123 | 0 | } |
124 | 20 | } |
125 | | |
126 | | // Wait until all running tasks have done |
127 | 4 | { |
128 | 4 | std::unique_lock lk(lock); |
129 | 9 | while (count > 0) { |
130 | 5 | cond.wait(lk); |
131 | 5 | } |
132 | 4 | } |
133 | | |
134 | 4 | return status; |
135 | 4 | } |
136 | | |
137 | | Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency, |
138 | 2 | std::future<Status>* fut) { |
139 | | // std::function will cause `copy`, we need to use heap memory to avoid copy ctor called |
140 | 2 | auto prom = std::make_shared<std::promise<Status>>(); |
141 | 2 | *fut = prom->get_future(); |
142 | 2 | std::function<void()>* fn = new std::function<void()>( |
143 | 2 | [tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable { |
144 | 2 | p->set_value(bthread_fork_join(tasks, concurrency)); |
145 | 2 | }); |
146 | | |
147 | 2 | bthread_t bthread_id; |
148 | 2 | if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) { |
149 | 0 | delete fn; |
150 | 0 | return Status::InternalError<false>("failed to create bthread"); |
151 | 0 | } |
152 | 2 | return Status::OK(); |
153 | 2 | } |
154 | | |
155 | | namespace { |
156 | | constexpr int kBrpcRetryTimes = 3; |
157 | | |
158 | | bvar::LatencyRecorder _get_rowset_latency("doris_cloud_meta_mgr_get_rowset"); |
159 | | bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency"); |
160 | | bvar::Adder<uint64_t> g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count"); |
161 | | bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window( |
162 | | "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30); |
163 | | bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time( |
164 | | "cloud_be_mow_get_dbm_lock_backoff_sleep_time"); |
165 | | bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count"); |
166 | | |
167 | | class MetaServiceProxy { |
168 | | public: |
169 | 21 | static Status get_proxy(MetaServiceProxy** proxy) { |
170 | | // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function. |
171 | 21 | std::shared_ptr<MetaService_Stub> stub; |
172 | 21 | return get_pooled_client(&stub, proxy); |
173 | 21 | } |
174 | | |
175 | 0 | void set_unhealthy() { |
176 | 0 | std::unique_lock lock(_mutex); |
177 | 0 | maybe_unhealthy = true; |
178 | 0 | } |
179 | | |
180 | 0 | bool need_reconn(long now) { |
181 | 0 | return maybe_unhealthy && ((now - last_reconn_time_ms.front()) > |
182 | 0 | config::meta_service_rpc_reconnect_interval_ms); |
183 | 0 | } |
184 | | |
185 | 0 | Status get(std::shared_ptr<MetaService_Stub>* stub) { |
186 | 0 | using namespace std::chrono; |
187 | |
|
188 | 0 | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
189 | 0 | { |
190 | 0 | std::shared_lock lock(_mutex); |
191 | 0 | if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) { |
192 | 0 | _last_access_at_ms.store(now, std::memory_order_relaxed); |
193 | 0 | *stub = _stub; |
194 | 0 | return Status::OK(); |
195 | 0 | } |
196 | 0 | } |
197 | | |
198 | 0 | auto channel = std::make_unique<brpc::Channel>(); |
199 | 0 | Status s = init_channel(channel.get()); |
200 | 0 | if (!s.ok()) [[unlikely]] { |
201 | 0 | return s; |
202 | 0 | } |
203 | | |
204 | 0 | *stub = std::make_shared<MetaService_Stub>(channel.release(), |
205 | 0 | google::protobuf::Service::STUB_OWNS_CHANNEL); |
206 | |
|
207 | 0 | long deadline = now; |
208 | | // connection age only works without list endpoint. |
209 | 0 | if (config::meta_service_connection_age_base_seconds > 0) { |
210 | 0 | std::default_random_engine rng(static_cast<uint32_t>(now)); |
211 | 0 | std::uniform_int_distribution<> uni( |
212 | 0 | config::meta_service_connection_age_base_seconds, |
213 | 0 | config::meta_service_connection_age_base_seconds * 2); |
214 | 0 | deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count(); |
215 | 0 | } |
216 | | |
217 | | // Last one WIN |
218 | 0 | std::unique_lock lock(_mutex); |
219 | 0 | _last_access_at_ms.store(now, std::memory_order_relaxed); |
220 | 0 | _deadline_ms = deadline; |
221 | 0 | _stub = *stub; |
222 | |
|
223 | 0 | last_reconn_time_ms.push(now); |
224 | 0 | last_reconn_time_ms.pop(); |
225 | 0 | maybe_unhealthy = false; |
226 | |
|
227 | 0 | return Status::OK(); |
228 | 0 | } |
229 | | |
230 | | private: |
231 | 0 | static bool is_meta_service_endpoint_list() { |
232 | 0 | return config::meta_service_endpoint.find(',') != std::string::npos; |
233 | 0 | } |
234 | | |
235 | | /** |
236 | | * This function initializes a pool of `MetaServiceProxy` objects and selects one using |
237 | | * round-robin. It returns a client stub via the selected proxy. |
238 | | * |
239 | | * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved. |
240 | | * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`. |
241 | | * |
242 | | * @return Status Returns `Status::OK()` on success or an error status on failure. |
243 | | */ |
244 | | static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub, |
245 | 21 | MetaServiceProxy** proxy) { |
246 | 21 | static std::once_flag proxies_flag; |
247 | 21 | static size_t num_proxies = 1; |
248 | 21 | static std::atomic<size_t> index(0); |
249 | 21 | static std::unique_ptr<MetaServiceProxy[]> proxies; |
250 | 21 | if (config::meta_service_endpoint.empty()) { |
251 | 21 | return Status::InvalidArgument( |
252 | 21 | "Meta service endpoint is empty. Please configure manually or wait for " |
253 | 21 | "heartbeat to obtain."); |
254 | 21 | } |
255 | 0 | std::call_once( |
256 | 0 | proxies_flag, +[]() { |
257 | 0 | if (config::meta_service_connection_pooled) { |
258 | 0 | num_proxies = config::meta_service_connection_pool_size; |
259 | 0 | } |
260 | 0 | proxies = std::make_unique<MetaServiceProxy[]>(num_proxies); |
261 | 0 | }); |
262 | |
|
263 | 0 | for (size_t i = 0; i + 1 < num_proxies; ++i) { |
264 | 0 | size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; |
265 | 0 | Status s = proxies[next_index].get(stub); |
266 | 0 | if (proxy != nullptr) { |
267 | 0 | *proxy = &(proxies[next_index]); |
268 | 0 | } |
269 | 0 | if (s.ok()) return Status::OK(); |
270 | 0 | } |
271 | | |
272 | 0 | size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; |
273 | 0 | if (proxy != nullptr) { |
274 | 0 | *proxy = &(proxies[next_index]); |
275 | 0 | } |
276 | 0 | return proxies[next_index].get(stub); |
277 | 0 | } |
278 | | |
279 | 0 | static Status init_channel(brpc::Channel* channel) { |
280 | 0 | static std::atomic<size_t> index = 1; |
281 | |
|
282 | 0 | const char* load_balancer_name = nullptr; |
283 | 0 | std::string endpoint; |
284 | 0 | if (is_meta_service_endpoint_list()) { |
285 | 0 | endpoint = fmt::format("list://{}", config::meta_service_endpoint); |
286 | 0 | load_balancer_name = "random"; |
287 | 0 | } else { |
288 | 0 | std::string ip; |
289 | 0 | uint16_t port; |
290 | 0 | Status s = get_meta_service_ip_and_port(&ip, &port); |
291 | 0 | if (!s.ok()) { |
292 | 0 | LOG(WARNING) << "fail to get meta service ip and port: " << s; |
293 | 0 | return s; |
294 | 0 | } |
295 | | |
296 | 0 | endpoint = get_host_port(ip, port); |
297 | 0 | } |
298 | | |
299 | 0 | brpc::ChannelOptions options; |
300 | 0 | options.connection_group = |
301 | 0 | fmt::format("ms_{}", index.fetch_add(1, std::memory_order_relaxed)); |
302 | 0 | if (channel->Init(endpoint.c_str(), load_balancer_name, &options) != 0) { |
303 | 0 | return Status::InvalidArgument("failed to init brpc channel, endpoint: {}", endpoint); |
304 | 0 | } |
305 | 0 | return Status::OK(); |
306 | 0 | } |
307 | | |
308 | 0 | static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) { |
309 | 0 | std::string parsed_host; |
310 | 0 | if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) { |
311 | 0 | return Status::InvalidArgument("invalid meta service endpoint: {}", |
312 | 0 | config::meta_service_endpoint); |
313 | 0 | } |
314 | 0 | if (is_valid_ip(parsed_host)) { |
315 | 0 | *ip = std::move(parsed_host); |
316 | 0 | return Status::OK(); |
317 | 0 | } |
318 | 0 | return hostname_to_ip(parsed_host, *ip); |
319 | 0 | } |
320 | | |
321 | 0 | bool is_idle_timeout(long now) { |
322 | 0 | auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms; |
323 | | // idle timeout only works without list endpoint. |
324 | 0 | return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 && |
325 | 0 | _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now; |
326 | 0 | } |
327 | | |
328 | | std::shared_mutex _mutex; |
329 | | std::atomic<long> _last_access_at_ms {0}; |
330 | | long _deadline_ms {0}; |
331 | | std::shared_ptr<MetaService_Stub> _stub; |
332 | | |
333 | | std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}}; |
334 | | bool maybe_unhealthy = false; |
335 | | }; |
336 | | |
337 | | template <typename T, typename... Ts> |
338 | | struct is_any : std::disjunction<std::is_same<T, Ts>...> {}; |
339 | | |
340 | | template <typename T, typename... Ts> |
341 | | constexpr bool is_any_v = is_any<T, Ts...>::value; |
342 | | |
343 | | template <typename Request> |
344 | 0 | static std::string debug_info(const Request& req) { |
345 | 0 | if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) { |
346 | 0 | return fmt::format(" txn_id={}", req.txn_id()); |
347 | 0 | } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) { |
348 | 0 | return fmt::format(" tablet_id={}", req.job().idx().tablet_id()); |
349 | 0 | } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) { |
350 | 0 | return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id()); |
351 | 0 | } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) { |
352 | 0 | return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id()); |
353 | 0 | } else if constexpr (is_any_v<Request, GetTabletRequest>) { |
354 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
355 | | } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest, |
356 | 0 | GetInstanceRequest, GetClusterStatusRequest>) { |
357 | 0 | return ""; |
358 | 0 | } else if constexpr (is_any_v<Request, CreateRowsetRequest>) { |
359 | 0 | return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); |
360 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) { |
361 | | return fmt::format(" tablet_id={}", req.tablet_id()); |
362 | 0 | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) { |
363 | 0 | return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), |
364 | 0 | req.tablet_id(), req.lock_id()); |
365 | 0 | } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) { |
366 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
367 | | } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) { |
368 | | return fmt::format(" index_id={}", req.index_id()); |
369 | 0 | } else if constexpr (is_any_v<Request, RestoreJobRequest>) { |
370 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
371 | 0 | } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) { |
372 | 0 | return fmt::format(" packed_file_path={}", req.packed_file_path()); |
373 | | } else { |
374 | | static_assert(!sizeof(Request)); |
375 | | } |
376 | 0 | } Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16GetTabletRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19CreateRowsetRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16CommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_15AbortTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19PrecommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_17RestoreJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetObjStoreInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_21StartTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22FinishTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_25UpdateDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_32GetDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_35RemoveDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19ListSnapshotRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_18GetInstanceRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_27UpdatePackedFileInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_23GetClusterStatusRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ |
377 | | |
378 | 21 | inline std::default_random_engine make_random_engine() { |
379 | 21 | return std::default_random_engine( |
380 | 21 | static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count())); |
381 | 21 | } |
382 | | |
383 | | // Convert MetaServiceRPC to LoadRelatedRpc |
384 | | // Returns LoadRelatedRpc::COUNT if the RPC is not a load-related RPC |
385 | 0 | LoadRelatedRpc to_load_related_rpc(MetaServiceRPC rpc) { |
386 | 0 | switch (rpc) { |
387 | 0 | case MetaServiceRPC::PREPARE_ROWSET: |
388 | 0 | return LoadRelatedRpc::PREPARE_ROWSET; |
389 | 0 | case MetaServiceRPC::COMMIT_ROWSET: |
390 | 0 | return LoadRelatedRpc::COMMIT_ROWSET; |
391 | 0 | case MetaServiceRPC::UPDATE_TMP_ROWSET: |
392 | 0 | return LoadRelatedRpc::UPDATE_TMP_ROWSET; |
393 | 0 | case MetaServiceRPC::UPDATE_PACKED_FILE_INFO: |
394 | 0 | return LoadRelatedRpc::UPDATE_PACKED_FILE_INFO; |
395 | 0 | case MetaServiceRPC::UPDATE_DELETE_BITMAP: |
396 | 0 | return LoadRelatedRpc::UPDATE_DELETE_BITMAP; |
397 | 0 | default: |
398 | 0 | return LoadRelatedRpc::COUNT; // Not a load-related RPC |
399 | 0 | } |
400 | 0 | } |
401 | | |
402 | | template <typename Request, typename Response> |
403 | | using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*, |
404 | | const Request*, Response*, |
405 | | ::google::protobuf::Closure*); |
406 | | |
407 | | // Rate limiting context for retry_rpc |
408 | | struct RpcRateLimitCtx { |
409 | | HostLevelMSRpcRateLimiters* host_limiters {nullptr}; |
410 | | MSBackpressureHandler* backpressure_handler {nullptr}; |
411 | | int64_t table_id {-1}; // For table-level backpressure, passed from caller |
412 | | }; |
413 | | |
414 | | // Apply rate limiting before RPC (both host-level and table-level) |
415 | 0 | void apply_rate_limit(MetaServiceRPC rpc, const RpcRateLimitCtx& ctx) { |
416 | | // Table-level rate limit (for load-related RPCs only) |
417 | 0 | if (ctx.backpressure_handler && ctx.table_id > 0) { |
418 | 0 | LoadRelatedRpc load_rpc = to_load_related_rpc(rpc); |
419 | 0 | if (load_rpc != LoadRelatedRpc::COUNT) { |
420 | 0 | auto wait_until = ctx.backpressure_handler->before_rpc(load_rpc, ctx.table_id); |
421 | 0 | auto now = std::chrono::steady_clock::now(); |
422 | 0 | if (wait_until > now) { |
423 | 0 | auto wait_us = |
424 | 0 | std::chrono::duration_cast<std::chrono::microseconds>(wait_until - now) |
425 | 0 | .count(); |
426 | 0 | if (wait_us > 0) { |
427 | 0 | if (auto* recorder = get_throttle_wait_recorder(load_rpc); |
428 | 0 | recorder != nullptr) { |
429 | 0 | *recorder << wait_us; |
430 | 0 | } |
431 | 0 | bthread_usleep(wait_us); |
432 | 0 | } |
433 | 0 | } |
434 | 0 | } |
435 | 0 | } |
436 | | |
437 | | // Host-level rate limit |
438 | 0 | if (ctx.host_limiters) { |
439 | 0 | ctx.host_limiters->limit(rpc); |
440 | 0 | } |
441 | 0 | } |
442 | | |
443 | | // Record RPC QPS statistics after RPC (for table-level tracking) |
444 | 0 | void record_rpc_qps(MetaServiceRPC rpc, const RpcRateLimitCtx& ctx) { |
445 | 0 | if (ctx.backpressure_handler && ctx.table_id > 0) { |
446 | 0 | LoadRelatedRpc load_rpc = to_load_related_rpc(rpc); |
447 | 0 | if (load_rpc != LoadRelatedRpc::COUNT) { |
448 | 0 | ctx.backpressure_handler->after_rpc(load_rpc, ctx.table_id); |
449 | 0 | } |
450 | 0 | } |
451 | 0 | } |
452 | | |
453 | | template <typename Request, typename Response> |
454 | | Status retry_rpc(MetaServiceRPC rpc, const Request& req, Response* res, |
455 | | MetaServiceMethod<Request, Response> method, |
456 | 21 | const RpcRateLimitCtx& rate_limit_ctx = {}) { |
457 | 21 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); |
458 | 21 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); |
459 | | |
460 | 21 | std::string_view op_name = meta_service_rpc_display_name(rpc); |
461 | | |
462 | | // Applies only to the current file, and all req are non-const, but passed as const types. |
463 | 21 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); |
464 | | |
465 | 21 | int retry_times = 0; |
466 | 21 | uint32_t duration_ms = 0; |
467 | 21 | std::string error_msg; |
468 | 21 | std::default_random_engine rng = make_random_engine(); |
469 | 21 | std::uniform_int_distribution<uint32_t> u(20, 200); |
470 | 21 | std::uniform_int_distribution<uint32_t> u2(500, 1000); |
471 | 21 | MetaServiceProxy* proxy; |
472 | 21 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); |
473 | | |
474 | 0 | while (true) { |
475 | 0 | std::shared_ptr<MetaService_Stub> stub; |
476 | 0 | RETURN_IF_ERROR(proxy->get(&stub)); |
477 | | |
478 | | // Apply rate limiting (both host-level and table-level) |
479 | 0 | apply_rate_limit(rpc, rate_limit_ctx); |
480 | 0 | TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc); |
481 | |
|
482 | 0 | brpc::Controller cntl; |
483 | 0 | if (rpc == MetaServiceRPC::GET_DELETE_BITMAP || |
484 | 0 | rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) { |
485 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); |
486 | 0 | } else { |
487 | 0 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); |
488 | 0 | } |
489 | 0 | cntl.set_max_retry(kBrpcRetryTimes); |
490 | 0 | res->Clear(); |
491 | 0 | int error_code = 0; |
492 | 0 | (stub.get()->*method)(&cntl, &req, res, nullptr); |
493 | | |
494 | | // Record QPS statistics for all RPCs sent to MS (success or failure) |
495 | 0 | record_rpc_qps(rpc, rate_limit_ctx); |
496 | |
|
497 | 0 | if (cntl.Failed()) [[unlikely]] { |
498 | 0 | error_msg = cntl.ErrorText(); |
499 | 0 | error_code = cntl.ErrorCode(); |
500 | 0 | proxy->set_unhealthy(); |
501 | 0 | } else if (res->status().code() == MetaServiceCode::OK) { |
502 | 0 | return Status::OK(); |
503 | 0 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { |
504 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, |
505 | 0 | res->status().msg()); |
506 | 0 | } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) { |
507 | | // MS_BUSY should also be retried |
508 | 0 | if (rate_limit_ctx.backpressure_handler) { |
509 | 0 | rate_limit_ctx.backpressure_handler->on_ms_busy(); |
510 | 0 | } |
511 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { |
512 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, |
513 | 0 | res->status().msg()); |
514 | 0 | } else { |
515 | 0 | error_msg = res->status().msg(); |
516 | 0 | } |
517 | | |
518 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { |
519 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; |
520 | 0 | } |
521 | |
|
522 | 0 | ++retry_times; |
523 | 0 | if (retry_times > config::meta_service_rpc_retry_times || |
524 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && |
525 | 0 | error_code == brpc::ERPCTIMEDOUT) || |
526 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && |
527 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { |
528 | 0 | break; |
529 | 0 | } |
530 | | |
531 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); |
532 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times |
533 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); |
534 | 0 | bthread_usleep(duration_ms * 1000); |
535 | 0 | } |
536 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); |
537 | 0 | } cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16GetTabletRequestENS0_17GetTabletResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Line | Count | Source | 456 | 10 | const RpcRateLimitCtx& rate_limit_ctx = {}) { | 457 | 10 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 458 | 10 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 459 | | | 460 | 10 | std::string_view op_name = meta_service_rpc_display_name(rpc); | 461 | | | 462 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 463 | 10 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 464 | | | 465 | 10 | int retry_times = 0; | 466 | 10 | uint32_t duration_ms = 0; | 467 | 10 | std::string error_msg; | 468 | 10 | std::default_random_engine rng = make_random_engine(); | 469 | 10 | std::uniform_int_distribution<uint32_t> u(20, 200); | 470 | 10 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 471 | 10 | MetaServiceProxy* proxy; | 472 | 10 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 473 | | | 474 | 0 | while (true) { | 475 | 0 | std::shared_ptr<MetaService_Stub> stub; | 476 | 0 | RETURN_IF_ERROR(proxy->get(&stub)); | 477 | | | 478 | | // Apply rate limiting (both host-level and table-level) | 479 | 0 | apply_rate_limit(rpc, rate_limit_ctx); | 480 | 0 | TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc); | 481 | |
| 482 | 0 | brpc::Controller cntl; | 483 | 0 | if (rpc == MetaServiceRPC::GET_DELETE_BITMAP || | 484 | 0 | rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) { | 485 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 486 | 0 | } else { | 487 | 0 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 488 | 0 | } | 489 | 0 | cntl.set_max_retry(kBrpcRetryTimes); | 490 | 0 | res->Clear(); | 491 | 0 | int error_code = 0; | 492 | 0 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 493 | | | 494 | | // Record QPS statistics for all RPCs sent to MS (success or failure) | 495 | 0 | record_rpc_qps(rpc, rate_limit_ctx); | 496 | |
| 497 | 0 | if (cntl.Failed()) [[unlikely]] { | 498 | 0 | error_msg = cntl.ErrorText(); | 499 | 0 | error_code = cntl.ErrorCode(); | 500 | 0 | proxy->set_unhealthy(); | 501 | 0 | } else if (res->status().code() == MetaServiceCode::OK) { | 502 | 0 | return Status::OK(); | 503 | 0 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 504 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 505 | 0 | res->status().msg()); | 506 | 0 | } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) { | 507 | | // MS_BUSY should also be retried | 508 | 0 | if (rate_limit_ctx.backpressure_handler) { | 509 | 0 | rate_limit_ctx.backpressure_handler->on_ms_busy(); | 510 | 0 | } | 511 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 512 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 513 | 0 | res->status().msg()); | 514 | 0 | } else { | 515 | 0 | error_msg = res->status().msg(); | 516 | 0 | } | 517 | | | 518 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 519 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 520 | 0 | } | 521 | |
| 522 | 0 | ++retry_times; | 523 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 524 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 525 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 526 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 527 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 528 | 0 | break; | 529 | 0 | } | 530 | | | 531 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 532 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 533 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 534 | 0 | bthread_usleep(duration_ms * 1000); | 535 | 0 | } | 536 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 537 | 0 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetDeleteBitmapRequestENS0_23GetDeleteBitmapResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Line | Count | Source | 456 | 11 | const RpcRateLimitCtx& rate_limit_ctx = {}) { | 457 | 11 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 458 | 11 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 459 | | | 460 | 11 | std::string_view op_name = meta_service_rpc_display_name(rpc); | 461 | | | 462 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 463 | 11 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 464 | | | 465 | 11 | int retry_times = 0; | 466 | 11 | uint32_t duration_ms = 0; | 467 | 11 | std::string error_msg; | 468 | 11 | std::default_random_engine rng = make_random_engine(); | 469 | 11 | std::uniform_int_distribution<uint32_t> u(20, 200); | 470 | 11 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 471 | 11 | MetaServiceProxy* proxy; | 472 | 11 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 473 | | | 474 | 0 | while (true) { | 475 | 0 | std::shared_ptr<MetaService_Stub> stub; | 476 | 0 | RETURN_IF_ERROR(proxy->get(&stub)); | 477 | | | 478 | | // Apply rate limiting (both host-level and table-level) | 479 | 0 | apply_rate_limit(rpc, rate_limit_ctx); | 480 | 0 | TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc); | 481 | |
| 482 | 0 | brpc::Controller cntl; | 483 | 0 | if (rpc == MetaServiceRPC::GET_DELETE_BITMAP || | 484 | 0 | rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) { | 485 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 486 | 0 | } else { | 487 | 0 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 488 | 0 | } | 489 | 0 | cntl.set_max_retry(kBrpcRetryTimes); | 490 | 0 | res->Clear(); | 491 | 0 | int error_code = 0; | 492 | 0 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 493 | | | 494 | | // Record QPS statistics for all RPCs sent to MS (success or failure) | 495 | 0 | record_rpc_qps(rpc, rate_limit_ctx); | 496 | |
| 497 | 0 | if (cntl.Failed()) [[unlikely]] { | 498 | 0 | error_msg = cntl.ErrorText(); | 499 | 0 | error_code = cntl.ErrorCode(); | 500 | 0 | proxy->set_unhealthy(); | 501 | 0 | } else if (res->status().code() == MetaServiceCode::OK) { | 502 | 0 | return Status::OK(); | 503 | 0 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 504 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 505 | 0 | res->status().msg()); | 506 | 0 | } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) { | 507 | | // MS_BUSY should also be retried | 508 | 0 | if (rate_limit_ctx.backpressure_handler) { | 509 | 0 | rate_limit_ctx.backpressure_handler->on_ms_busy(); | 510 | 0 | } | 511 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 512 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 513 | 0 | res->status().msg()); | 514 | 0 | } else { | 515 | 0 | error_msg = res->status().msg(); | 516 | 0 | } | 517 | | | 518 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 519 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 520 | 0 | } | 521 | |
| 522 | 0 | ++retry_times; | 523 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 524 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 525 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 526 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 527 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 528 | 0 | break; | 529 | 0 | } | 530 | | | 531 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 532 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 533 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 534 | 0 | bthread_usleep(duration_ms * 1000); | 535 | 0 | } | 536 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 537 | 0 | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19CreateRowsetRequestENS0_20CreateRowsetResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16CommitTxnRequestENS0_17CommitTxnResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_15AbortTxnRequestENS0_16AbortTxnResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19PrecommitTxnRequestENS0_20PrecommitTxnResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_17RestoreJobRequestENS0_18RestoreJobResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetObjStoreInfoRequestENS0_23GetObjStoreInfoResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_21StartTabletJobRequestENS0_22StartTabletJobResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22FinishTabletJobRequestENS0_23FinishTabletJobResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_25UpdateDeleteBitmapRequestENS0_26UpdateDeleteBitmapResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_32GetDeleteBitmapUpdateLockRequestENS0_33GetDeleteBitmapUpdateLockResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_35RemoveDeleteBitmapUpdateLockRequestENS0_36RemoveDeleteBitmapUpdateLockResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19ListSnapshotRequestENS0_20ListSnapshotResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_18GetInstanceRequestENS0_19GetInstanceResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_27UpdatePackedFileInfoRequestENS0_28UpdatePackedFileInfoResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_23GetClusterStatusRequestENS0_24GetClusterStatusResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE |
538 | | |
539 | | } // namespace |
540 | | |
541 | 0 | Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) { |
542 | 0 | VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id; |
543 | 0 | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id, |
544 | 0 | tablet_meta); |
545 | 0 | GetTabletRequest req; |
546 | 0 | GetTabletResponse resp; |
547 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
548 | 0 | req.set_tablet_id(tablet_id); |
549 | 0 | Status st = |
550 | 0 | retry_rpc(MetaServiceRPC::GET_TABLET_META, req, &resp, &MetaService_Stub::get_tablet, |
551 | 0 | { |
552 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
553 | 0 | .backpressure_handler = ms_backpressure_handler_, |
554 | 0 | }); |
555 | 0 | if (!st.ok()) { |
556 | 0 | if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
557 | 0 | return Status::NotFound("failed to get tablet meta: {}", resp.status().msg()); |
558 | 0 | } |
559 | 0 | return st; |
560 | 0 | } |
561 | | |
562 | 0 | *tablet_meta = std::make_shared<TabletMeta>(); |
563 | 0 | (*tablet_meta) |
564 | 0 | ->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta()))); |
565 | 0 | VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id(); |
566 | 0 | return Status::OK(); |
567 | 0 | } |
568 | | |
569 | | Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options, |
570 | 7 | SyncRowsetStats* sync_stats) { |
571 | 7 | std::unique_lock lock {tablet->get_sync_meta_lock()}; |
572 | 7 | return sync_tablet_rowsets_unlocked(tablet, lock, options, sync_stats); |
573 | 7 | } |
574 | | |
575 | | Status CloudMetaMgr::_log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp, |
576 | | DeleteBitmap& delete_bitmap, int64_t old_max_version, |
577 | 0 | bool full_sync, int32_t read_version) { |
578 | 0 | if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() && |
579 | 0 | delete_bitmap.cardinality() > 0) { |
580 | 0 | int64_t tablet_id = tablet->tablet_id(); |
581 | 0 | std::vector<std::string> new_rowset_msgs; |
582 | 0 | std::vector<std::string> old_rowset_msgs; |
583 | 0 | std::unordered_set<RowsetId> new_rowset_ids; |
584 | 0 | int64_t new_max_version = resp.rowset_meta().rbegin()->end_version(); |
585 | 0 | for (const auto& rs : resp.rowset_meta()) { |
586 | 0 | RowsetId rowset_id; |
587 | 0 | rowset_id.init(rs.rowset_id_v2()); |
588 | 0 | new_rowset_ids.insert(rowset_id); |
589 | 0 | DeleteBitmap rowset_dbm(tablet_id); |
590 | 0 | delete_bitmap.subset({rowset_id, 0, 0}, |
591 | 0 | {rowset_id, std::numeric_limits<DeleteBitmap::SegmentId>::max(), |
592 | 0 | std::numeric_limits<DeleteBitmap::Version>::max()}, |
593 | 0 | &rowset_dbm); |
594 | 0 | size_t cardinality = rowset_dbm.cardinality(); |
595 | 0 | size_t count = rowset_dbm.get_delete_bitmap_count(); |
596 | 0 | if (cardinality > 0) { |
597 | 0 | new_rowset_msgs.push_back(fmt::format("({}[{}-{}],{},{})", rs.rowset_id_v2(), |
598 | 0 | rs.start_version(), rs.end_version(), count, |
599 | 0 | cardinality)); |
600 | 0 | } |
601 | 0 | } |
602 | |
|
603 | 0 | if (old_max_version > 0) { |
604 | 0 | std::vector<RowsetSharedPtr> old_rowsets; |
605 | 0 | RowsetIdUnorderedSet old_rowset_ids; |
606 | 0 | { |
607 | 0 | std::lock_guard<std::shared_mutex> rlock(tablet->get_header_lock()); |
608 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(old_max_version, &old_rowset_ids)); |
609 | 0 | old_rowsets = tablet->get_rowset_by_ids(&old_rowset_ids); |
610 | 0 | } |
611 | 0 | for (const auto& rs : old_rowsets) { |
612 | 0 | if (!new_rowset_ids.contains(rs->rowset_id())) { |
613 | 0 | DeleteBitmap rowset_dbm(tablet_id); |
614 | 0 | delete_bitmap.subset( |
615 | 0 | {rs->rowset_id(), 0, 0}, |
616 | 0 | {rs->rowset_id(), std::numeric_limits<DeleteBitmap::SegmentId>::max(), |
617 | 0 | std::numeric_limits<DeleteBitmap::Version>::max()}, |
618 | 0 | &rowset_dbm); |
619 | 0 | size_t cardinality = rowset_dbm.cardinality(); |
620 | 0 | size_t count = rowset_dbm.get_delete_bitmap_count(); |
621 | 0 | if (cardinality > 0) { |
622 | 0 | old_rowset_msgs.push_back( |
623 | 0 | fmt::format("({}{},{},{})", rs->rowset_id().to_string(), |
624 | 0 | rs->version().to_string(), count, cardinality)); |
625 | 0 | } |
626 | 0 | } |
627 | 0 | } |
628 | 0 | } |
629 | | |
630 | 0 | std::string tablet_info = fmt::format( |
631 | 0 | "tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(), |
632 | 0 | tablet->table_id(), tablet->index_id(), tablet->partition_id()); |
633 | 0 | LOG_INFO("[verbose] sync tablet delete bitmap " + tablet_info) |
634 | 0 | .tag("full_sync", full_sync) |
635 | 0 | .tag("read_version", read_version) |
636 | 0 | .tag("old_max_version", old_max_version) |
637 | 0 | .tag("new_max_version", new_max_version) |
638 | 0 | .tag("cumu_compaction_cnt", resp.stats().cumulative_compaction_cnt()) |
639 | 0 | .tag("base_compaction_cnt", resp.stats().base_compaction_cnt()) |
640 | 0 | .tag("cumu_point", resp.stats().cumulative_point()) |
641 | 0 | .tag("rowset_num", resp.rowset_meta().size()) |
642 | 0 | .tag("delete_bitmap_cardinality", delete_bitmap.cardinality()) |
643 | 0 | .tag("old_rowsets(rowset,count,cardinality)", |
644 | 0 | fmt::format("[{}]", fmt::join(old_rowset_msgs, ", "))) |
645 | 0 | .tag("new_rowsets(rowset,count,cardinality)", |
646 | 0 | fmt::format("[{}]", fmt::join(new_rowset_msgs, ", "))); |
647 | 0 | } |
648 | 0 | return Status::OK(); |
649 | 0 | } |
650 | | |
651 | | Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, |
652 | | std::unique_lock<bthread::Mutex>& lock, |
653 | | const SyncOptions& options, |
654 | 0 | SyncRowsetStats* sync_stats) { |
655 | 0 | using namespace std::chrono; |
656 | |
|
657 | 0 | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); |
658 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.inject_error", { |
659 | 0 | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
660 | 0 | auto target_table_id = dp->param<int64_t>("table_id", -1); |
661 | 0 | if (target_tablet_id == tablet->tablet_id() || target_table_id == tablet->table_id()) { |
662 | 0 | return Status::InternalError( |
663 | 0 | "[sync_tablet_rowsets_unlocked] injected error for testing"); |
664 | 0 | } |
665 | 0 | }); |
666 | |
|
667 | 0 | MetaServiceProxy* proxy; |
668 | 0 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); |
669 | 0 | std::string tablet_info = |
670 | 0 | fmt::format("tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(), |
671 | 0 | tablet->table_id(), tablet->index_id(), tablet->partition_id()); |
672 | 0 | int tried = 0; |
673 | 0 | while (true) { |
674 | 0 | std::shared_ptr<MetaService_Stub> stub; |
675 | 0 | RETURN_IF_ERROR(proxy->get(&stub)); |
676 | 0 | brpc::Controller cntl; |
677 | 0 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); |
678 | 0 | GetRowsetRequest req; |
679 | 0 | GetRowsetResponse resp; |
680 | |
|
681 | 0 | int64_t tablet_id = tablet->tablet_id(); |
682 | 0 | int64_t table_id = tablet->table_id(); |
683 | 0 | int64_t index_id = tablet->index_id(); |
684 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
685 | 0 | auto* idx = req.mutable_idx(); |
686 | 0 | idx->set_tablet_id(tablet_id); |
687 | 0 | idx->set_table_id(table_id); |
688 | 0 | idx->set_index_id(index_id); |
689 | 0 | idx->set_partition_id(tablet->partition_id()); |
690 | 0 | { |
691 | 0 | auto lock_start = std::chrono::steady_clock::now(); |
692 | 0 | std::shared_lock rlock(tablet->get_header_lock()); |
693 | 0 | if (sync_stats) { |
694 | 0 | sync_stats->meta_lock_wait_ns += |
695 | 0 | std::chrono::duration_cast<std::chrono::nanoseconds>( |
696 | 0 | std::chrono::steady_clock::now() - lock_start) |
697 | 0 | .count(); |
698 | 0 | } |
699 | 0 | if (options.full_sync) { |
700 | 0 | req.set_start_version(0); |
701 | 0 | } else { |
702 | 0 | req.set_start_version(tablet->max_version_unlocked() + 1); |
703 | 0 | } |
704 | 0 | req.set_base_compaction_cnt(tablet->base_compaction_cnt()); |
705 | 0 | req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); |
706 | 0 | req.set_full_compaction_cnt(tablet->full_compaction_cnt()); |
707 | 0 | req.set_cumulative_point(tablet->cumulative_layer_point()); |
708 | 0 | } |
709 | 0 | req.set_end_version(-1); |
710 | 0 | VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString(); |
711 | | |
712 | | // Host-level rate limiting for get_rowset |
713 | 0 | if (host_level_ms_rpc_rate_limiters_) { |
714 | 0 | host_level_ms_rpc_rate_limiters_->limit(MetaServiceRPC::GET_ROWSET); |
715 | 0 | } |
716 | |
|
717 | 0 | auto start = std::chrono::steady_clock::now(); |
718 | 0 | stub->get_rowset(&cntl, &req, &resp, nullptr); |
719 | 0 | auto end = std::chrono::steady_clock::now(); |
720 | 0 | int64_t latency = cntl.latency_us(); |
721 | 0 | _get_rowset_latency << latency; |
722 | 0 | int retry_times = config::meta_service_rpc_retry_times; |
723 | 0 | if (cntl.Failed()) { |
724 | 0 | proxy->set_unhealthy(); |
725 | 0 | if (tried++ < retry_times) { |
726 | 0 | auto rng = make_random_engine(); |
727 | 0 | std::uniform_int_distribution<uint32_t> u(20, 200); |
728 | 0 | std::uniform_int_distribution<uint32_t> u1(500, 1000); |
729 | 0 | uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); |
730 | 0 | bthread_usleep(duration_ms * 1000); |
731 | 0 | LOG_INFO("failed to get rowset meta, " + tablet_info) |
732 | 0 | .tag("reason", cntl.ErrorText()) |
733 | 0 | .tag("tried", tried) |
734 | 0 | .tag("sleep", duration_ms); |
735 | 0 | continue; |
736 | 0 | } |
737 | 0 | return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText()); |
738 | 0 | } |
739 | 0 | if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
740 | 0 | LOG(WARNING) << "failed to get rowset meta, err=" << resp.status().msg() << " " |
741 | 0 | << tablet_info; |
742 | 0 | return Status::NotFound("failed to get rowset meta: {}, {}", resp.status().msg(), |
743 | 0 | tablet_info); |
744 | 0 | } |
745 | 0 | if (resp.status().code() == MetaServiceCode::MS_TOO_BUSY) { |
746 | | // MS_BUSY should also be retried |
747 | 0 | if (ms_backpressure_handler_) { |
748 | 0 | ms_backpressure_handler_->on_ms_busy(); |
749 | 0 | } |
750 | 0 | if (tried++ < retry_times) { |
751 | 0 | auto rng = make_random_engine(); |
752 | 0 | std::uniform_int_distribution<uint32_t> u(20, 200); |
753 | 0 | std::uniform_int_distribution<uint32_t> u1(500, 1000); |
754 | 0 | uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); |
755 | 0 | bthread_usleep(duration_ms * 1000); |
756 | 0 | LOG_INFO("meta service is too busy when getting rowset meta, " + tablet_info) |
757 | 0 | .tag("reason", resp.status().msg()) |
758 | 0 | .tag("tried", tried) |
759 | 0 | .tag("sleep", duration_ms); |
760 | 0 | continue; |
761 | 0 | } |
762 | 0 | return Status::RpcError("failed to get rowset meta: {}", resp.status().msg()); |
763 | 0 | } |
764 | 0 | if (resp.status().code() != MetaServiceCode::OK) { |
765 | 0 | LOG(WARNING) << " failed to get rowset meta, err=" << resp.status().msg() << " " |
766 | 0 | << tablet_info; |
767 | 0 | return Status::InternalError("failed to get rowset meta: {}, {}", resp.status().msg(), |
768 | 0 | tablet_info); |
769 | 0 | } |
770 | 0 | if (latency > 100 * 1000) { // 100ms |
771 | 0 | LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() |
772 | 0 | << ", latency=" << latency << "us" |
773 | 0 | << " " << tablet_info; |
774 | 0 | } else { |
775 | 0 | LOG_EVERY_N(INFO, 100) |
776 | 0 | << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() |
777 | 0 | << ", latency=" << latency << "us" |
778 | 0 | << " " << tablet_info; |
779 | 0 | } |
780 | |
|
781 | 0 | int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
782 | 0 | tablet->last_sync_time_s = now; |
783 | |
|
784 | 0 | if (sync_stats) { |
785 | 0 | sync_stats->get_remote_rowsets_rpc_ns += |
786 | 0 | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
787 | 0 | sync_stats->get_remote_rowsets_num += resp.rowset_meta().size(); |
788 | 0 | } |
789 | | |
790 | | // If is mow, the tablet has no delete bitmap in base rowsets. |
791 | | // So dont need to sync it. |
792 | 0 | if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() && |
793 | 0 | tablet->tablet_state() == TABLET_RUNNING) { |
794 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block", |
795 | 0 | DBUG_BLOCK); |
796 | 0 | DeleteBitmap delete_bitmap(tablet_id); |
797 | 0 | int64_t old_max_version = req.start_version() - 1; |
798 | 0 | auto read_version = config::delete_bitmap_store_read_version; |
799 | 0 | auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), |
800 | 0 | resp.stats(), req.idx(), &delete_bitmap, |
801 | 0 | options.full_sync, sync_stats, read_version, false); |
802 | 0 | if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) { |
803 | 0 | LOG_INFO("rowset meta is expired, need to retry, " + tablet_info) |
804 | 0 | .tag("tried", tried) |
805 | 0 | .error(st); |
806 | 0 | continue; |
807 | 0 | } |
808 | 0 | if (!st.ok()) { |
809 | 0 | LOG_WARNING("failed to get delete bitmap, " + tablet_info).error(st); |
810 | 0 | return st; |
811 | 0 | } |
812 | 0 | tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); |
813 | 0 | RETURN_IF_ERROR(_log_mow_delete_bitmap(tablet, resp, delete_bitmap, old_max_version, |
814 | 0 | options.full_sync, read_version)); |
815 | 0 | RETURN_IF_ERROR( |
816 | 0 | _check_delete_bitmap_v2_correctness(tablet, req, resp, old_max_version)); |
817 | 0 | } |
818 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", { |
819 | 0 | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
820 | 0 | if (target_tablet_id == tablet->tablet_id()) { |
821 | 0 | DBUG_BLOCK |
822 | 0 | } |
823 | 0 | }); |
824 | 0 | { |
825 | 0 | const auto& stats = resp.stats(); |
826 | 0 | auto lock_start = std::chrono::steady_clock::now(); |
827 | 0 | std::unique_lock wlock(tablet->get_header_lock()); |
828 | 0 | if (sync_stats) { |
829 | 0 | sync_stats->meta_lock_wait_ns += |
830 | 0 | std::chrono::duration_cast<std::chrono::nanoseconds>( |
831 | 0 | std::chrono::steady_clock::now() - lock_start) |
832 | 0 | .count(); |
833 | 0 | } |
834 | | |
835 | | // ATTN: we are facing following data race |
836 | | // |
837 | | // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8 |
838 | | // |
839 | | // BE-compaction-thread meta-service BE-query-thread |
840 | | // | | | |
841 | | // local | commit cumu-compaction | | |
842 | | // cc_cnt=0 | ---------------------------> | sync rowset (long rpc, local cc_cnt=0 ) | local |
843 | | // | | <----------------------------------------- | cc_cnt=0 |
844 | | // | | -. | |
845 | | // local | done cc_cnt=1 | \ | |
846 | | // cc_cnt=1 | <--------------------------- | \ | |
847 | | // | | \ returned with resp cc_cnt=0 (snapshot) | |
848 | | // | | '------------------------------------> | local |
849 | | // | | | cc_cnt=1 |
850 | | // | | | |
851 | | // | | | CHECK FAIL |
852 | | // | | | need retry |
853 | | // To get rid of just retry syncing tablet |
854 | 0 | if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() || |
855 | 0 | stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt()) |
856 | 0 | [[unlikely]] { |
857 | | // stale request, ignore |
858 | 0 | LOG_WARNING("stale get rowset meta request " + tablet_info) |
859 | 0 | .tag("resp_base_compaction_cnt", stats.base_compaction_cnt()) |
860 | 0 | .tag("base_compaction_cnt", tablet->base_compaction_cnt()) |
861 | 0 | .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt()) |
862 | 0 | .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt()) |
863 | 0 | .tag("tried", tried); |
864 | 0 | if (tried++ < 10) continue; |
865 | 0 | return Status::OK(); |
866 | 0 | } |
867 | 0 | std::vector<RowsetSharedPtr> rowsets; |
868 | 0 | rowsets.reserve(resp.rowset_meta().size()); |
869 | 0 | for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) { |
870 | 0 | VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id() |
871 | 0 | << ", version=[" << cloud_rs_meta_pb.start_version() << '-' |
872 | 0 | << cloud_rs_meta_pb.end_version() << ']'; |
873 | 0 | auto existed_rowset = tablet->get_rowset_by_version( |
874 | 0 | {cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()}); |
875 | 0 | if (existed_rowset && |
876 | 0 | existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) { |
877 | 0 | continue; // Same rowset, skip it |
878 | 0 | } |
879 | 0 | RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb); |
880 | 0 | auto rs_meta = std::make_shared<RowsetMeta>(); |
881 | 0 | rs_meta->init_from_pb(meta_pb); |
882 | 0 | RowsetSharedPtr rowset; |
883 | | // schema is nullptr implies using RowsetMeta.tablet_schema |
884 | 0 | Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); |
885 | 0 | if (!s.ok()) { |
886 | 0 | LOG_WARNING("create rowset").tag("status", s); |
887 | 0 | return s; |
888 | 0 | } |
889 | 0 | rowsets.push_back(std::move(rowset)); |
890 | 0 | } |
891 | 0 | if (!rowsets.empty()) { |
892 | | // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.: |
893 | | // BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2; |
894 | | // after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12]. |
895 | 0 | bool version_overlap = |
896 | 0 | tablet->max_version_unlocked() >= rowsets.front()->start_version(); |
897 | 0 | tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, |
898 | 0 | options.warmup_delta_data || |
899 | 0 | config::enable_warmup_immediately_on_new_rowset); |
900 | 0 | } |
901 | | |
902 | | // Fill version holes |
903 | 0 | int64_t partition_max_version = |
904 | 0 | resp.has_partition_max_version() ? resp.partition_max_version() : -1; |
905 | 0 | RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock)); |
906 | | |
907 | 0 | tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms(); |
908 | 0 | tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); |
909 | 0 | tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); |
910 | 0 | tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); |
911 | 0 | tablet->set_full_compaction_cnt(stats.full_compaction_cnt()); |
912 | 0 | tablet->set_cumulative_layer_point(stats.cumulative_point()); |
913 | 0 | tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), |
914 | 0 | stats.num_rows(), stats.data_size()); |
915 | | |
916 | | // Sync last active cluster info for compaction read-write separation |
917 | 0 | if (config::enable_compaction_rw_separation && stats.has_last_active_cluster_id()) { |
918 | 0 | tablet->set_last_active_cluster_info(stats.last_active_cluster_id(), |
919 | 0 | stats.last_active_time_ms()); |
920 | 0 | } |
921 | 0 | } |
922 | 0 | return Status::OK(); |
923 | 0 | } |
924 | 0 | } |
925 | | |
926 | | bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, |
927 | | std::ranges::range auto&& rs_metas, |
928 | 0 | DeleteBitmap* delete_bitmap) { |
929 | 0 | std::set<int64_t> txn_processed; |
930 | 0 | for (auto& rs_meta : rs_metas) { |
931 | 0 | auto txn_id = rs_meta.txn_id(); |
932 | 0 | if (txn_processed.find(txn_id) != txn_processed.end()) { |
933 | 0 | continue; |
934 | 0 | } |
935 | 0 | txn_processed.insert(txn_id); |
936 | 0 | DeleteBitmapPtr tmp_delete_bitmap; |
937 | 0 | std::shared_ptr<PublishStatus> publish_status = |
938 | 0 | std::make_shared<PublishStatus>(PublishStatus::INIT); |
939 | 0 | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
940 | 0 | Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap( |
941 | 0 | txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status); |
942 | | // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services. |
943 | | // If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can |
944 | | // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other |
945 | | // stats(version or compaction stats) |
946 | 0 | if (status.ok() && *publish_status == PublishStatus::SUCCEED) { |
947 | | // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap. |
948 | | // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON, |
949 | | // we should replace it with the rowset's real version |
950 | 0 | DCHECK(rs_meta.start_version() == rs_meta.end_version()); |
951 | 0 | int64_t rowset_version = rs_meta.start_version(); |
952 | 0 | for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) { |
953 | | // skip sentinel mark, which is used for delete bitmap correctness check |
954 | 0 | if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) { |
955 | 0 | delete_bitmap->merge({std::get<0>(delete_bitmap_key), |
956 | 0 | std::get<1>(delete_bitmap_key), rowset_version}, |
957 | 0 | bitmap_value); |
958 | 0 | } |
959 | 0 | } |
960 | 0 | engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, |
961 | 0 | tablet->tablet_id()); |
962 | 0 | } else { |
963 | 0 | LOG_EVERY_N(INFO, 20) |
964 | 0 | << "delete bitmap not found in cache, will sync rowset to get. tablet_id= " |
965 | 0 | << tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status; |
966 | 0 | return false; |
967 | 0 | } |
968 | 0 | } |
969 | 0 | return true; |
970 | 0 | } |
971 | | |
972 | | Status CloudMetaMgr::_get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req, |
973 | 11 | GetDeleteBitmapResponse& res) { |
974 | 11 | VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString(); |
975 | 11 | TEST_SYNC_POINT_CALLBACK("CloudMetaMgr::_get_delete_bitmap_from_ms", &req, &res); |
976 | | |
977 | 11 | auto st = retry_rpc(MetaServiceRPC::GET_DELETE_BITMAP, req, &res, |
978 | 11 | &MetaService_Stub::get_delete_bitmap, |
979 | 11 | { |
980 | 11 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
981 | 11 | .backpressure_handler = ms_backpressure_handler_, |
982 | 11 | }); |
983 | 11 | if (st.code() == ErrorCode::THRIFT_RPC_ERROR) { |
984 | 0 | return st; |
985 | 0 | } |
986 | | |
987 | 11 | if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
988 | 1 | return Status::NotFound("failed to get delete bitmap: {}", res.status().msg()); |
989 | 1 | } |
990 | | // The delete bitmap of stale rowsets will be removed when commit compaction job, |
991 | | // then delete bitmap of stale rowsets cannot be obtained. But the rowsets obtained |
992 | | // by sync_tablet_rowsets may include these stale rowsets. When this case happend, the |
993 | | // error code of ROWSETS_EXPIRED will be returned, we need to retry sync rowsets again. |
994 | | // |
995 | | // Be query thread meta-service Be compaction thread |
996 | | // | | | |
997 | | // | get rowset | | |
998 | | // |--------------------------->| | |
999 | | // | return get rowset | | |
1000 | | // |<---------------------------| | |
1001 | | // | | commit job | |
1002 | | // | |<------------------------| |
1003 | | // | | return commit job | |
1004 | | // | |------------------------>| |
1005 | | // | get delete bitmap | | |
1006 | | // |--------------------------->| | |
1007 | | // | return get delete bitmap | | |
1008 | | // |<---------------------------| | |
1009 | | // | | | |
1010 | 10 | if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) { |
1011 | 0 | return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}", |
1012 | 0 | res.status().msg()); |
1013 | 0 | } |
1014 | 10 | if (res.status().code() != MetaServiceCode::OK) { |
1015 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}", |
1016 | 0 | res.status().msg()); |
1017 | 0 | } |
1018 | 10 | return Status::OK(); |
1019 | 10 | } |
1020 | | |
1021 | | Status CloudMetaMgr::_get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req, |
1022 | | GetDeleteBitmapResponse& res, |
1023 | 6 | int64_t bytes_threadhold) { |
1024 | 6 | std::unordered_set<std::string> finished_rowset_ids {}; |
1025 | 6 | int count = 0; |
1026 | 11 | do { |
1027 | 11 | GetDeleteBitmapRequest cur_req; |
1028 | 11 | GetDeleteBitmapResponse cur_res; |
1029 | | |
1030 | 11 | cur_req.set_cloud_unique_id(config::cloud_unique_id); |
1031 | 11 | cur_req.set_tablet_id(req.tablet_id()); |
1032 | 11 | cur_req.set_base_compaction_cnt(req.base_compaction_cnt()); |
1033 | 11 | cur_req.set_cumulative_compaction_cnt(req.cumulative_compaction_cnt()); |
1034 | 11 | cur_req.set_cumulative_point(req.cumulative_point()); |
1035 | 11 | *(cur_req.mutable_idx()) = req.idx(); |
1036 | 11 | cur_req.set_store_version(req.store_version()); |
1037 | 11 | if (bytes_threadhold > 0) { |
1038 | 11 | cur_req.set_dbm_bytes_threshold(bytes_threadhold); |
1039 | 11 | } |
1040 | 45 | for (int i = 0; i < req.rowset_ids_size(); i++) { |
1041 | 34 | if (!finished_rowset_ids.contains(req.rowset_ids(i))) { |
1042 | 25 | cur_req.add_rowset_ids(req.rowset_ids(i)); |
1043 | 25 | cur_req.add_begin_versions(req.begin_versions(i)); |
1044 | 25 | cur_req.add_end_versions(req.end_versions(i)); |
1045 | 25 | } |
1046 | 34 | } |
1047 | | |
1048 | 11 | RETURN_IF_ERROR(_get_delete_bitmap_from_ms(cur_req, cur_res)); |
1049 | 10 | ++count; |
1050 | | |
1051 | | // v1 delete bitmap |
1052 | 10 | res.mutable_rowset_ids()->MergeFrom(cur_res.rowset_ids()); |
1053 | 10 | res.mutable_segment_ids()->MergeFrom(cur_res.segment_ids()); |
1054 | 10 | res.mutable_versions()->MergeFrom(cur_res.versions()); |
1055 | 10 | res.mutable_segment_delete_bitmaps()->MergeFrom(cur_res.segment_delete_bitmaps()); |
1056 | | |
1057 | | // v2 delete bitmap |
1058 | 10 | res.mutable_delta_rowset_ids()->MergeFrom(cur_res.delta_rowset_ids()); |
1059 | 10 | res.mutable_delete_bitmap_storages()->MergeFrom(cur_res.delete_bitmap_storages()); |
1060 | | |
1061 | 15 | for (const auto& rowset_id : cur_res.returned_rowset_ids()) { |
1062 | 15 | finished_rowset_ids.insert(rowset_id); |
1063 | 15 | } |
1064 | | |
1065 | 10 | bool has_more = cur_res.has_has_more() && cur_res.has_more(); |
1066 | 10 | if (!has_more) { |
1067 | 5 | break; |
1068 | 5 | } |
1069 | 5 | LOG_INFO("batch get delete bitmap, progress={}/{}", finished_rowset_ids.size(), |
1070 | 5 | req.rowset_ids_size()) |
1071 | 5 | .tag("tablet_id", req.tablet_id()) |
1072 | 5 | .tag("cur_returned_rowsets", cur_res.returned_rowset_ids_size()) |
1073 | 5 | .tag("rpc_count", count); |
1074 | 5 | } while (finished_rowset_ids.size() < req.rowset_ids_size()); |
1075 | 5 | return Status::OK(); |
1076 | 6 | } |
1077 | | |
1078 | | Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, |
1079 | | std::ranges::range auto&& rs_metas, |
1080 | | const TabletStatsPB& stats, const TabletIndexPB& idx, |
1081 | | DeleteBitmap* delete_bitmap, bool full_sync, |
1082 | | SyncRowsetStats* sync_stats, int32_t read_version, |
1083 | 0 | bool full_sync_v2) { |
1084 | 0 | if (rs_metas.empty()) { |
1085 | 0 | return Status::OK(); |
1086 | 0 | } |
1087 | | |
1088 | 0 | if (!full_sync && config::enable_sync_tablet_delete_bitmap_by_cache && |
1089 | 0 | sync_tablet_delete_bitmap_by_cache(tablet, rs_metas, delete_bitmap)) { |
1090 | 0 | if (sync_stats) { |
1091 | 0 | sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size(); |
1092 | 0 | } |
1093 | 0 | return Status::OK(); |
1094 | 0 | } else { |
1095 | 0 | DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id()); |
1096 | 0 | *delete_bitmap = *new_delete_bitmap; |
1097 | 0 | } |
1098 | | |
1099 | 0 | if (read_version == 2 && config::delete_bitmap_store_write_version == 1) { |
1100 | 0 | return Status::InternalError( |
1101 | 0 | "please set delete_bitmap_store_read_version to 1 or 3 because " |
1102 | 0 | "delete_bitmap_store_write_version is 1"); |
1103 | 0 | } else if (read_version == 1 && config::delete_bitmap_store_write_version == 2) { |
1104 | 0 | return Status::InternalError( |
1105 | 0 | "please set delete_bitmap_store_read_version to 2 or 3 because " |
1106 | 0 | "delete_bitmap_store_write_version is 2"); |
1107 | 0 | } |
1108 | | |
1109 | 0 | int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version()); |
1110 | | // When there are many delete bitmaps that need to be synchronized, it |
1111 | | // may take a longer time, especially when loading the tablet for the |
1112 | | // first time, so set a relatively long timeout time. |
1113 | 0 | GetDeleteBitmapRequest req; |
1114 | 0 | GetDeleteBitmapResponse res; |
1115 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1116 | 0 | req.set_tablet_id(tablet->tablet_id()); |
1117 | 0 | req.set_base_compaction_cnt(stats.base_compaction_cnt()); |
1118 | 0 | req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); |
1119 | 0 | req.set_cumulative_point(stats.cumulative_point()); |
1120 | 0 | *(req.mutable_idx()) = idx; |
1121 | 0 | req.set_store_version(read_version); |
1122 | | // New rowset sync all versions of delete bitmap |
1123 | 0 | for (const auto& rs_meta : rs_metas) { |
1124 | 0 | req.add_rowset_ids(rs_meta.rowset_id_v2()); |
1125 | 0 | req.add_begin_versions(0); |
1126 | 0 | req.add_end_versions(new_max_version); |
1127 | 0 | } |
1128 | |
|
1129 | 0 | if (!full_sync_v2) { |
1130 | | // old rowset sync incremental versions of delete bitmap |
1131 | 0 | if (old_max_version > 0 && old_max_version < new_max_version) { |
1132 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1133 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1134 | 0 | for (const auto& rs_id : all_rs_ids) { |
1135 | 0 | req.add_rowset_ids(rs_id.to_string()); |
1136 | 0 | req.add_begin_versions(old_max_version + 1); |
1137 | 0 | req.add_end_versions(new_max_version); |
1138 | 0 | } |
1139 | 0 | } |
1140 | 0 | } else { |
1141 | 0 | if (old_max_version > 0) { |
1142 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1143 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1144 | 0 | for (const auto& rs_id : all_rs_ids) { |
1145 | 0 | req.add_rowset_ids(rs_id.to_string()); |
1146 | 0 | req.add_begin_versions(0); |
1147 | 0 | req.add_end_versions(new_max_version); |
1148 | 0 | } |
1149 | 0 | } |
1150 | 0 | } |
1151 | 0 | if (sync_stats) { |
1152 | 0 | sync_stats->get_remote_delete_bitmap_rowsets_num += req.rowset_ids_size(); |
1153 | 0 | } |
1154 | |
|
1155 | 0 | auto start = std::chrono::steady_clock::now(); |
1156 | 0 | if (config::enable_batch_get_delete_bitmap) { |
1157 | 0 | RETURN_IF_ERROR(_get_delete_bitmap_from_ms_by_batch( |
1158 | 0 | req, res, config::get_delete_bitmap_bytes_threshold)); |
1159 | 0 | } else { |
1160 | 0 | RETURN_IF_ERROR(_get_delete_bitmap_from_ms(req, res)); |
1161 | 0 | } |
1162 | 0 | auto end = std::chrono::steady_clock::now(); |
1163 | | |
1164 | | // v1 delete bitmap |
1165 | 0 | const auto& rowset_ids = res.rowset_ids(); |
1166 | 0 | const auto& segment_ids = res.segment_ids(); |
1167 | 0 | const auto& vers = res.versions(); |
1168 | 0 | const auto& delete_bitmaps = res.segment_delete_bitmaps(); |
1169 | 0 | if (rowset_ids.size() != segment_ids.size() || rowset_ids.size() != vers.size() || |
1170 | 0 | rowset_ids.size() != delete_bitmaps.size()) { |
1171 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1172 | 0 | "get delete bitmap data wrong," |
1173 | 0 | "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}", |
1174 | 0 | rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size()); |
1175 | 0 | } |
1176 | 0 | for (int i = 0; i < rowset_ids.size(); i++) { |
1177 | 0 | RowsetId rst_id; |
1178 | 0 | rst_id.init(rowset_ids[i]); |
1179 | 0 | delete_bitmap->merge( |
1180 | 0 | {rst_id, segment_ids[i], vers[i]}, |
1181 | 0 | roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length())); |
1182 | 0 | } |
1183 | | // v2 delete bitmap |
1184 | 0 | const auto& delta_rowset_ids = res.delta_rowset_ids(); |
1185 | 0 | const auto& delete_bitmap_storages = res.delete_bitmap_storages(); |
1186 | 0 | if (delta_rowset_ids.size() != delete_bitmap_storages.size()) { |
1187 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1188 | 0 | "get delete bitmap data wrong, delta_rowset_ids.size={}, " |
1189 | 0 | "delete_bitmap_storages.size={}", |
1190 | 0 | delta_rowset_ids.size(), delete_bitmap_storages.size()); |
1191 | 0 | } |
1192 | 0 | int64_t remote_delete_bitmap_bytes = 0; |
1193 | 0 | RETURN_IF_ERROR(_read_tablet_delete_bitmap_v2(tablet, old_max_version, rs_metas, delete_bitmap, |
1194 | 0 | res, remote_delete_bitmap_bytes, full_sync_v2)); |
1195 | | |
1196 | 0 | if (sync_stats) { |
1197 | 0 | sync_stats->get_remote_delete_bitmap_rpc_ns += |
1198 | 0 | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
1199 | 0 | sync_stats->get_remote_delete_bitmap_key_count += |
1200 | 0 | delete_bitmaps.size() + delete_bitmap_storages.size(); |
1201 | 0 | for (const auto& dbm : delete_bitmaps) { |
1202 | 0 | sync_stats->get_remote_delete_bitmap_bytes += dbm.length(); |
1203 | 0 | } |
1204 | 0 | sync_stats->get_remote_delete_bitmap_bytes += remote_delete_bitmap_bytes; |
1205 | 0 | } |
1206 | 0 | int64_t latency = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); |
1207 | 0 | if (latency > 100 * 1000) { // 100ms |
1208 | 0 | LOG(INFO) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" << rowset_ids.size() |
1209 | 0 | << ", delete_bitmaps.size()=" << delete_bitmaps.size() |
1210 | 0 | << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size() |
1211 | 0 | << ", latency=" << latency << "us, read_version=" << read_version; |
1212 | 0 | } else { |
1213 | 0 | LOG_EVERY_N(INFO, 100) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" |
1214 | 0 | << rowset_ids.size() |
1215 | 0 | << ", delete_bitmaps.size()=" << delete_bitmaps.size() |
1216 | 0 | << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size() |
1217 | 0 | << ", latency=" << latency << "us, read_version=" << read_version; |
1218 | 0 | } |
1219 | 0 | return Status::OK(); |
1220 | 0 | } |
1221 | | |
1222 | | Status CloudMetaMgr::_check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req, |
1223 | | GetRowsetResponse& resp, |
1224 | 0 | int64_t old_max_version) { |
1225 | 0 | if (!config::enable_delete_bitmap_store_v2_check_correctness || |
1226 | 0 | config::delete_bitmap_store_write_version == 1 || resp.rowset_meta().empty()) { |
1227 | 0 | return Status::OK(); |
1228 | 0 | } |
1229 | 0 | int64_t tablet_id = tablet->tablet_id(); |
1230 | 0 | int64_t new_max_version = std::max(old_max_version, resp.rowset_meta().rbegin()->end_version()); |
1231 | | // rowset_id, num_segments |
1232 | 0 | std::vector<std::pair<RowsetId, int64_t>> all_rowsets; |
1233 | 0 | std::map<std::string, std::string> rowset_to_resource; |
1234 | 0 | for (const auto& rs_meta : resp.rowset_meta()) { |
1235 | 0 | RowsetId rowset_id; |
1236 | 0 | rowset_id.init(rs_meta.rowset_id_v2()); |
1237 | 0 | all_rowsets.emplace_back(std::make_pair(rowset_id, rs_meta.num_segments())); |
1238 | 0 | rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id(); |
1239 | 0 | } |
1240 | 0 | if (old_max_version > 0) { |
1241 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1242 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1243 | 0 | for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) { |
1244 | 0 | all_rowsets.emplace_back(std::make_pair(rowset->rowset_id(), rowset->num_segments())); |
1245 | 0 | rowset_to_resource[rowset->rowset_id().to_string()] = |
1246 | 0 | rowset->rowset_meta()->resource_id(); |
1247 | 0 | } |
1248 | 0 | } |
1249 | | |
1250 | 0 | auto compare_delete_bitmap = [&](DeleteBitmap* delete_bitmap, int version) { |
1251 | 0 | bool success = true; |
1252 | 0 | for (auto& [rs_id, num_segments] : all_rowsets) { |
1253 | 0 | for (int seg_id = 0; seg_id < num_segments; ++seg_id) { |
1254 | 0 | DeleteBitmap::BitmapKey key = {rs_id, seg_id, new_max_version}; |
1255 | 0 | auto dm1 = tablet->tablet_meta()->delete_bitmap().get_agg(key); |
1256 | 0 | auto dm2 = delete_bitmap->get_agg_without_cache(key); |
1257 | 0 | if (*dm1 != *dm2) { |
1258 | 0 | success = false; |
1259 | 0 | LOG(WARNING) << "failed to check delete bitmap correctness by v" |
1260 | 0 | << std::to_string(version) << ", tablet_id=" << tablet->tablet_id() |
1261 | 0 | << ", rowset_id=" << rs_id.to_string() << ", segment_id=" << seg_id |
1262 | 0 | << ", max_version=" << new_max_version |
1263 | 0 | << ". size1=" << dm1->cardinality() |
1264 | 0 | << ", size2=" << dm2->cardinality(); |
1265 | 0 | } |
1266 | 0 | } |
1267 | 0 | } |
1268 | 0 | if (success) { |
1269 | 0 | LOG(INFO) << "succeed to check delete bitmap correctness by v" |
1270 | 0 | << std::to_string(version) << ", tablet_id=" << tablet->tablet_id() |
1271 | 0 | << ", max_version=" << new_max_version; |
1272 | 0 | } |
1273 | 0 | }; |
1274 | |
|
1275 | 0 | DeleteBitmap full_delete_bitmap(tablet_id); |
1276 | 0 | auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), resp.stats(), |
1277 | 0 | req.idx(), &full_delete_bitmap, false, nullptr, 2, true); |
1278 | 0 | if (!st.ok()) { |
1279 | 0 | LOG_WARNING("failed to check delete bitmap correctness by v2") |
1280 | 0 | .tag("tablet", tablet->tablet_id()) |
1281 | 0 | .error(st); |
1282 | 0 | } else { |
1283 | 0 | compare_delete_bitmap(&full_delete_bitmap, 2); |
1284 | 0 | } |
1285 | 0 | return Status::OK(); |
1286 | 0 | } |
1287 | | |
1288 | | Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version, |
1289 | | std::ranges::range auto&& rs_metas, |
1290 | | DeleteBitmap* delete_bitmap, |
1291 | | GetDeleteBitmapResponse& res, |
1292 | | int64_t& remote_delete_bitmap_bytes, |
1293 | 0 | bool full_sync_v2) { |
1294 | 0 | if (res.delta_rowset_ids().empty()) { |
1295 | 0 | return Status::OK(); |
1296 | 0 | } |
1297 | 0 | const auto& rowset_ids = res.delta_rowset_ids(); |
1298 | 0 | const auto& delete_bitmap_storages = res.delete_bitmap_storages(); |
1299 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1300 | 0 | std::map<std::string, std::string> rowset_to_resource; |
1301 | 0 | if (old_max_version > 0) { |
1302 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1303 | 0 | if (full_sync_v2) { |
1304 | 0 | for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) { |
1305 | 0 | rowset_to_resource[rowset->rowset_id().to_string()] = |
1306 | 0 | rowset->rowset_meta()->resource_id(); |
1307 | 0 | } |
1308 | 0 | } |
1309 | 0 | } |
1310 | 0 | for (const auto& rs_meta : rs_metas) { |
1311 | 0 | RowsetId rs_id; |
1312 | 0 | rs_id.init(rs_meta.rowset_id_v2()); |
1313 | 0 | all_rs_ids.emplace(rs_id); |
1314 | 0 | rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id(); |
1315 | 0 | } |
1316 | 0 | if (config::enable_mow_verbose_log) { |
1317 | 0 | LOG(INFO) << "read delete bitmap for tablet_id=" << tablet->tablet_id() |
1318 | 0 | << ", old_max_version=" << old_max_version |
1319 | 0 | << ", new rowset num=" << rs_metas.size() |
1320 | 0 | << ", rowset has delete bitmap num=" << rowset_ids.size() |
1321 | 0 | << ". all rowset num=" << all_rs_ids.size(); |
1322 | 0 | } |
1323 | |
|
1324 | 0 | std::mutex result_mtx; |
1325 | 0 | Status result; |
1326 | 0 | auto merge_delete_bitmap = [&](const std::string& rowset_id, DeleteBitmapPB& dbm) { |
1327 | 0 | if (dbm.rowset_ids_size() != dbm.segment_ids_size() || |
1328 | 0 | dbm.rowset_ids_size() != dbm.versions_size() || |
1329 | 0 | dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) { |
1330 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1331 | 0 | "get delete bitmap data wrong, rowset_id={}" |
1332 | 0 | "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}", |
1333 | 0 | rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), dbm.versions_size(), |
1334 | 0 | dbm.segment_delete_bitmaps_size()); |
1335 | 0 | } |
1336 | 0 | if (config::enable_mow_verbose_log) { |
1337 | 0 | LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id() |
1338 | 0 | << ", rowset_id=" << rowset_id |
1339 | 0 | << ", delete_bitmap num=" << dbm.segment_delete_bitmaps_size(); |
1340 | 0 | } |
1341 | 0 | std::lock_guard lock(result_mtx); |
1342 | 0 | for (int j = 0; j < dbm.rowset_ids_size(); j++) { |
1343 | 0 | RowsetId rst_id; |
1344 | 0 | rst_id.init(dbm.rowset_ids(j)); |
1345 | 0 | if (!all_rs_ids.contains(rst_id)) { |
1346 | 0 | LOG(INFO) << "skip merge delete bitmap for tablet_id=" << tablet->tablet_id() |
1347 | 0 | << ", rowset_id=" << rowset_id << ", unused rowset_id=" << rst_id; |
1348 | 0 | continue; |
1349 | 0 | } |
1350 | 0 | delete_bitmap->merge( |
1351 | 0 | {rst_id, dbm.segment_ids(j), dbm.versions(j)}, |
1352 | 0 | roaring::Roaring::readSafe(dbm.segment_delete_bitmaps(j).data(), |
1353 | 0 | dbm.segment_delete_bitmaps(j).length())); |
1354 | 0 | remote_delete_bitmap_bytes += dbm.segment_delete_bitmaps(j).length(); |
1355 | 0 | } |
1356 | 0 | return Status::OK(); |
1357 | 0 | }; |
1358 | 0 | auto get_delete_bitmap_from_file = [&](const std::string& rowset_id, |
1359 | 0 | const DeleteBitmapStoragePB& storage) { |
1360 | 0 | if (config::enable_mow_verbose_log) { |
1361 | 0 | LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id() |
1362 | 0 | << ", rowset_id=" << rowset_id << " from file" |
1363 | 0 | << ", is_packed=" << storage.has_packed_slice_location(); |
1364 | 0 | } |
1365 | 0 | if (rowset_to_resource.find(rowset_id) == rowset_to_resource.end()) { |
1366 | 0 | return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}", |
1367 | 0 | tablet->tablet_id(), rowset_id); |
1368 | 0 | } |
1369 | 0 | auto resource_id = rowset_to_resource[rowset_id]; |
1370 | 0 | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1371 | 0 | auto storage_resource = engine.get_storage_resource(resource_id); |
1372 | 0 | if (!storage_resource) { |
1373 | 0 | return Status::InternalError("vault id not found, maybe not sync, vault id {}", |
1374 | 0 | resource_id); |
1375 | 0 | } |
1376 | | |
1377 | | // Use packed file reader if packed_slice_location is present |
1378 | 0 | std::unique_ptr<DeleteBitmapFileReader> reader; |
1379 | 0 | if (storage.has_packed_slice_location() && |
1380 | 0 | !storage.packed_slice_location().packed_file_path().empty()) { |
1381 | 0 | reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id, |
1382 | 0 | storage_resource, |
1383 | 0 | storage.packed_slice_location()); |
1384 | 0 | } else { |
1385 | 0 | reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id, |
1386 | 0 | storage_resource); |
1387 | 0 | } |
1388 | |
|
1389 | 0 | RETURN_IF_ERROR(reader->init()); |
1390 | 0 | DeleteBitmapPB dbm; |
1391 | 0 | RETURN_IF_ERROR(reader->read(dbm)); |
1392 | 0 | RETURN_IF_ERROR(reader->close()); |
1393 | 0 | return merge_delete_bitmap(rowset_id, dbm); |
1394 | 0 | }; |
1395 | 0 | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1396 | 0 | std::unique_ptr<ThreadPoolToken> token = engine.sync_delete_bitmap_thread_pool().new_token( |
1397 | 0 | ThreadPool::ExecutionMode::CONCURRENT); |
1398 | 0 | bthread::CountdownEvent wait {rowset_ids.size()}; |
1399 | 0 | for (int i = 0; i < rowset_ids.size(); i++) { |
1400 | 0 | auto& rowset_id = rowset_ids[i]; |
1401 | 0 | if (delete_bitmap_storages[i].store_in_fdb()) { |
1402 | 0 | wait.signal(); |
1403 | 0 | DeleteBitmapPB dbm = delete_bitmap_storages[i].delete_bitmap(); |
1404 | 0 | RETURN_IF_ERROR(merge_delete_bitmap(rowset_id, dbm)); |
1405 | 0 | } else { |
1406 | 0 | const auto& storage = delete_bitmap_storages[i]; |
1407 | 0 | auto submit_st = token->submit_func([&, rowset_id, storage]() { |
1408 | 0 | auto status = get_delete_bitmap_from_file(rowset_id, storage); |
1409 | 0 | if (!status.ok()) { |
1410 | 0 | LOG(WARNING) << "failed to get delete bitmap for tablet_id=" |
1411 | 0 | << tablet->tablet_id() << ", rowset_id=" << rowset_id |
1412 | 0 | << " from file, st=" << status.to_string(); |
1413 | 0 | std::lock_guard lock(result_mtx); |
1414 | 0 | if (result.ok()) { |
1415 | 0 | result = status; |
1416 | 0 | } |
1417 | 0 | } |
1418 | 0 | wait.signal(); |
1419 | 0 | }); |
1420 | 0 | RETURN_IF_ERROR(submit_st); |
1421 | 0 | } |
1422 | 0 | } |
1423 | | // wait for all finished |
1424 | 0 | wait.wait(); |
1425 | 0 | token->wait(); |
1426 | 0 | return result; |
1427 | 0 | } |
1428 | | |
1429 | | Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, |
1430 | 0 | int64_t table_id, RowsetMetaSharedPtr* existed_rs_meta) { |
1431 | 0 | VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() |
1432 | 0 | << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); |
1433 | 0 | { |
1434 | 0 | Status ret_st; |
1435 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st); |
1436 | 0 | } |
1437 | 0 | CreateRowsetRequest req; |
1438 | 0 | CreateRowsetResponse resp; |
1439 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1440 | 0 | req.set_txn_id(rs_meta.txn_id()); |
1441 | 0 | req.set_tablet_job_id(job_id); |
1442 | |
|
1443 | 0 | RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true); |
1444 | 0 | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta)); |
1445 | |
|
1446 | 0 | Status st = |
1447 | 0 | retry_rpc(MetaServiceRPC::PREPARE_ROWSET, req, &resp, &MetaService_Stub::prepare_rowset, |
1448 | 0 | { |
1449 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1450 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1451 | 0 | .table_id = table_id, |
1452 | 0 | }); |
1453 | 0 | if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { |
1454 | 0 | if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { |
1455 | 0 | RowsetMetaPB doris_rs_meta_tmp = |
1456 | 0 | cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); |
1457 | 0 | *existed_rs_meta = std::make_shared<RowsetMeta>(); |
1458 | 0 | (*existed_rs_meta)->init_from_pb(doris_rs_meta_tmp); |
1459 | 0 | } |
1460 | 0 | return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg()); |
1461 | 0 | } |
1462 | 0 | return st; |
1463 | 0 | } |
1464 | | |
1465 | | Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id, |
1466 | 0 | RowsetMetaSharedPtr* existed_rs_meta) { |
1467 | 0 | VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() |
1468 | 0 | << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); |
1469 | 0 | { |
1470 | 0 | Status ret_st; |
1471 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st); |
1472 | 0 | } |
1473 | 0 | check_table_size_correctness(rs_meta); |
1474 | 0 | CreateRowsetRequest req; |
1475 | 0 | CreateRowsetResponse resp; |
1476 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1477 | 0 | req.set_txn_id(rs_meta.txn_id()); |
1478 | 0 | req.set_tablet_job_id(job_id); |
1479 | |
|
1480 | 0 | RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); |
1481 | 0 | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); |
1482 | 0 | Status st = |
1483 | 0 | retry_rpc(MetaServiceRPC::COMMIT_ROWSET, req, &resp, &MetaService_Stub::commit_rowset, |
1484 | 0 | { |
1485 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1486 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1487 | 0 | .table_id = table_id, |
1488 | 0 | }); |
1489 | 0 | if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { |
1490 | 0 | if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { |
1491 | 0 | RowsetMetaPB doris_rs_meta = |
1492 | 0 | cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); |
1493 | 0 | *existed_rs_meta = std::make_shared<RowsetMeta>(); |
1494 | 0 | (*existed_rs_meta)->init_from_pb(doris_rs_meta); |
1495 | 0 | } |
1496 | 0 | return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg()); |
1497 | 0 | } |
1498 | 0 | int64_t timeout_ms = -1; |
1499 | | // if the `job_id` is not empty, it means this rowset was produced by a compaction job. |
1500 | 0 | if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) { |
1501 | | // 1. assume the download speed is 100MB/s |
1502 | | // 2. we double the download time as timeout for safety |
1503 | | // 3. for small rowsets, the timeout we calculate maybe quite small, so we need a min_time_out |
1504 | 0 | const double speed_mbps = 100.0; // 100MB/s |
1505 | 0 | const double safety_factor = 2.0; |
1506 | 0 | timeout_ms = std::min( |
1507 | 0 | std::max(static_cast<int64_t>(static_cast<double>(rs_meta.total_disk_size()) / |
1508 | 0 | (speed_mbps * 1024 * 1024) * safety_factor * 1000), |
1509 | 0 | config::warm_up_rowset_sync_wait_min_timeout_ms), |
1510 | 0 | config::warm_up_rowset_sync_wait_max_timeout_ms); |
1511 | 0 | LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id |
1512 | 0 | << ", with timeout: " << timeout_ms << " ms"; |
1513 | 0 | } |
1514 | 0 | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
1515 | 0 | manager.warm_up_rowset(rs_meta, timeout_ms); |
1516 | 0 | return st; |
1517 | 0 | } |
1518 | | |
1519 | 0 | void CloudMetaMgr::cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time) { |
1520 | | // For load-generated rowsets (job_id is empty), add to pending rowset manager |
1521 | | // so FE can notify BE to promote them later |
1522 | | |
1523 | | // TODO(bobhan1): copy rs_meta? |
1524 | 0 | int64_t txn_id = rs_meta->txn_id(); |
1525 | 0 | int64_t tablet_id = rs_meta->tablet_id(); |
1526 | 0 | ExecEnv::GetInstance()->storage_engine().to_cloud().committed_rs_mgr().add_committed_rowset( |
1527 | 0 | txn_id, tablet_id, std::move(rs_meta), expiration_time); |
1528 | 0 | } |
1529 | | |
1530 | 0 | Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta, int64_t table_id) { |
1531 | 0 | VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id() |
1532 | 0 | << ", rowset_id: " << rs_meta.rowset_id(); |
1533 | 0 | CreateRowsetRequest req; |
1534 | 0 | CreateRowsetResponse resp; |
1535 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1536 | | |
1537 | | // Variant schema maybe updated, so we need to update the schema as well. |
1538 | | // The updated rowset meta after `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap` |
1539 | | // will be lost in `update_tmp_rowset` if skip_schema.So in order to keep the latest schema we should keep schema in update_tmp_rowset |
1540 | | // for variant type |
1541 | 0 | bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0; |
1542 | 0 | RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema); |
1543 | 0 | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); |
1544 | 0 | Status st = retry_rpc(MetaServiceRPC::UPDATE_TMP_ROWSET, req, &resp, |
1545 | 0 | &MetaService_Stub::update_tmp_rowset, |
1546 | 0 | { |
1547 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1548 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1549 | 0 | .table_id = table_id, |
1550 | 0 | }); |
1551 | 0 | if (!st.ok() && resp.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND) { |
1552 | 0 | return Status::InternalError("failed to update committed rowset: {}", resp.status().msg()); |
1553 | 0 | } |
1554 | 0 | return st; |
1555 | 0 | } |
1556 | | |
1557 | | // async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP |
1558 | | static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, |
1559 | | const std::string& label, CommitTxnResponse& res, |
1560 | 0 | const std::vector<int64_t>& tablet_ids) { |
1561 | 0 | std::string protobufBytes; |
1562 | 0 | if (txn_id != -1) { |
1563 | 0 | res.SerializeToString(&protobufBytes); |
1564 | 0 | } |
1565 | 0 | auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func( |
1566 | 0 | [db_id, txn_id, label, protobufBytes, tablet_ids]() -> Status { |
1567 | 0 | TReportCommitTxnResultRequest request; |
1568 | 0 | TStatus result; |
1569 | |
|
1570 | 0 | if (txn_id != -1 && protobufBytes.length() <= 0) { |
1571 | 0 | LOG(WARNING) << "protobufBytes: " << protobufBytes.length(); |
1572 | 0 | return Status::OK(); // nobody cares the return status |
1573 | 0 | } |
1574 | | |
1575 | 0 | request.__set_dbId(db_id); |
1576 | 0 | request.__set_txnId(txn_id); |
1577 | 0 | request.__set_label(label); |
1578 | 0 | request.__set_payload(protobufBytes); |
1579 | 0 | request.__set_tabletIds(tablet_ids); |
1580 | |
|
1581 | 0 | Status status; |
1582 | 0 | int64_t duration_ns = 0; |
1583 | 0 | TNetworkAddress master_addr = |
1584 | 0 | ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
1585 | 0 | if (master_addr.hostname.empty() || master_addr.port == 0) { |
1586 | 0 | status = Status::Error<SERVICE_UNAVAILABLE>( |
1587 | 0 | "Have not get FE Master heartbeat yet"); |
1588 | 0 | } else { |
1589 | 0 | SCOPED_RAW_TIMER(&duration_ns); |
1590 | |
|
1591 | 0 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
1592 | 0 | master_addr.hostname, master_addr.port, |
1593 | 0 | [&request, &result](FrontendServiceConnection& client) { |
1594 | 0 | client->reportCommitTxnResult(result, request); |
1595 | 0 | })); |
1596 | | |
1597 | 0 | status = Status::create<false>(result); |
1598 | 0 | } |
1599 | 0 | g_cloud_commit_txn_resp_redirect_latency << duration_ns / 1000; |
1600 | |
|
1601 | 0 | if (!status.ok()) { |
1602 | 0 | LOG(WARNING) << "TableStats report RPC to FE failed, errmsg=" << status |
1603 | 0 | << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label; |
1604 | 0 | return Status::OK(); // nobody cares the return status |
1605 | 0 | } else { |
1606 | 0 | LOG(INFO) << "TableStats report RPC to FE success, msg=" << status |
1607 | 0 | << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label; |
1608 | 0 | return Status::OK(); |
1609 | 0 | } |
1610 | 0 | }); |
1611 | 0 | if (!st.ok()) { |
1612 | 0 | LOG(WARNING) << "TableStats report to FE task submission failed: " << st.to_string(); |
1613 | 0 | } |
1614 | 0 | } |
1615 | | |
1616 | 0 | Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { |
1617 | 0 | VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1618 | 0 | << ", label: " << ctx.label << ", is_2pc: " << is_2pc; |
1619 | 0 | { |
1620 | 0 | Status ret_st; |
1621 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st); |
1622 | 0 | } |
1623 | 0 | CommitTxnRequest req; |
1624 | 0 | CommitTxnResponse res; |
1625 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1626 | 0 | req.set_db_id(ctx.db_id); |
1627 | 0 | req.set_txn_id(ctx.txn_id); |
1628 | 0 | req.set_is_2pc(is_2pc); |
1629 | 0 | req.set_enable_txn_lazy_commit(config::enable_cloud_txn_lazy_commit); |
1630 | 0 | auto st = retry_rpc(MetaServiceRPC::COMMIT_TXN, req, &res, &MetaService_Stub::commit_txn, |
1631 | 0 | { |
1632 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1633 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1634 | 0 | }); |
1635 | |
|
1636 | 0 | if (st.ok()) { |
1637 | 0 | std::vector<int64_t> tablet_ids; |
1638 | 0 | for (auto& commit_info : ctx.commit_infos) { |
1639 | 0 | tablet_ids.emplace_back(commit_info.tabletId); |
1640 | 0 | } |
1641 | 0 | send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res, tablet_ids); |
1642 | 0 | } |
1643 | |
|
1644 | 0 | return st; |
1645 | 0 | } |
1646 | | |
1647 | 0 | Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { |
1648 | 0 | VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1649 | 0 | << ", label: " << ctx.label; |
1650 | 0 | { |
1651 | 0 | Status ret_st; |
1652 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st); |
1653 | 0 | } |
1654 | 0 | AbortTxnRequest req; |
1655 | 0 | AbortTxnResponse res; |
1656 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1657 | 0 | req.set_reason(std::string(ctx.status.msg().substr(0, 1024))); |
1658 | 0 | if (ctx.db_id > 0 && !ctx.label.empty()) { |
1659 | 0 | req.set_db_id(ctx.db_id); |
1660 | 0 | req.set_label(ctx.label); |
1661 | 0 | } else if (ctx.txn_id > 0) { |
1662 | 0 | req.set_txn_id(ctx.txn_id); |
1663 | 0 | } else { |
1664 | 0 | LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id |
1665 | 0 | << " txn_id=" << ctx.txn_id << " label=" << ctx.label; |
1666 | 0 | return Status::InternalError<false>("failed to abort txn"); |
1667 | 0 | } |
1668 | 0 | return retry_rpc(MetaServiceRPC::ABORT_TXN, req, &res, &MetaService_Stub::abort_txn, |
1669 | 0 | { |
1670 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1671 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1672 | 0 | }); |
1673 | 0 | } |
1674 | | |
1675 | 0 | Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { |
1676 | 0 | VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1677 | 0 | << ", label: " << ctx.label; |
1678 | 0 | { |
1679 | 0 | Status ret_st; |
1680 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st); |
1681 | 0 | } |
1682 | 0 | PrecommitTxnRequest req; |
1683 | 0 | PrecommitTxnResponse res; |
1684 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1685 | 0 | req.set_db_id(ctx.db_id); |
1686 | 0 | req.set_txn_id(ctx.txn_id); |
1687 | 0 | return retry_rpc(MetaServiceRPC::PRECOMMIT_TXN, req, &res, &MetaService_Stub::precommit_txn, |
1688 | 0 | { |
1689 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1690 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1691 | 0 | }); |
1692 | 0 | } |
1693 | | |
1694 | 0 | Status CloudMetaMgr::prepare_restore_job(const TabletMetaPB& tablet_meta) { |
1695 | 0 | VLOG_DEBUG << "prepare restore job, tablet_id: " << tablet_meta.tablet_id(); |
1696 | 0 | RestoreJobRequest req; |
1697 | 0 | RestoreJobResponse resp; |
1698 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1699 | 0 | req.set_tablet_id(tablet_meta.tablet_id()); |
1700 | 0 | req.set_expiration(config::snapshot_expire_time_sec); |
1701 | 0 | req.set_action(RestoreJobRequest::PREPARE); |
1702 | |
|
1703 | 0 | doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), std::move(tablet_meta)); |
1704 | 0 | return retry_rpc(MetaServiceRPC::PREPARE_RESTORE_JOB, req, &resp, |
1705 | 0 | &MetaService_Stub::prepare_restore_job, |
1706 | 0 | { |
1707 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1708 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1709 | 0 | }); |
1710 | 0 | } |
1711 | | |
1712 | 0 | Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) { |
1713 | 0 | VLOG_DEBUG << "commit restore job, tablet_id: " << tablet_id; |
1714 | 0 | RestoreJobRequest req; |
1715 | 0 | RestoreJobResponse resp; |
1716 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1717 | 0 | req.set_tablet_id(tablet_id); |
1718 | 0 | req.set_action(RestoreJobRequest::COMMIT); |
1719 | 0 | req.set_store_version(config::delete_bitmap_store_write_version); |
1720 | |
|
1721 | 0 | return retry_rpc(MetaServiceRPC::COMMIT_RESTORE_JOB, req, &resp, |
1722 | 0 | &MetaService_Stub::commit_restore_job, |
1723 | 0 | { |
1724 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1725 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1726 | 0 | }); |
1727 | 0 | } |
1728 | | |
1729 | 0 | Status CloudMetaMgr::finish_restore_job(const int64_t tablet_id, bool is_completed) { |
1730 | 0 | VLOG_DEBUG << "finish restore job, tablet_id: " << tablet_id |
1731 | 0 | << ", is_completed: " << is_completed; |
1732 | 0 | RestoreJobRequest req; |
1733 | 0 | RestoreJobResponse resp; |
1734 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1735 | 0 | req.set_tablet_id(tablet_id); |
1736 | 0 | req.set_action(is_completed ? RestoreJobRequest::COMPLETE : RestoreJobRequest::ABORT); |
1737 | |
|
1738 | 0 | return retry_rpc(MetaServiceRPC::FINISH_RESTORE_JOB, req, &resp, |
1739 | 0 | &MetaService_Stub::finish_restore_job, |
1740 | 0 | { |
1741 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1742 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1743 | 0 | }); |
1744 | 0 | } |
1745 | | |
1746 | 0 | Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) { |
1747 | 0 | GetObjStoreInfoRequest req; |
1748 | 0 | GetObjStoreInfoResponse resp; |
1749 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1750 | 0 | Status s = retry_rpc(MetaServiceRPC::GET_OBJ_STORE_INFO, req, &resp, |
1751 | 0 | &MetaService_Stub::get_obj_store_info, |
1752 | 0 | { |
1753 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1754 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1755 | 0 | }); |
1756 | 0 | if (!s.ok()) { |
1757 | 0 | return s; |
1758 | 0 | } |
1759 | | |
1760 | 0 | *is_vault_mode = resp.enable_storage_vault(); |
1761 | |
|
1762 | 0 | auto add_obj_store = [&vault_infos](const auto& obj_store) { |
1763 | 0 | vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store), |
1764 | 0 | StorageVaultPB_PathFormat {}); |
1765 | 0 | }; |
1766 | |
|
1767 | 0 | std::ranges::for_each(resp.obj_info(), add_obj_store); |
1768 | 0 | std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) { |
1769 | 0 | if (vault.has_hdfs_info()) { |
1770 | 0 | vault_infos->emplace_back(vault.id(), vault.hdfs_info(), vault.path_format()); |
1771 | 0 | } |
1772 | 0 | if (vault.has_obj_info()) { |
1773 | 0 | add_obj_store(vault.obj_info()); |
1774 | 0 | } |
1775 | 0 | }); |
1776 | | |
1777 | | // desensitization, hide secret |
1778 | 0 | for (int i = 0; i < resp.obj_info_size(); ++i) { |
1779 | 0 | resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx"); |
1780 | 0 | } |
1781 | 0 | for (int i = 0; i < resp.storage_vault_size(); ++i) { |
1782 | 0 | auto* j = resp.mutable_storage_vault(i); |
1783 | 0 | if (!j->has_obj_info()) continue; |
1784 | 0 | j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx"); |
1785 | 0 | } |
1786 | |
|
1787 | 0 | for (int i = 0; i < resp.obj_info_size(); ++i) { |
1788 | 0 | resp.mutable_obj_info(i)->set_ak(hide_access_key(resp.obj_info(i).sk())); |
1789 | 0 | } |
1790 | 0 | for (int i = 0; i < resp.storage_vault_size(); ++i) { |
1791 | 0 | auto* j = resp.mutable_storage_vault(i); |
1792 | 0 | if (!j->has_obj_info()) continue; |
1793 | 0 | j->mutable_obj_info()->set_sk(hide_access_key(j->obj_info().sk())); |
1794 | 0 | } |
1795 | |
|
1796 | 0 | LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode |
1797 | 0 | << " response=" << resp.ShortDebugString(); |
1798 | 0 | return Status::OK(); |
1799 | 0 | } |
1800 | | |
1801 | 0 | Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) { |
1802 | 0 | VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString(); |
1803 | 0 | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res); |
1804 | |
|
1805 | 0 | StartTabletJobRequest req; |
1806 | 0 | req.mutable_job()->CopyFrom(job); |
1807 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1808 | 0 | return retry_rpc(MetaServiceRPC::START_TABLET_JOB, req, res, |
1809 | 0 | &MetaService_Stub::start_tablet_job, |
1810 | 0 | { |
1811 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1812 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1813 | 0 | }); |
1814 | 0 | } |
1815 | | |
1816 | 0 | Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) { |
1817 | 0 | VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString(); |
1818 | 0 | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res); |
1819 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::commit_tablet_job.fail", { |
1820 | 0 | return Status::InternalError<false>("inject CloudMetaMgr::commit_tablet_job.fail"); |
1821 | 0 | }); |
1822 | |
|
1823 | 0 | FinishTabletJobRequest req; |
1824 | 0 | req.mutable_job()->CopyFrom(job); |
1825 | 0 | req.set_action(FinishTabletJobRequest::COMMIT); |
1826 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1827 | 0 | auto st = retry_rpc(MetaServiceRPC::FINISH_TABLET_JOB, req, res, |
1828 | 0 | &MetaService_Stub::finish_tablet_job, |
1829 | 0 | { |
1830 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1831 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1832 | 0 | }); |
1833 | 0 | if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { |
1834 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1835 | 0 | "txn conflict when commit tablet job {}", job.ShortDebugString()); |
1836 | 0 | } |
1837 | | |
1838 | 0 | if (st.ok() && !job.compaction().empty() && job.has_idx()) { |
1839 | 0 | CommitTxnResponse commit_txn_resp; |
1840 | 0 | std::vector<int64_t> tablet_ids = {job.idx().tablet_id()}; |
1841 | 0 | send_stats_to_fe_async(-1, -1, "", commit_txn_resp, tablet_ids); |
1842 | 0 | } |
1843 | |
|
1844 | 0 | return st; |
1845 | 0 | } |
1846 | | |
1847 | 0 | Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) { |
1848 | 0 | VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString(); |
1849 | 0 | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_tablet_job", Status::OK(), job); |
1850 | 0 | FinishTabletJobRequest req; |
1851 | 0 | FinishTabletJobResponse res; |
1852 | 0 | req.mutable_job()->CopyFrom(job); |
1853 | 0 | req.set_action(FinishTabletJobRequest::ABORT); |
1854 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1855 | 0 | return retry_rpc(MetaServiceRPC::FINISH_TABLET_JOB, req, &res, |
1856 | 0 | &MetaService_Stub::finish_tablet_job, |
1857 | 0 | { |
1858 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1859 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1860 | 0 | }); |
1861 | 0 | } |
1862 | | |
1863 | 0 | Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) { |
1864 | 0 | VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString(); |
1865 | 0 | FinishTabletJobRequest req; |
1866 | 0 | FinishTabletJobResponse res; |
1867 | 0 | req.mutable_job()->CopyFrom(job); |
1868 | 0 | req.set_action(FinishTabletJobRequest::LEASE); |
1869 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1870 | 0 | return retry_rpc(MetaServiceRPC::FINISH_TABLET_JOB, req, &res, |
1871 | 0 | &MetaService_Stub::finish_tablet_job, |
1872 | 0 | { |
1873 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
1874 | 0 | .backpressure_handler = ms_backpressure_handler_, |
1875 | 0 | }); |
1876 | 0 | } |
1877 | | |
1878 | | static void add_delete_bitmap(DeleteBitmapPB& delete_bitmap_pb, const DeleteBitmap::BitmapKey& key, |
1879 | 0 | roaring::Roaring& bitmap) { |
1880 | 0 | delete_bitmap_pb.add_rowset_ids(std::get<0>(key).to_string()); |
1881 | 0 | delete_bitmap_pb.add_segment_ids(std::get<1>(key)); |
1882 | 0 | delete_bitmap_pb.add_versions(std::get<2>(key)); |
1883 | | // To save space, convert array and bitmap containers to run containers |
1884 | 0 | bitmap.runOptimize(); |
1885 | 0 | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1886 | 0 | bitmap.write(bitmap_data.data()); |
1887 | 0 | *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1888 | 0 | } |
1889 | | |
1890 | | static Status store_delete_bitmap(std::string& rowset_id, DeleteBitmapPB& delete_bitmap_pb, |
1891 | | int64_t tablet_id, |
1892 | | std::optional<StorageResource> storage_resource, |
1893 | 0 | UpdateDeleteBitmapRequest& req, int64_t txn_id) { |
1894 | 0 | if (config::enable_mow_verbose_log) { |
1895 | 0 | std::stringstream ss; |
1896 | 0 | for (int i = 0; i < delete_bitmap_pb.rowset_ids_size(); i++) { |
1897 | 0 | ss << "{rid=" << delete_bitmap_pb.rowset_ids(i) |
1898 | 0 | << ", sid=" << delete_bitmap_pb.segment_ids(i) |
1899 | 0 | << ", ver=" << delete_bitmap_pb.versions(i) << "}, "; |
1900 | 0 | } |
1901 | 0 | LOG(INFO) << "handle one rowset delete bitmap for tablet_id: " << tablet_id |
1902 | 0 | << ", rowset_id: " << rowset_id |
1903 | 0 | << ", delete_bitmap num: " << delete_bitmap_pb.rowset_ids_size() |
1904 | 0 | << ", size: " << delete_bitmap_pb.ByteSizeLong() << ", keys=[" << ss.str() |
1905 | 0 | << "]"; |
1906 | 0 | } |
1907 | 0 | if (delete_bitmap_pb.rowset_ids_size() == 0) { |
1908 | 0 | return Status::OK(); |
1909 | 0 | } |
1910 | 0 | DeleteBitmapStoragePB delete_bitmap_storage; |
1911 | 0 | if (config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 && |
1912 | 0 | delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb) { |
1913 | | // Enable packed file only for load (txn_id > 0) |
1914 | 0 | bool enable_packed = config::enable_packed_file && txn_id > 0; |
1915 | 0 | DeleteBitmapFileWriter file_writer(tablet_id, rowset_id, storage_resource, enable_packed, |
1916 | 0 | txn_id); |
1917 | 0 | RETURN_IF_ERROR(file_writer.init()); |
1918 | 0 | RETURN_IF_ERROR(file_writer.write(delete_bitmap_pb)); |
1919 | 0 | RETURN_IF_ERROR(file_writer.close()); |
1920 | 0 | delete_bitmap_pb.Clear(); |
1921 | 0 | delete_bitmap_storage.set_store_in_fdb(false); |
1922 | | |
1923 | | // Store packed slice location if file was written to packed file |
1924 | 0 | if (file_writer.is_packed()) { |
1925 | 0 | io::PackedSliceLocation loc; |
1926 | 0 | RETURN_IF_ERROR(file_writer.get_packed_slice_location(&loc)); |
1927 | 0 | auto* packed_loc = delete_bitmap_storage.mutable_packed_slice_location(); |
1928 | 0 | packed_loc->set_packed_file_path(loc.packed_file_path); |
1929 | 0 | packed_loc->set_offset(loc.offset); |
1930 | 0 | packed_loc->set_size(loc.size); |
1931 | 0 | packed_loc->set_packed_file_size(loc.packed_file_size); |
1932 | 0 | } |
1933 | 0 | } else { |
1934 | 0 | delete_bitmap_storage.set_store_in_fdb(true); |
1935 | 0 | *(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb); |
1936 | 0 | } |
1937 | 0 | req.add_delta_rowset_ids(rowset_id); |
1938 | 0 | *(req.add_delete_bitmap_storages()) = std::move(delete_bitmap_storage); |
1939 | 0 | return Status::OK(); |
1940 | 0 | } |
1941 | | |
1942 | | Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, |
1943 | | int64_t initiator, DeleteBitmap* delete_bitmap, |
1944 | | DeleteBitmap* delete_bitmap_v2, std::string rowset_id, |
1945 | | std::optional<StorageResource> storage_resource, |
1946 | | int64_t store_version, int64_t table_id, int64_t txn_id, |
1947 | 0 | bool is_explicit_txn, int64_t next_visible_version) { |
1948 | 0 | VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id(); |
1949 | 0 | if (config::enable_mow_verbose_log) { |
1950 | 0 | std::stringstream ss; |
1951 | 0 | ss << "start update delete bitmap for tablet_id: " << tablet.tablet_id() |
1952 | 0 | << ", rowset_id: " << rowset_id |
1953 | 0 | << ", delete_bitmap num: " << delete_bitmap->delete_bitmap.size() |
1954 | 0 | << ", store_version: " << store_version << ", lock_id=" << lock_id |
1955 | 0 | << ", initiator=" << initiator; |
1956 | 0 | if (store_version == 2 || store_version == 3) { |
1957 | 0 | ss << ", delete_bitmap v2 num: " << delete_bitmap_v2->delete_bitmap.size(); |
1958 | 0 | } |
1959 | 0 | LOG(INFO) << ss.str(); |
1960 | 0 | } |
1961 | 0 | UpdateDeleteBitmapRequest req; |
1962 | 0 | UpdateDeleteBitmapResponse res; |
1963 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1964 | 0 | req.set_table_id(tablet.table_id()); |
1965 | 0 | req.set_partition_id(tablet.partition_id()); |
1966 | 0 | req.set_tablet_id(tablet.tablet_id()); |
1967 | 0 | req.set_lock_id(lock_id); |
1968 | 0 | req.set_initiator(initiator); |
1969 | 0 | req.set_is_explicit_txn(is_explicit_txn); |
1970 | 0 | if (txn_id > 0) { |
1971 | 0 | req.set_txn_id(txn_id); |
1972 | 0 | } |
1973 | 0 | if (next_visible_version > 0) { |
1974 | 0 | req.set_next_visible_version(next_visible_version); |
1975 | 0 | } |
1976 | 0 | req.set_store_version(store_version); |
1977 | |
|
1978 | 0 | bool write_v1 = store_version == 1 || store_version == 3; |
1979 | 0 | bool write_v2 = store_version == 2 || store_version == 3; |
1980 | | // write v1 kvs |
1981 | 0 | if (write_v1) { |
1982 | 0 | for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { |
1983 | 0 | req.add_rowset_ids(std::get<0>(key).to_string()); |
1984 | 0 | req.add_segment_ids(std::get<1>(key)); |
1985 | 0 | req.add_versions(std::get<2>(key)); |
1986 | | // To save space, convert array and bitmap containers to run containers |
1987 | 0 | bitmap.runOptimize(); |
1988 | 0 | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1989 | 0 | bitmap.write(bitmap_data.data()); |
1990 | 0 | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1991 | 0 | } |
1992 | 0 | } |
1993 | | |
1994 | | // write v2 kvs |
1995 | 0 | if (write_v2) { |
1996 | 0 | if (config::enable_mow_verbose_log) { |
1997 | 0 | LOG(INFO) << "update delete bitmap for tablet_id: " << tablet.tablet_id() |
1998 | 0 | << ", rowset_id: " << rowset_id |
1999 | 0 | << ", delete_bitmap num: " << delete_bitmap_v2->delete_bitmap.size() |
2000 | 0 | << ", lock_id=" << lock_id << ", initiator=" << initiator; |
2001 | 0 | } |
2002 | 0 | if (rowset_id.empty()) { |
2003 | 0 | std::string pre_rowset_id = ""; |
2004 | 0 | std::string cur_rowset_id = ""; |
2005 | 0 | DeleteBitmapPB delete_bitmap_pb; |
2006 | 0 | for (auto it = delete_bitmap_v2->delete_bitmap.begin(); |
2007 | 0 | it != delete_bitmap_v2->delete_bitmap.end(); ++it) { |
2008 | 0 | auto& key = it->first; |
2009 | 0 | auto& bitmap = it->second; |
2010 | 0 | cur_rowset_id = std::get<0>(key).to_string(); |
2011 | 0 | if (cur_rowset_id != pre_rowset_id) { |
2012 | 0 | if (!pre_rowset_id.empty() && delete_bitmap_pb.rowset_ids_size() > 0) { |
2013 | 0 | RETURN_IF_ERROR(store_delete_bitmap(pre_rowset_id, delete_bitmap_pb, |
2014 | 0 | tablet.tablet_id(), storage_resource, |
2015 | 0 | req, txn_id)); |
2016 | 0 | } |
2017 | 0 | pre_rowset_id = cur_rowset_id; |
2018 | 0 | DCHECK_EQ(delete_bitmap_pb.rowset_ids_size(), 0); |
2019 | 0 | DCHECK_EQ(delete_bitmap_pb.segment_ids_size(), 0); |
2020 | 0 | DCHECK_EQ(delete_bitmap_pb.versions_size(), 0); |
2021 | 0 | DCHECK_EQ(delete_bitmap_pb.segment_delete_bitmaps_size(), 0); |
2022 | 0 | } |
2023 | 0 | add_delete_bitmap(delete_bitmap_pb, key, bitmap); |
2024 | 0 | } |
2025 | 0 | if (delete_bitmap_pb.rowset_ids_size() > 0) { |
2026 | 0 | DCHECK(!cur_rowset_id.empty()); |
2027 | 0 | RETURN_IF_ERROR(store_delete_bitmap(cur_rowset_id, delete_bitmap_pb, |
2028 | 0 | tablet.tablet_id(), storage_resource, req, |
2029 | 0 | txn_id)); |
2030 | 0 | } |
2031 | 0 | } else { |
2032 | 0 | DeleteBitmapPB delete_bitmap_pb; |
2033 | 0 | for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) { |
2034 | 0 | add_delete_bitmap(delete_bitmap_pb, key, bitmap); |
2035 | 0 | } |
2036 | 0 | RETURN_IF_ERROR(store_delete_bitmap(rowset_id, delete_bitmap_pb, tablet.tablet_id(), |
2037 | 0 | storage_resource, req, txn_id)); |
2038 | 0 | } |
2039 | 0 | DCHECK_EQ(req.delta_rowset_ids_size(), req.delete_bitmap_storages_size()); |
2040 | 0 | } |
2041 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", { |
2042 | 0 | LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id(); |
2043 | 0 | auto count = dp->param<int>("count", 30000); |
2044 | 0 | if (!delete_bitmap->delete_bitmap.empty()) { |
2045 | 0 | auto& key = delete_bitmap->delete_bitmap.begin()->first; |
2046 | 0 | auto& bitmap = delete_bitmap->delete_bitmap.begin()->second; |
2047 | 0 | for (int i = 1000; i < (1000 + count); i++) { |
2048 | 0 | req.add_rowset_ids(std::get<0>(key).to_string()); |
2049 | 0 | req.add_segment_ids(std::get<1>(key)); |
2050 | 0 | req.add_versions(i); |
2051 | | // To save space, convert array and bitmap containers to run containers |
2052 | 0 | bitmap.runOptimize(); |
2053 | 0 | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
2054 | 0 | bitmap.write(bitmap_data.data()); |
2055 | 0 | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
2056 | 0 | } |
2057 | 0 | } |
2058 | 0 | }); |
2059 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::test_update_delete_bitmap_fail", { |
2060 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
2061 | 0 | "test update delete bitmap failed, tablet_id: {}, lock_id: {}", tablet.tablet_id(), |
2062 | 0 | lock_id); |
2063 | 0 | }); |
2064 | 0 | auto st = retry_rpc(MetaServiceRPC::UPDATE_DELETE_BITMAP, req, &res, |
2065 | 0 | &MetaService_Stub::update_delete_bitmap, |
2066 | 0 | { |
2067 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2068 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2069 | 0 | .table_id = table_id, |
2070 | 0 | }); |
2071 | 0 | if (config::enable_update_delete_bitmap_kv_check_core && |
2072 | 0 | res.status().code() == MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV) { |
2073 | 0 | auto& msg = res.status().msg(); |
2074 | 0 | LOG_WARNING(msg); |
2075 | 0 | CHECK(false) << msg; |
2076 | 0 | } |
2077 | 0 | if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) { |
2078 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
2079 | 0 | "lock expired when update delete bitmap, tablet_id: {}, lock_id: {}, initiator: " |
2080 | 0 | "{}, error_msg: {}", |
2081 | 0 | tablet.tablet_id(), lock_id, initiator, res.status().msg()); |
2082 | 0 | } |
2083 | 0 | return st; |
2084 | 0 | } |
2085 | | |
2086 | | Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock( |
2087 | | const CloudTablet& tablet, DeleteBitmap* delete_bitmap, |
2088 | | std::map<std::string, int64_t>& rowset_to_versions, int64_t table_id, |
2089 | 0 | int64_t pre_rowset_agg_start_version, int64_t pre_rowset_agg_end_version) { |
2090 | 0 | if (config::delete_bitmap_store_write_version == 2) { |
2091 | 0 | VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2"; |
2092 | 0 | return Status::OK(); |
2093 | 0 | } |
2094 | 0 | LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id() |
2095 | 0 | << ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size(); |
2096 | 0 | UpdateDeleteBitmapRequest req; |
2097 | 0 | UpdateDeleteBitmapResponse res; |
2098 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2099 | 0 | req.set_table_id(tablet.table_id()); |
2100 | 0 | req.set_partition_id(tablet.partition_id()); |
2101 | 0 | req.set_tablet_id(tablet.tablet_id()); |
2102 | | // use a fake lock id to resolve compatibility issues |
2103 | 0 | req.set_lock_id(-3); |
2104 | 0 | req.set_without_lock(true); |
2105 | 0 | for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { |
2106 | 0 | req.add_rowset_ids(std::get<0>(key).to_string()); |
2107 | 0 | req.add_segment_ids(std::get<1>(key)); |
2108 | 0 | req.add_versions(std::get<2>(key)); |
2109 | 0 | if (pre_rowset_agg_end_version > 0) { |
2110 | 0 | DCHECK(rowset_to_versions.find(std::get<0>(key).to_string()) != |
2111 | 0 | rowset_to_versions.end()) |
2112 | 0 | << "rowset_to_versions not found for key=" << std::get<0>(key).to_string(); |
2113 | 0 | req.add_pre_rowset_versions(rowset_to_versions[std::get<0>(key).to_string()]); |
2114 | 0 | } |
2115 | 0 | DCHECK(pre_rowset_agg_end_version <= 0 || pre_rowset_agg_end_version == std::get<2>(key)) |
2116 | 0 | << "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version |
2117 | 0 | << " not equal to version=" << std::get<2>(key); |
2118 | | // To save space, convert array and bitmap containers to run containers |
2119 | 0 | bitmap.runOptimize(); |
2120 | 0 | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
2121 | 0 | bitmap.write(bitmap_data.data()); |
2122 | 0 | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
2123 | 0 | } |
2124 | 0 | if (pre_rowset_agg_start_version > 0 && pre_rowset_agg_end_version > 0) { |
2125 | 0 | req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version); |
2126 | 0 | req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version); |
2127 | 0 | } |
2128 | 0 | return retry_rpc(MetaServiceRPC::UPDATE_DELETE_BITMAP, req, &res, |
2129 | 0 | &MetaService_Stub::update_delete_bitmap, |
2130 | 0 | { |
2131 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2132 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2133 | 0 | .table_id = table_id, |
2134 | 0 | }); |
2135 | 0 | } |
2136 | | |
2137 | | Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, |
2138 | 0 | int64_t initiator) { |
2139 | 0 | DBUG_EXECUTE_IF("get_delete_bitmap_update_lock.inject_fail", { |
2140 | 0 | auto p = dp->param("percent", 0.01); |
2141 | 0 | std::mt19937 gen {std::random_device {}()}; |
2142 | 0 | std::bernoulli_distribution inject_fault {p}; |
2143 | 0 | if (inject_fault(gen)) { |
2144 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
2145 | 0 | "injection error when get get_delete_bitmap_update_lock, " |
2146 | 0 | "tablet_id={}, lock_id={}, initiator={}", |
2147 | 0 | tablet.tablet_id(), lock_id, initiator); |
2148 | 0 | } |
2149 | 0 | }); |
2150 | 0 | VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() |
2151 | 0 | << ",lock_id:" << lock_id; |
2152 | 0 | GetDeleteBitmapUpdateLockRequest req; |
2153 | 0 | GetDeleteBitmapUpdateLockResponse res; |
2154 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2155 | 0 | req.set_table_id(tablet.table_id()); |
2156 | 0 | req.set_lock_id(lock_id); |
2157 | 0 | req.set_initiator(initiator); |
2158 | | // set expiration time for compaction and schema_change |
2159 | 0 | req.set_expiration(config::delete_bitmap_lock_expiration_seconds); |
2160 | 0 | int retry_times = 0; |
2161 | 0 | Status st; |
2162 | 0 | std::default_random_engine rng = make_random_engine(); |
2163 | 0 | std::uniform_int_distribution<uint32_t> u(500, 2000); |
2164 | 0 | uint64_t backoff_sleep_time_ms {0}; |
2165 | 0 | do { |
2166 | 0 | bool test_conflict = false; |
2167 | 0 | st = retry_rpc(MetaServiceRPC::GET_DELETE_BITMAP_UPDATE_LOCK, req, &res, |
2168 | 0 | &MetaService_Stub::get_delete_bitmap_update_lock, |
2169 | 0 | { |
2170 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2171 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2172 | 0 | }); |
2173 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict", |
2174 | 0 | { test_conflict = true; }); |
2175 | 0 | if (!test_conflict && res.status().code() != MetaServiceCode::LOCK_CONFLICT) { |
2176 | 0 | break; |
2177 | 0 | } |
2178 | | |
2179 | 0 | uint32_t duration_ms = u(rng); |
2180 | 0 | LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req) |
2181 | 0 | << " retry_times=" << retry_times << " sleep=" << duration_ms |
2182 | 0 | << "ms : " << res.status().msg(); |
2183 | 0 | auto start = std::chrono::steady_clock::now(); |
2184 | 0 | bthread_usleep(duration_ms * 1000); |
2185 | 0 | auto end = std::chrono::steady_clock::now(); |
2186 | 0 | backoff_sleep_time_ms += duration_cast<std::chrono::milliseconds>(end - start).count(); |
2187 | 0 | } while (++retry_times <= config::get_delete_bitmap_lock_max_retry_times); |
2188 | 0 | g_cloud_be_mow_get_dbm_lock_backoff_sleep_time << backoff_sleep_time_ms; |
2189 | 0 | DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", { |
2190 | 0 | auto p = dp->param("percent", 0.01); |
2191 | | // 100s > Config.calculate_delete_bitmap_task_timeout_seconds = 60s |
2192 | 0 | auto sleep_time = dp->param("sleep", 15); |
2193 | 0 | std::mt19937 gen {std::random_device {}()}; |
2194 | 0 | std::bernoulli_distribution inject_fault {p}; |
2195 | 0 | if (inject_fault(gen)) { |
2196 | 0 | LOG_INFO("injection sleep for {} seconds, tablet_id={}", sleep_time, |
2197 | 0 | tablet.tablet_id()); |
2198 | 0 | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
2199 | 0 | } |
2200 | 0 | }); |
2201 | 0 | if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { |
2202 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
2203 | 0 | "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, " |
2204 | 0 | "initiator {}", |
2205 | 0 | tablet.table_id(), lock_id, initiator); |
2206 | 0 | } else if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) { |
2207 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
2208 | 0 | "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, " |
2209 | 0 | "initiator {}", |
2210 | 0 | tablet.table_id(), lock_id, initiator); |
2211 | 0 | } |
2212 | 0 | return st; |
2213 | 0 | } |
2214 | | |
2215 | | void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, |
2216 | 0 | int64_t initiator, int64_t tablet_id) { |
2217 | 0 | LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id |
2218 | 0 | << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id; |
2219 | 0 | RemoveDeleteBitmapUpdateLockRequest req; |
2220 | 0 | RemoveDeleteBitmapUpdateLockResponse res; |
2221 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2222 | 0 | req.set_table_id(table_id); |
2223 | 0 | req.set_tablet_id(tablet_id); |
2224 | 0 | req.set_lock_id(lock_id); |
2225 | 0 | req.set_initiator(initiator); |
2226 | 0 | auto st = retry_rpc(MetaServiceRPC::REMOVE_DELETE_BITMAP_UPDATE_LOCK, req, &res, |
2227 | 0 | &MetaService_Stub::remove_delete_bitmap_update_lock, |
2228 | 0 | { |
2229 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2230 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2231 | 0 | }); |
2232 | 0 | if (!st.ok()) { |
2233 | 0 | LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id |
2234 | 0 | << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id |
2235 | 0 | << ",st=" << st.to_string(); |
2236 | 0 | } |
2237 | 0 | } |
2238 | | |
2239 | 0 | void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) { |
2240 | 0 | if (!config::enable_table_size_correctness_check) { |
2241 | 0 | return; |
2242 | 0 | } |
2243 | 0 | int64_t total_segment_size = get_segment_file_size(rs_meta); |
2244 | 0 | int64_t total_inverted_index_size = get_inverted_index_file_size(rs_meta); |
2245 | 0 | if (rs_meta.data_disk_size() != total_segment_size || |
2246 | 0 | rs_meta.index_disk_size() != total_inverted_index_size || |
2247 | 0 | rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) { |
2248 | 0 | LOG(WARNING) << "[Cloud table table size check failed]:" |
2249 | 0 | << " tablet id: " << rs_meta.tablet_id() |
2250 | 0 | << ", rowset id:" << rs_meta.rowset_id() |
2251 | 0 | << ", rowset data disk size:" << rs_meta.data_disk_size() |
2252 | 0 | << ", rowset real data disk size:" << total_segment_size |
2253 | 0 | << ", rowset index disk size:" << rs_meta.index_disk_size() |
2254 | 0 | << ", rowset real index disk size:" << total_inverted_index_size |
2255 | 0 | << ", rowset total disk size:" << rs_meta.total_disk_size() |
2256 | 0 | << ", rowset segment path:" |
2257 | 0 | << StorageResource().remote_segment_path(rs_meta.tablet_id(), |
2258 | 0 | rs_meta.rowset_id().to_string(), 0); |
2259 | 0 | DCHECK(false); |
2260 | 0 | } |
2261 | 0 | } |
2262 | | |
2263 | 0 | int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) { |
2264 | 0 | int64_t total_segment_size = 0; |
2265 | 0 | const auto fs = rs_meta.fs(); |
2266 | 0 | if (!fs) { |
2267 | 0 | LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id(); |
2268 | 0 | } |
2269 | 0 | for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) { |
2270 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2271 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2272 | 0 | int64_t segment_file_size = 0; |
2273 | 0 | auto st = fs->file_size(segment_path, &segment_file_size); |
2274 | 0 | if (!st.ok()) { |
2275 | 0 | segment_file_size = 0; |
2276 | 0 | if (st.is<NOT_FOUND>()) { |
2277 | 0 | LOG(INFO) << "cloud table size correctness check get segment size 0 because " |
2278 | 0 | "file not exist! msg:" |
2279 | 0 | << st.msg() << ", segment path:" << segment_path; |
2280 | 0 | } else { |
2281 | 0 | LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:" |
2282 | 0 | << st.msg() << ", segment path:" << segment_path; |
2283 | 0 | } |
2284 | 0 | } |
2285 | 0 | total_segment_size += segment_file_size; |
2286 | 0 | } |
2287 | 0 | return total_segment_size; |
2288 | 0 | } |
2289 | | |
2290 | 0 | int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) { |
2291 | 0 | int64_t total_inverted_index_size = 0; |
2292 | 0 | const auto fs = rs_meta.fs(); |
2293 | 0 | if (!fs) { |
2294 | 0 | LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id(); |
2295 | 0 | } |
2296 | 0 | if (rs_meta.tablet_schema()->get_inverted_index_storage_format() == |
2297 | 0 | InvertedIndexStorageFormatPB::V1) { |
2298 | 0 | const auto& indices = rs_meta.tablet_schema()->inverted_indexes(); |
2299 | 0 | for (auto& index : indices) { |
2300 | 0 | for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) { |
2301 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2302 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2303 | 0 | int64_t file_size = 0; |
2304 | |
|
2305 | 0 | std::string inverted_index_file_path = |
2306 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
2307 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path), |
2308 | 0 | index->index_id(), index->get_index_suffix()); |
2309 | 0 | auto st = fs->file_size(inverted_index_file_path, &file_size); |
2310 | 0 | if (!st.ok()) { |
2311 | 0 | file_size = 0; |
2312 | 0 | if (st.is<NOT_FOUND>()) { |
2313 | 0 | LOG(INFO) << "cloud table size correctness check get inverted index v1 " |
2314 | 0 | "0 because file not exist! msg:" |
2315 | 0 | << st.msg() |
2316 | 0 | << ", inverted index path:" << inverted_index_file_path; |
2317 | 0 | } else { |
2318 | 0 | LOG(WARNING) |
2319 | 0 | << "cloud table size correctness check get inverted index v1 " |
2320 | 0 | "size failed! msg:" |
2321 | 0 | << st.msg() << ", inverted index path:" << inverted_index_file_path; |
2322 | 0 | } |
2323 | 0 | } |
2324 | 0 | total_inverted_index_size += file_size; |
2325 | 0 | } |
2326 | 0 | } |
2327 | 0 | } else { |
2328 | 0 | for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) { |
2329 | 0 | int64_t file_size = 0; |
2330 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2331 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2332 | |
|
2333 | 0 | std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
2334 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)); |
2335 | 0 | auto st = fs->file_size(inverted_index_file_path, &file_size); |
2336 | 0 | if (!st.ok()) { |
2337 | 0 | file_size = 0; |
2338 | 0 | if (st.is<NOT_FOUND>()) { |
2339 | 0 | LOG(INFO) << "cloud table size correctness check get inverted index v2 " |
2340 | 0 | "0 because file not exist! msg:" |
2341 | 0 | << st.msg() << ", inverted index path:" << inverted_index_file_path; |
2342 | 0 | } else { |
2343 | 0 | LOG(WARNING) << "cloud table size correctness check get inverted index v2 " |
2344 | 0 | "size failed! msg:" |
2345 | 0 | << st.msg() |
2346 | 0 | << ", inverted index path:" << inverted_index_file_path; |
2347 | 0 | } |
2348 | 0 | } |
2349 | 0 | total_inverted_index_size += file_size; |
2350 | 0 | } |
2351 | 0 | } |
2352 | 0 | return total_inverted_index_size; |
2353 | 0 | } |
2354 | | |
2355 | | Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version, |
2356 | 9 | std::unique_lock<std::shared_mutex>& wlock) { |
2357 | 9 | if (max_version <= 0) { |
2358 | 2 | return Status::OK(); |
2359 | 2 | } |
2360 | | |
2361 | 7 | Versions existing_versions; |
2362 | 19 | for (const auto& [_, rs] : tablet->tablet_meta()->all_rs_metas()) { |
2363 | 19 | existing_versions.emplace_back(rs->version()); |
2364 | 19 | } |
2365 | | |
2366 | | // If there are no existing versions, it may be a new tablet for restore, so skip filling holes. |
2367 | 7 | if (existing_versions.empty()) { |
2368 | 1 | return Status::OK(); |
2369 | 1 | } |
2370 | | |
2371 | 6 | std::vector<RowsetSharedPtr> hole_rowsets; |
2372 | | // sort the existing versions in ascending order |
2373 | 6 | std::sort(existing_versions.begin(), existing_versions.end(), |
2374 | 13 | [](const Version& a, const Version& b) { |
2375 | | // simple because 2 versions are certainly not overlapping |
2376 | 13 | return a.first < b.first; |
2377 | 13 | }); |
2378 | | |
2379 | | // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which calls |
2380 | | // fill_version_holes. For schema change tablets (TABLET_NOTREADY state), we selectively skip hole |
2381 | | // filling for versions <= alter_version to prevent: |
2382 | | // 1. Abnormal compaction score calculations for schema change tablets |
2383 | | // 2. Unexpected -235 errors during load operations |
2384 | | // This allows schema change to proceed normally while still permitting hole filling for versions |
2385 | | // beyond the alter_version threshold. |
2386 | 6 | bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY; |
2387 | 6 | if (is_schema_change_tablet && tablet->alter_version() <= 1) { |
2388 | 0 | LOG(INFO) << "Skip version hole filling for new schema change tablet " |
2389 | 0 | << tablet->tablet_id() << " with alter_version " << tablet->alter_version(); |
2390 | 0 | return Status::OK(); |
2391 | 0 | } |
2392 | | |
2393 | 6 | int64_t last_version = -1; |
2394 | 19 | for (const Version& version : existing_versions) { |
2395 | 19 | VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": [" |
2396 | 0 | << version.first << ", " << version.second << "]"; |
2397 | | // missing versions are those that are not in the existing_versions |
2398 | 19 | if (version.first > last_version + 1) { |
2399 | | // there is a hole between versions |
2400 | 6 | auto prev_non_hole_rowset = tablet->get_rowset_by_version(version); |
2401 | 16 | for (int64_t ver = last_version + 1; ver < version.first; ++ver) { |
2402 | | // Skip hole filling for versions <= alter_version during schema change |
2403 | 10 | if (is_schema_change_tablet && ver <= tablet->alter_version()) { |
2404 | 0 | continue; |
2405 | 0 | } |
2406 | 10 | RowsetSharedPtr hole_rowset; |
2407 | 10 | RETURN_IF_ERROR(create_empty_rowset_for_hole( |
2408 | 10 | tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); |
2409 | 10 | hole_rowsets.push_back(hole_rowset); |
2410 | 10 | } |
2411 | 6 | LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 |
2412 | 6 | << " to " << version.first - 1 << " for tablet " << tablet->tablet_id() |
2413 | 6 | << (is_schema_change_tablet |
2414 | 6 | ? (", schema change tablet skipped filling versions <= " + |
2415 | 0 | std::to_string(tablet->alter_version())) |
2416 | 6 | : ""); |
2417 | 6 | } |
2418 | 19 | last_version = version.second; |
2419 | 19 | } |
2420 | | |
2421 | 6 | if (last_version + 1 <= max_version) { |
2422 | 2 | LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to " |
2423 | 2 | << max_version << " for tablet " << tablet->tablet_id() |
2424 | 2 | << (is_schema_change_tablet |
2425 | 2 | ? (", schema change tablet skipped filling versions <= " + |
2426 | 0 | std::to_string(tablet->alter_version())) |
2427 | 2 | : ""); |
2428 | | // there is a hole after the last existing version |
2429 | 7 | for (; last_version + 1 <= max_version; ++last_version) { |
2430 | | // Skip hole filling for versions <= alter_version during schema change |
2431 | 5 | if (is_schema_change_tablet && last_version + 1 <= tablet->alter_version()) { |
2432 | 0 | continue; |
2433 | 0 | } |
2434 | 5 | RowsetSharedPtr hole_rowset; |
2435 | 5 | auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back()); |
2436 | 5 | RETURN_IF_ERROR(create_empty_rowset_for_hole( |
2437 | 5 | tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); |
2438 | 5 | hole_rowsets.push_back(hole_rowset); |
2439 | 5 | } |
2440 | 2 | } |
2441 | | |
2442 | 6 | if (!hole_rowsets.empty()) { |
2443 | 5 | size_t hole_count = hole_rowsets.size(); |
2444 | 5 | tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false); |
2445 | 5 | g_cloud_version_hole_filled_count << hole_count; |
2446 | 5 | } |
2447 | 6 | return Status::OK(); |
2448 | 6 | } |
2449 | | |
2450 | | Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version, |
2451 | | RowsetMetaSharedPtr prev_rowset_meta, |
2452 | 19 | RowsetSharedPtr* rowset) { |
2453 | | // Create a RowsetMeta for the empty rowset |
2454 | 19 | auto rs_meta = std::make_shared<RowsetMeta>(); |
2455 | | |
2456 | | // Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id) |
2457 | 19 | RowsetId hole_rowset_id; |
2458 | 19 | hole_rowset_id.init(2, 0, tablet->tablet_id(), version); |
2459 | 19 | rs_meta->set_rowset_id(hole_rowset_id); |
2460 | | |
2461 | | // Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id) |
2462 | 19 | PUniqueId load_id; |
2463 | 19 | load_id.set_hi(tablet->tablet_id()); |
2464 | 19 | load_id.set_lo(version); |
2465 | 19 | rs_meta->set_load_id(load_id); |
2466 | | |
2467 | | // Copy schema and other metadata from template |
2468 | 19 | rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema()); |
2469 | 19 | rs_meta->set_rowset_type(prev_rowset_meta->rowset_type()); |
2470 | 19 | rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash()); |
2471 | 19 | rs_meta->set_resource_id(prev_rowset_meta->resource_id()); |
2472 | | |
2473 | | // Basic tablet information |
2474 | 19 | rs_meta->set_tablet_id(tablet->tablet_id()); |
2475 | 19 | rs_meta->set_index_id(tablet->index_id()); |
2476 | 19 | rs_meta->set_partition_id(tablet->partition_id()); |
2477 | 19 | rs_meta->set_tablet_uid(tablet->tablet_uid()); |
2478 | 19 | rs_meta->set_version(Version(version, version)); |
2479 | 19 | rs_meta->set_txn_id(version); |
2480 | | |
2481 | 19 | rs_meta->set_num_rows(0); |
2482 | 19 | rs_meta->set_total_disk_size(0); |
2483 | 19 | rs_meta->set_data_disk_size(0); |
2484 | 19 | rs_meta->set_index_disk_size(0); |
2485 | 19 | rs_meta->set_empty(true); |
2486 | 19 | rs_meta->set_num_segments(0); |
2487 | 19 | rs_meta->set_segments_overlap(NONOVERLAPPING); |
2488 | 19 | rs_meta->set_rowset_state(VISIBLE); |
2489 | 19 | rs_meta->set_creation_time(UnixSeconds()); |
2490 | 19 | rs_meta->set_newest_write_timestamp(UnixSeconds()); |
2491 | | |
2492 | 19 | Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset); |
2493 | 19 | if (!s.ok()) { |
2494 | 0 | LOG_WARNING("Failed to create empty rowset for hole") |
2495 | 0 | .tag("tablet_id", tablet->tablet_id()) |
2496 | 0 | .tag("version", version) |
2497 | 0 | .error(s); |
2498 | 0 | return s; |
2499 | 0 | } |
2500 | 19 | (*rowset)->set_hole_rowset(true); |
2501 | | |
2502 | 19 | return Status::OK(); |
2503 | 19 | } |
2504 | | |
2505 | 0 | Status CloudMetaMgr::list_snapshot(std::vector<SnapshotInfoPB>& snapshots) { |
2506 | 0 | ListSnapshotRequest req; |
2507 | 0 | ListSnapshotResponse res; |
2508 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2509 | 0 | req.set_include_aborted(true); |
2510 | 0 | RETURN_IF_ERROR(retry_rpc(MetaServiceRPC::LIST_SNAPSHOTS, req, &res, |
2511 | 0 | &MetaService_Stub::list_snapshot, |
2512 | 0 | { |
2513 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2514 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2515 | 0 | })); |
2516 | 0 | for (auto& snapshot : res.snapshots()) { |
2517 | 0 | snapshots.emplace_back(snapshot); |
2518 | 0 | } |
2519 | 0 | return Status::OK(); |
2520 | 0 | } |
2521 | | |
2522 | | Status CloudMetaMgr::get_snapshot_properties(SnapshotSwitchStatus& switch_status, |
2523 | | int64_t& max_reserved_snapshots, |
2524 | 0 | int64_t& snapshot_interval_seconds) { |
2525 | 0 | GetInstanceRequest req; |
2526 | 0 | GetInstanceResponse res; |
2527 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2528 | 0 | RETURN_IF_ERROR(retry_rpc(MetaServiceRPC::GET_INSTANCE, req, &res, |
2529 | 0 | &MetaService_Stub::get_instance, |
2530 | 0 | { |
2531 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2532 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2533 | 0 | })); |
2534 | 0 | switch_status = res.instance().has_snapshot_switch_status() |
2535 | 0 | ? res.instance().snapshot_switch_status() |
2536 | 0 | : SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED; |
2537 | 0 | max_reserved_snapshots = |
2538 | 0 | res.instance().has_max_reserved_snapshot() ? res.instance().max_reserved_snapshot() : 0; |
2539 | 0 | snapshot_interval_seconds = res.instance().has_snapshot_interval_seconds() |
2540 | 0 | ? res.instance().snapshot_interval_seconds() |
2541 | 0 | : 3600; |
2542 | 0 | return Status::OK(); |
2543 | 0 | } |
2544 | | |
2545 | | Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path, |
2546 | | const cloud::PackedFileInfoPB& packed_file_info, |
2547 | 0 | int64_t table_id) { |
2548 | 0 | VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with " |
2549 | 0 | << packed_file_info.total_slice_num() << " small files" |
2550 | 0 | << ", total bytes: " << packed_file_info.total_slice_bytes(); |
2551 | | |
2552 | | // Create request |
2553 | 0 | cloud::UpdatePackedFileInfoRequest req; |
2554 | 0 | cloud::UpdatePackedFileInfoResponse resp; |
2555 | | |
2556 | | // Set required fields |
2557 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2558 | 0 | req.set_packed_file_path(packed_file_path); |
2559 | 0 | *req.mutable_packed_file_info() = packed_file_info; |
2560 | | |
2561 | | // Make RPC call using retry pattern |
2562 | 0 | return retry_rpc(MetaServiceRPC::UPDATE_PACKED_FILE_INFO, req, &resp, |
2563 | 0 | &cloud::MetaService_Stub::update_packed_file_info, |
2564 | 0 | { |
2565 | 0 | .host_limiters = host_level_ms_rpc_rate_limiters_, |
2566 | 0 | .backpressure_handler = ms_backpressure_handler_, |
2567 | 0 | .table_id = table_id, |
2568 | 0 | }); |
2569 | 0 | } |
2570 | | |
2571 | | Status CloudMetaMgr::get_cluster_status( |
2572 | | std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result, |
2573 | 0 | std::string* my_cluster_id) { |
2574 | 0 | GetClusterStatusRequest req; |
2575 | 0 | GetClusterStatusResponse resp; |
2576 | 0 | req.add_cloud_unique_ids(config::cloud_unique_id); |
2577 | |
|
2578 | 0 | Status s = retry_rpc(MetaServiceRPC::GET_CLUSTER_STATUS, req, &resp, |
2579 | 0 | &MetaService_Stub::get_cluster_status, |
2580 | 0 | {.host_limiters = host_level_ms_rpc_rate_limiters_}); |
2581 | 0 | if (!s.ok()) { |
2582 | 0 | return s; |
2583 | 0 | } |
2584 | | |
2585 | 0 | result->clear(); |
2586 | 0 | for (const auto& detail : resp.details()) { |
2587 | 0 | for (const auto& cluster : detail.clusters()) { |
2588 | | // Store cluster status and mtime (mtime is in seconds from MS, convert to ms). |
2589 | | // If mtime is not set, use current time as a conservative default |
2590 | | // to avoid immediate takeover due to elapsed being huge. |
2591 | 0 | int64_t mtime_ms = cluster.has_mtime() ? cluster.mtime() * 1000 : UnixMillis(); |
2592 | 0 | (*result)[cluster.cluster_id()] = {static_cast<int32_t>(cluster.cluster_status()), |
2593 | 0 | mtime_ms}; |
2594 | 0 | } |
2595 | 0 | } |
2596 | |
|
2597 | 0 | if (my_cluster_id && resp.has_requester_cluster_id()) { |
2598 | 0 | *my_cluster_id = resp.requester_cluster_id(); |
2599 | 0 | } |
2600 | |
|
2601 | 0 | return Status::OK(); |
2602 | 0 | } |
2603 | | |
2604 | | } // namespace doris::cloud |