be/src/cloud/cloud_meta_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include "cloud/cloud_meta_mgr.h" |
18 | | |
19 | | #include <brpc/channel.h> |
20 | | #include <brpc/controller.h> |
21 | | #include <brpc/errno.pb.h> |
22 | | #include <bthread/bthread.h> |
23 | | #include <bthread/condition_variable.h> |
24 | | #include <bthread/mutex.h> |
25 | | #include <gen_cpp/FrontendService.h> |
26 | | #include <gen_cpp/HeartbeatService_types.h> |
27 | | #include <gen_cpp/PlanNodes_types.h> |
28 | | #include <gen_cpp/Types_types.h> |
29 | | #include <gen_cpp/cloud.pb.h> |
30 | | #include <gen_cpp/olap_file.pb.h> |
31 | | #include <glog/logging.h> |
32 | | |
33 | | #include <algorithm> |
34 | | #include <atomic> |
35 | | #include <chrono> |
36 | | #include <cstdint> |
37 | | #include <memory> |
38 | | #include <mutex> |
39 | | #include <random> |
40 | | #include <shared_mutex> |
41 | | #include <string> |
42 | | #include <type_traits> |
43 | | #include <vector> |
44 | | |
45 | | #include "cloud/cloud_storage_engine.h" |
46 | | #include "cloud/cloud_tablet.h" |
47 | | #include "cloud/cloud_warm_up_manager.h" |
48 | | #include "cloud/config.h" |
49 | | #include "cloud/delete_bitmap_file_reader.h" |
50 | | #include "cloud/delete_bitmap_file_writer.h" |
51 | | #include "cloud/pb_convert.h" |
52 | | #include "common/config.h" |
53 | | #include "common/logging.h" |
54 | | #include "common/status.h" |
55 | | #include "cpp/sync_point.h" |
56 | | #include "io/fs/obj_storage_client.h" |
57 | | #include "load/stream_load/stream_load_context.h" |
58 | | #include "runtime/exec_env.h" |
59 | | #include "storage/olap_common.h" |
60 | | #include "storage/rowset/rowset.h" |
61 | | #include "storage/rowset/rowset_factory.h" |
62 | | #include "storage/rowset/rowset_fwd.h" |
63 | | #include "storage/storage_engine.h" |
64 | | #include "storage/tablet/tablet_meta.h" |
65 | | #include "util/client_cache.h" |
66 | | #include "util/network_util.h" |
67 | | #include "util/s3_util.h" |
68 | | #include "util/thrift_rpc_helper.h" |
69 | | |
70 | | namespace doris::cloud { |
71 | | #include "common/compile_check_begin.h" |
72 | | using namespace ErrorCode; |
73 | | |
// bthread entry point: invokes and then frees a heap-allocated `std::function<void()>`.
//
// @param arg  Pointer to a `std::function<void()>` created with `new`. Ownership is taken by
//             this function; the caller must not touch the object afterwards.
// @return Always nullptr.
void* run_bthread_work(void* arg) {
    // Adopt the callable up front so that it is released even if invoking it throws
    // (this function is also called inline when a bthread cannot be started).
    std::unique_ptr<std::function<void()>> work(static_cast<std::function<void()>*>(arg));
    (*work)();
    return nullptr;
}
80 | | |
81 | 385k | Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) { |
82 | 385k | if (tasks.empty()) { |
83 | 6.62k | return Status::OK(); |
84 | 6.62k | } |
85 | | |
86 | 379k | bthread::Mutex lock; |
87 | 379k | bthread::ConditionVariable cond; |
88 | 379k | Status status; // Guard by lock |
89 | 379k | int count = 0; // Guard by lock |
90 | | |
91 | 1.50M | for (const auto& task : tasks) { |
92 | 1.50M | { |
93 | 1.50M | std::unique_lock lk(lock); |
94 | | // Wait until there are available slots |
95 | 1.56M | while (status.ok() && count >= concurrency) { |
96 | 55.6k | cond.wait(lk); |
97 | 55.6k | } |
98 | 1.50M | if (!status.ok()) { |
99 | 2 | break; |
100 | 2 | } |
101 | | |
102 | | // Increase running task count |
103 | 1.50M | ++count; |
104 | 1.50M | } |
105 | | |
106 | | // dispatch task into bthreads |
107 | 1.50M | auto* fn = new std::function<void()>([&, &task = task] { |
108 | 1.50M | auto st = task(); |
109 | 1.50M | { |
110 | 1.50M | std::lock_guard lk(lock); |
111 | 1.50M | --count; |
112 | 1.50M | if (!st.ok()) { |
113 | 2 | std::swap(st, status); |
114 | 2 | } |
115 | 1.50M | cond.notify_one(); |
116 | 1.50M | } |
117 | 1.50M | }); |
118 | | |
119 | 1.50M | bthread_t bthread_id; |
120 | 1.50M | if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) { |
121 | 0 | run_bthread_work(fn); |
122 | 0 | } |
123 | 1.50M | } |
124 | | |
125 | | // Wait until all running tasks have done |
126 | 379k | { |
127 | 379k | std::unique_lock lk(lock); |
128 | 1.01M | while (count > 0) { |
129 | 632k | cond.wait(lk); |
130 | 632k | } |
131 | 379k | } |
132 | | |
133 | 379k | return status; |
134 | 385k | } |
135 | | |
136 | | Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency, |
137 | 318k | std::future<Status>* fut) { |
138 | | // std::function will cause `copy`, we need to use heap memory to avoid copy ctor called |
139 | 318k | auto prom = std::make_shared<std::promise<Status>>(); |
140 | 318k | *fut = prom->get_future(); |
141 | 318k | std::function<void()>* fn = new std::function<void()>( |
142 | 324k | [tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable { |
143 | 324k | p->set_value(bthread_fork_join(tasks, concurrency)); |
144 | 324k | }); |
145 | | |
146 | 318k | bthread_t bthread_id; |
147 | 318k | if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) { |
148 | 0 | delete fn; |
149 | 0 | return Status::InternalError<false>("failed to create bthread"); |
150 | 0 | } |
151 | 318k | return Status::OK(); |
152 | 318k | } |
153 | | |
154 | | namespace { |
// Retry count handed to brpc::Controller::set_max_retry for every call issued by retry_rpc.
155 | | constexpr int kBrpcRetryTimes = 3; |
156 | | |
// Latency recorder exported as "doris_cloud_meta_mgr_get_rowset".
157 | | bvar::LatencyRecorder _get_rowset_latency("doris_cloud_meta_mgr_get_rowset"); |
// NOTE(review): the variable name refers to the commit-txn redirect response, while the exported
// name is "cloud_table_stats_report_latency" -- confirm which of the two is intended.
158 | | bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency"); |
// Incremented by retry_rpc each time an attempt fails with brpc::ERPCTIMEDOUT.
159 | | bvar::Adder<uint64_t> g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count"); |
// Windowed view over the timeout counter above (window argument 30; presumably seconds --
// confirm against bvar::Window).
160 | | bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window( |
161 | | "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30); |
// Latency recorder exported as "cloud_be_mow_get_dbm_lock_backoff_sleep_time".
162 | | bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time( |
163 | | "cloud_be_mow_get_dbm_lock_backoff_sleep_time"); |
// Counter exported as "cloud_version_hole_filled_count".
164 | | bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count"); |
165 | | |
166 | | class MetaServiceProxy { |
167 | | public: |
168 | 770k | static Status get_proxy(MetaServiceProxy** proxy) { |
169 | | // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function. |
170 | 770k | std::shared_ptr<MetaService_Stub> stub; |
171 | 770k | return get_pooled_client(&stub, proxy); |
172 | 770k | } |
173 | | |
174 | 0 | void set_unhealthy() { |
175 | 0 | std::unique_lock lock(_mutex); |
176 | 0 | maybe_unhealthy = true; |
177 | 0 | } |
178 | | |
179 | 1.54M | bool need_reconn(long now) { |
180 | 1.54M | return maybe_unhealthy && ((now - last_reconn_time_ms.front()) > |
181 | 0 | config::meta_service_rpc_reconnect_interval_ms); |
182 | 1.54M | } |
183 | | |
184 | 1.54M | Status get(std::shared_ptr<MetaService_Stub>* stub) { |
185 | 1.54M | using namespace std::chrono; |
186 | | |
187 | 1.54M | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
188 | 1.54M | { |
189 | 1.54M | std::shared_lock lock(_mutex); |
190 | 1.54M | if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) { |
191 | 1.54M | _last_access_at_ms.store(now, std::memory_order_relaxed); |
192 | 1.54M | *stub = _stub; |
193 | 1.54M | return Status::OK(); |
194 | 1.54M | } |
195 | 1.54M | } |
196 | | |
197 | 18.4E | auto channel = std::make_unique<brpc::Channel>(); |
198 | 18.4E | Status s = init_channel(channel.get()); |
199 | 18.4E | if (!s.ok()) [[unlikely]] { |
200 | 0 | return s; |
201 | 0 | } |
202 | | |
203 | 18.4E | *stub = std::make_shared<MetaService_Stub>(channel.release(), |
204 | 18.4E | google::protobuf::Service::STUB_OWNS_CHANNEL); |
205 | | |
206 | 18.4E | long deadline = now; |
207 | | // connection age only works without list endpoint. |
208 | 18.4E | if (config::meta_service_connection_age_base_seconds > 0) { |
209 | 1.63k | std::default_random_engine rng(static_cast<uint32_t>(now)); |
210 | 1.63k | std::uniform_int_distribution<> uni( |
211 | 1.63k | config::meta_service_connection_age_base_seconds, |
212 | 1.63k | config::meta_service_connection_age_base_seconds * 2); |
213 | 1.63k | deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count(); |
214 | 1.63k | } |
215 | | |
216 | | // Last one WIN |
217 | 18.4E | std::unique_lock lock(_mutex); |
218 | 18.4E | _last_access_at_ms.store(now, std::memory_order_relaxed); |
219 | 18.4E | _deadline_ms = deadline; |
220 | 18.4E | _stub = *stub; |
221 | | |
222 | 18.4E | last_reconn_time_ms.push(now); |
223 | 18.4E | last_reconn_time_ms.pop(); |
224 | 18.4E | maybe_unhealthy = false; |
225 | | |
226 | 18.4E | return Status::OK(); |
227 | 18.4E | } |
228 | | |
229 | | private: |
230 | 1.54M | static bool is_meta_service_endpoint_list() { |
231 | 1.54M | return config::meta_service_endpoint.find(',') != std::string::npos; |
232 | 1.54M | } |
233 | | |
234 | | /** |
235 | | * This function initializes a pool of `MetaServiceProxy` objects and selects one using |
236 | | * round-robin. It returns a client stub via the selected proxy. |
237 | | * |
238 | | * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved. |
239 | | * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`. |
240 | | * |
241 | | * @return Status Returns `Status::OK()` on success or an error status on failure. |
242 | | */ |
243 | | static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub, |
244 | 774k | MetaServiceProxy** proxy) { |
245 | 774k | static std::once_flag proxies_flag; |
246 | 774k | static size_t num_proxies = 1; |
247 | 774k | static std::atomic<size_t> index(0); |
248 | 774k | static std::unique_ptr<MetaServiceProxy[]> proxies; |
249 | 774k | if (config::meta_service_endpoint.empty()) { |
250 | 21 | return Status::InvalidArgument( |
251 | 21 | "Meta service endpoint is empty. Please configure manually or wait for " |
252 | 21 | "heartbeat to obtain."); |
253 | 21 | } |
254 | 774k | std::call_once( |
255 | 774k | proxies_flag, +[]() { |
256 | 1 | if (config::meta_service_connection_pooled) { |
257 | 1 | num_proxies = config::meta_service_connection_pool_size; |
258 | 1 | } |
259 | 1 | proxies = std::make_unique<MetaServiceProxy[]>(num_proxies); |
260 | 1 | }); |
261 | | |
262 | 774k | for (size_t i = 0; i + 1 < num_proxies; ++i) { |
263 | 773k | size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; |
264 | 773k | Status s = proxies[next_index].get(stub); |
265 | 775k | if (proxy != nullptr) { |
266 | 775k | *proxy = &(proxies[next_index]); |
267 | 775k | } |
268 | 773k | if (s.ok()) return Status::OK(); |
269 | 773k | } |
270 | | |
271 | 938 | size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; |
272 | 938 | if (proxy != nullptr) { |
273 | 0 | *proxy = &(proxies[next_index]); |
274 | 0 | } |
275 | 938 | return proxies[next_index].get(stub); |
276 | 774k | } |
277 | | |
278 | 1.62k | static Status init_channel(brpc::Channel* channel) { |
279 | 1.62k | static std::atomic<size_t> index = 1; |
280 | | |
281 | 1.62k | const char* load_balancer_name = nullptr; |
282 | 1.62k | std::string endpoint; |
283 | 1.62k | if (is_meta_service_endpoint_list()) { |
284 | 0 | endpoint = fmt::format("list://{}", config::meta_service_endpoint); |
285 | 0 | load_balancer_name = "random"; |
286 | 1.62k | } else { |
287 | 1.62k | std::string ip; |
288 | 1.62k | uint16_t port; |
289 | 1.62k | Status s = get_meta_service_ip_and_port(&ip, &port); |
290 | 1.62k | if (!s.ok()) { |
291 | 0 | LOG(WARNING) << "fail to get meta service ip and port: " << s; |
292 | 0 | return s; |
293 | 0 | } |
294 | | |
295 | 1.62k | endpoint = get_host_port(ip, port); |
296 | 1.62k | } |
297 | | |
298 | 1.62k | brpc::ChannelOptions options; |
299 | 1.62k | options.connection_group = |
300 | 1.62k | fmt::format("ms_{}", index.fetch_add(1, std::memory_order_relaxed)); |
301 | 1.62k | if (channel->Init(endpoint.c_str(), load_balancer_name, &options) != 0) { |
302 | 0 | return Status::InvalidArgument("failed to init brpc channel, endpoint: {}", endpoint); |
303 | 0 | } |
304 | 1.62k | return Status::OK(); |
305 | 1.62k | } |
306 | | |
307 | 1.61k | static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) { |
308 | 1.61k | std::string parsed_host; |
309 | 1.61k | if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) { |
310 | 0 | return Status::InvalidArgument("invalid meta service endpoint: {}", |
311 | 0 | config::meta_service_endpoint); |
312 | 0 | } |
313 | 1.62k | if (is_valid_ip(parsed_host)) { |
314 | 1.62k | *ip = std::move(parsed_host); |
315 | 1.62k | return Status::OK(); |
316 | 1.62k | } |
317 | 18.4E | return hostname_to_ip(parsed_host, *ip); |
318 | 1.61k | } |
319 | | |
320 | 1.54M | bool is_idle_timeout(long now) { |
321 | 1.54M | auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms; |
322 | | // idle timeout only works without list endpoint. |
323 | 1.54M | return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 && |
324 | 1.54M | _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now; |
325 | 1.54M | } |
326 | | |
327 | | std::shared_mutex _mutex; |
328 | | std::atomic<long> _last_access_at_ms {0}; |
329 | | long _deadline_ms {0}; |
330 | | std::shared_ptr<MetaService_Stub> _stub; |
331 | | |
332 | | std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}}; |
333 | | bool maybe_unhealthy = false; |
334 | | }; |
335 | | |
// Type trait: true when `T` is exactly one of the types in `Ts...` (false for an empty list).
template <typename T, typename... Ts>
struct is_any : std::bool_constant<(std::is_same_v<T, Ts> || ...)> {};

// Convenience variable template for `is_any<T, Ts...>::value`.
template <typename T, typename... Ts>
constexpr bool is_any_v = is_any<T, Ts...>::value;
341 | | |
342 | | template <typename Request> |
343 | 528 | static std::string debug_info(const Request& req) { |
344 | 528 | if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) { |
345 | 0 | return fmt::format(" txn_id={}", req.txn_id()); |
346 | 0 | } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) { |
347 | 0 | return fmt::format(" tablet_id={}", req.job().idx().tablet_id()); |
348 | 0 | } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) { |
349 | 0 | return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id()); |
350 | 528 | } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) { |
351 | 528 | return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id()); |
352 | 528 | } else if constexpr (is_any_v<Request, GetTabletRequest>) { |
353 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
354 | | } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest, |
355 | 0 | GetInstanceRequest, GetClusterStatusRequest>) { |
356 | 0 | return ""; |
357 | 0 | } else if constexpr (is_any_v<Request, CreateRowsetRequest>) { |
358 | 0 | return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); |
359 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) { |
360 | | return fmt::format(" tablet_id={}", req.tablet_id()); |
361 | 0 | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) { |
362 | 0 | return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), |
363 | 0 | req.tablet_id(), req.lock_id()); |
364 | 0 | } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) { |
365 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
366 | | } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) { |
367 | | return fmt::format(" index_id={}", req.index_id()); |
368 | 0 | } else if constexpr (is_any_v<Request, RestoreJobRequest>) { |
369 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
370 | 0 | } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) { |
371 | 0 | return fmt::format(" packed_file_path={}", req.packed_file_path()); |
372 | | } else { |
373 | | static_assert(!sizeof(Request)); |
374 | | } |
375 | 528 | } Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16GetTabletRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19CreateRowsetRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16CommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_15AbortTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19PrecommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_17RestoreJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetObjStoreInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_21StartTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22FinishTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_25UpdateDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ 
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_32GetDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Line | Count | Source | 343 | 528 | static std::string debug_info(const Request& req) { | 344 | | if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) { | 345 | | return fmt::format(" txn_id={}", req.txn_id()); | 346 | | } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) { | 347 | | return fmt::format(" tablet_id={}", req.job().idx().tablet_id()); | 348 | | } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) { | 349 | | return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id()); | 350 | 528 | } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) { | 351 | 528 | return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id()); | 352 | | } else if constexpr (is_any_v<Request, GetTabletRequest>) { | 353 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 354 | | } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest, | 355 | | GetInstanceRequest, GetClusterStatusRequest>) { | 356 | | return ""; | 357 | | } else if constexpr (is_any_v<Request, CreateRowsetRequest>) { | 358 | | return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); | 359 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) { | 360 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 361 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) { | 362 | | return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), | 363 | | req.tablet_id(), req.lock_id()); | 364 | | } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) { | 365 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 366 | | } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) { | 367 | | return fmt::format(" 
index_id={}", req.index_id()); | 368 | | } else if constexpr (is_any_v<Request, RestoreJobRequest>) { | 369 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 370 | | } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) { | 371 | | return fmt::format(" packed_file_path={}", req.packed_file_path()); | 372 | | } else { | 373 | | static_assert(!sizeof(Request)); | 374 | | } | 375 | 528 | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_35RemoveDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19ListSnapshotRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_18GetInstanceRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_27UpdatePackedFileInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_23GetClusterStatusRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ |
376 | | |
// Creates a std::default_random_engine seeded with the current steady_clock tick count,
// truncated to 32 bits.
inline std::default_random_engine make_random_engine() {
    const auto ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    const auto seed = static_cast<uint32_t>(ticks);
    return std::default_random_engine(seed);
}
381 | | |
// Pointer to a MetaService_Stub member function taking (controller, request, response, closure);
// this is the type of the `method` argument accepted by retry_rpc.
382 | | template <typename Request, typename Response> |
383 | | using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*, |
384 | | const Request*, Response*, |
385 | | ::google::protobuf::Closure*); |
386 | | |
387 | | template <typename Request, typename Response> |
388 | | Status retry_rpc(std::string_view op_name, const Request& req, Response* res, |
389 | 584k | MetaServiceMethod<Request, Response> method) { |
390 | 584k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); |
391 | 584k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); |
392 | | |
393 | | // Applies only to the current file, and all req are non-const, but passed as const types. |
394 | 584k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); |
395 | | |
396 | 584k | int retry_times = 0; |
397 | 584k | uint32_t duration_ms = 0; |
398 | 584k | std::string error_msg; |
399 | 584k | std::default_random_engine rng = make_random_engine(); |
400 | 584k | std::uniform_int_distribution<uint32_t> u(20, 200); |
401 | 584k | std::uniform_int_distribution<uint32_t> u2(500, 1000); |
402 | 584k | MetaServiceProxy* proxy; |
403 | 584k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); |
404 | 584k | while (true) { |
405 | 584k | std::shared_ptr<MetaService_Stub> stub; |
406 | 584k | RETURN_IF_ERROR(proxy->get(&stub)); |
407 | 584k | brpc::Controller cntl; |
408 | 584k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { |
409 | 57.8k | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); |
410 | 527k | } else { |
411 | 527k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); |
412 | 527k | } |
413 | 584k | cntl.set_max_retry(kBrpcRetryTimes); |
414 | 584k | res->Clear(); |
415 | 584k | int error_code = 0; |
416 | 584k | (stub.get()->*method)(&cntl, &req, res, nullptr); |
417 | 584k | if (cntl.Failed()) [[unlikely]] { |
418 | 0 | error_msg = cntl.ErrorText(); |
419 | 0 | error_code = cntl.ErrorCode(); |
420 | 0 | proxy->set_unhealthy(); |
421 | 585k | } else if (res->status().code() == MetaServiceCode::OK) { |
422 | 585k | return Status::OK(); |
423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { |
424 | 22 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, |
425 | 22 | res->status().msg()); |
426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { |
427 | 1.15k | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, |
428 | 1.15k | res->status().msg()); |
429 | 18.4E | } else { |
430 | 18.4E | error_msg = res->status().msg(); |
431 | 18.4E | } |
432 | | |
433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { |
434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; |
435 | 0 | } |
436 | | |
437 | 18.4E | ++retry_times; |
438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || |
439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && |
440 | 0 | error_code == brpc::ERPCTIMEDOUT) || |
441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && |
442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { |
443 | 0 | break; |
444 | 0 | } |
445 | | |
446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); |
447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times |
448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); |
449 | 18.4E | bthread_usleep(duration_ms * 1000); |
450 | 18.4E | } |
451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); |
452 | 584k | } cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16GetTabletRequestENS0_17GetTabletResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 337k | MetaServiceMethod<Request, Response> method) { | 390 | 337k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 337k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 337k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 337k | int retry_times = 0; | 397 | 337k | uint32_t duration_ms = 0; | 398 | 337k | std::string error_msg; | 399 | 337k | std::default_random_engine rng = make_random_engine(); | 400 | 337k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 337k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 337k | MetaServiceProxy* proxy; | 403 | 337k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 337k | while (true) { | 405 | 337k | std::shared_ptr<MetaService_Stub> stub; | 406 | 337k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 337k | brpc::Controller cntl; | 408 | 337k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 337k | } else { | 411 | 337k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 337k | } | 413 | 337k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 337k | res->Clear(); | 415 | 337k | int error_code = 0; | 416 | 337k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 337k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 337k | } else if 
(res->status().code() == MetaServiceCode::OK) { | 422 | 337k | return Status::OK(); | 423 | 337k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 173 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 100 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 100 | res->status().msg()); | 429 | 100 | } else { | 430 | 73 | error_msg = res->status().msg(); | 431 | 73 | } | 432 | | | 433 | 73 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 73 | ++retry_times; | 438 | 73 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 73 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 73 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 73 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 73 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 73 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 73 | bthread_usleep(duration_ms * 1000); | 450 | 73 | } | 451 | 210 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 337k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetDeleteBitmapRequestENS0_23GetDeleteBitmapResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 27.4k | MetaServiceMethod<Request, Response> method) { | 390 | 27.4k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 27.4k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 27.4k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 27.4k | int retry_times = 0; | 397 | 27.4k | uint32_t duration_ms = 0; | 398 | 27.4k | std::string error_msg; | 399 | 27.4k | std::default_random_engine rng = make_random_engine(); | 400 | 27.4k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 27.4k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 27.4k | MetaServiceProxy* proxy; | 403 | 27.4k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 27.4k | while (true) { | 405 | 27.3k | std::shared_ptr<MetaService_Stub> stub; | 406 | 27.3k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 27.3k | brpc::Controller cntl; | 408 | 27.4k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 27.4k | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 18.4E | } else { | 411 | 18.4E | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 18.4E | } | 413 | 27.3k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 27.3k | res->Clear(); | 415 | 27.3k | int error_code = 0; | 416 | 27.3k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 27.3k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | 
proxy->set_unhealthy(); | 421 | 27.4k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 27.4k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 27.4k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19CreateRowsetRequestENS0_20CreateRowsetResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 148k | MetaServiceMethod<Request, Response> method) { | 390 | 148k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 148k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 148k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 148k | int retry_times = 0; | 397 | 148k | uint32_t duration_ms = 0; | 398 | 148k | std::string error_msg; | 399 | 148k | std::default_random_engine rng = make_random_engine(); | 400 | 148k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 148k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 148k | MetaServiceProxy* proxy; | 403 | 148k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 148k | while (true) { | 405 | 147k | std::shared_ptr<MetaService_Stub> stub; | 406 | 147k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 147k | brpc::Controller cntl; | 408 | 148k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 147k | } else { | 411 | 147k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 147k | } | 413 | 147k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 147k | res->Clear(); | 415 | 147k | int error_code = 0; | 416 | 147k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 147k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 148k | } else if 
(res->status().code() == MetaServiceCode::OK) { | 422 | 148k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 148k | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16CommitTxnRequestENS0_17CommitTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_15AbortTxnRequestENS0_16AbortTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 487 | MetaServiceMethod<Request, Response> method) { | 390 | 487 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 487 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 487 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 487 | int retry_times = 0; | 397 | 487 | uint32_t duration_ms = 0; | 398 | 487 | std::string error_msg; | 399 | 487 | std::default_random_engine rng = make_random_engine(); | 400 | 487 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 487 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 487 | MetaServiceProxy* proxy; | 403 | 487 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 487 | while (true) { | 405 | 487 | std::shared_ptr<MetaService_Stub> stub; | 406 | 487 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 487 | brpc::Controller cntl; | 408 | 487 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 487 | } else { | 411 | 487 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 487 | } | 413 | 487 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 487 | res->Clear(); | 415 | 487 | int error_code = 0; | 416 | 487 | 
(stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 487 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 487 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 478 | return Status::OK(); | 423 | 478 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 9 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 9 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 9 | res->status().msg()); | 429 | 9 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 487 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19PrecommitTxnRequestENS0_20PrecommitTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 33 | MetaServiceMethod<Request, Response> method) { | 390 | 33 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 33 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 33 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 33 | int retry_times = 0; | 397 | 33 | uint32_t duration_ms = 0; | 398 | 33 | std::string error_msg; | 399 | 33 | std::default_random_engine rng = make_random_engine(); | 400 | 33 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 33 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 33 | MetaServiceProxy* proxy; | 403 | 33 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 33 | while (true) { | 405 | 33 | std::shared_ptr<MetaService_Stub> stub; | 406 | 33 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 33 | brpc::Controller cntl; | 408 | 33 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 33 | } else { | 411 | 33 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 33 | } | 413 | 33 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 33 | res->Clear(); | 415 | 33 | int error_code = 0; | 416 | 33 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 33 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 33 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 33 | 
return Status::OK(); | 423 | 33 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 33 | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_17RestoreJobRequestENS0_18RestoreJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetObjStoreInfoRequestENS0_23GetObjStoreInfoResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 255 | MetaServiceMethod<Request, Response> method) { | 390 | 255 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 255 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 255 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 255 | int retry_times = 0; | 397 | 255 | uint32_t duration_ms = 0; | 398 | 255 | std::string error_msg; | 399 | 255 | std::default_random_engine rng = make_random_engine(); | 400 | 255 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 255 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 255 | MetaServiceProxy* proxy; | 403 | 255 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 255 | while (true) { | 405 | 255 | std::shared_ptr<MetaService_Stub> stub; | 406 | 255 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 255 | brpc::Controller cntl; | 408 | 255 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 255 | } else { | 411 | 255 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 255 | } | 413 | 255 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 255 | res->Clear(); | 415 | 255 | int error_code = 0; | 416 
| 255 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 255 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 255 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 255 | return Status::OK(); | 423 | 255 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 255 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_21StartTabletJobRequestENS0_22StartTabletJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 18.5k | MetaServiceMethod<Request, Response> method) { | 390 | 18.5k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 18.5k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 18.5k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 18.5k | int retry_times = 0; | 397 | 18.5k | uint32_t duration_ms = 0; | 398 | 18.5k | std::string error_msg; | 399 | 18.5k | std::default_random_engine rng = make_random_engine(); | 400 | 18.5k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 18.5k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 18.5k | MetaServiceProxy* proxy; | 403 | 18.5k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 18.5k | while (true) { | 405 | 18.5k | std::shared_ptr<MetaService_Stub> stub; | 406 | 18.5k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 18.5k | brpc::Controller cntl; | 408 | 18.5k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 18.5k | } else { | 411 | 18.5k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 18.5k | } | 413 | 18.5k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 18.5k | res->Clear(); | 415 | 18.5k | int error_code = 0; | 416 | 18.5k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 18.5k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 
421 | 18.5k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 18.2k | return Status::OK(); | 423 | 18.2k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 441 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 441 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 441 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 18.5k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22FinishTabletJobRequestENS0_23FinishTabletJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 16.8k | MetaServiceMethod<Request, Response> method) { | 390 | 16.8k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 16.8k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 16.8k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 16.8k | int retry_times = 0; | 397 | 16.8k | uint32_t duration_ms = 0; | 398 | 16.8k | std::string error_msg; | 399 | 16.8k | std::default_random_engine rng = make_random_engine(); | 400 | 16.8k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 16.8k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 16.8k | MetaServiceProxy* proxy; | 403 | 16.8k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 17.0k | while (true) { | 405 | 17.0k | std::shared_ptr<MetaService_Stub> stub; | 406 | 17.0k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 17.0k | brpc::Controller cntl; | 408 | 17.1k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 17.0k | } else { | 411 | 17.0k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 17.0k | } | 413 | 17.0k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 17.0k | res->Clear(); | 415 | 17.0k | int error_code = 0; | 416 | 17.0k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 17.0k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 
421 | 17.1k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 17.1k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 22 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 22 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 35 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 35 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 16.8k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_25UpdateDeleteBitmapRequestENS0_26UpdateDeleteBitmapResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 29.8k | MetaServiceMethod<Request, Response> method) { | 390 | 29.8k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 29.8k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 29.8k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 29.8k | int retry_times = 0; | 397 | 29.8k | uint32_t duration_ms = 0; | 398 | 29.8k | std::string error_msg; | 399 | 29.8k | std::default_random_engine rng = make_random_engine(); | 400 | 29.8k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 29.8k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 29.8k | MetaServiceProxy* proxy; | 403 | 29.8k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 30.2k | while (true) { | 405 | 30.2k | std::shared_ptr<MetaService_Stub> stub; | 406 | 30.2k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 30.2k | brpc::Controller cntl; | 408 | 30.4k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 30.4k | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 18.4E | } else { | 411 | 18.4E | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 18.4E | } | 413 | 30.2k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 30.2k | res->Clear(); | 415 | 30.2k | int error_code = 0; | 416 | 30.2k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 30.2k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | 
proxy->set_unhealthy(); | 421 | 30.9k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 30.9k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 19 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 19 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 29.8k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_32GetDeleteBitmapUpdateLockRequestENS0_33GetDeleteBitmapUpdateLockResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 5.28k | MetaServiceMethod<Request, Response> method) { | 390 | 5.28k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 5.28k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 5.28k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 5.28k | int retry_times = 0; | 397 | 5.28k | uint32_t duration_ms = 0; | 398 | 5.28k | std::string error_msg; | 399 | 5.28k | std::default_random_engine rng = make_random_engine(); | 400 | 5.28k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 5.28k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 5.28k | MetaServiceProxy* proxy; | 403 | 5.28k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 5.28k | while (true) { | 405 | 5.28k | std::shared_ptr<MetaService_Stub> stub; | 406 | 5.28k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 5.28k | brpc::Controller cntl; | 408 | 5.28k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 5.28k | } else { | 411 | 5.28k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 5.28k | } | 413 | 5.28k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 5.28k | res->Clear(); | 415 | 5.28k | int error_code = 0; | 416 | 5.28k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 5.28k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | 
proxy->set_unhealthy(); | 421 | 5.28k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 4.76k | return Status::OK(); | 423 | 4.76k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 528 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 528 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 528 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 5.28k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_35RemoveDeleteBitmapUpdateLockRequestENS0_36RemoveDeleteBitmapUpdateLockResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 21 | MetaServiceMethod<Request, Response> method) { | 390 | 21 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 21 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 21 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 21 | int retry_times = 0; | 397 | 21 | uint32_t duration_ms = 0; | 398 | 21 | std::string error_msg; | 399 | 21 | std::default_random_engine rng = make_random_engine(); | 400 | 21 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 21 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 21 | MetaServiceProxy* proxy; | 403 | 21 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 21 | while (true) { | 405 | 21 | std::shared_ptr<MetaService_Stub> stub; | 406 | 21 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 21 | brpc::Controller cntl; | 408 | 21 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 21 | } else { | 411 | 21 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 21 | } | 413 | 21 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 21 | res->Clear(); | 415 | 21 | int error_code = 0; | 416 | 21 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 21 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 21 | } else if (res->status().code() == 
MetaServiceCode::OK) { | 422 | 2 | return Status::OK(); | 423 | 19 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 19 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 19 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 19 | res->status().msg()); | 429 | 19 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 21 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19ListSnapshotRequestENS0_20ListSnapshotResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 2 | MetaServiceMethod<Request, Response> method) { | 390 | 2 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 2 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 2 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 2 | int retry_times = 0; | 397 | 2 | uint32_t duration_ms = 0; | 398 | 2 | std::string error_msg; | 399 | 2 | std::default_random_engine rng = make_random_engine(); | 400 | 2 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 2 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 2 | MetaServiceProxy* proxy; | 403 | 2 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 2 | while (true) { | 405 | 2 | std::shared_ptr<MetaService_Stub> stub; | 406 | 2 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 2 | brpc::Controller cntl; | 408 | 2 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 2 | } else { | 411 | 2 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 2 | } | 413 | 2 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 2 | res->Clear(); | 415 | 2 | int error_code = 0; | 416 | 2 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 2 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 2 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 0 | return Status::OK(); | 423 
| 2 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 2 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 2 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 2 | res->status().msg()); | 429 | 2 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 2 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_18GetInstanceRequestENS0_19GetInstanceResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 1 | MetaServiceMethod<Request, Response> method) { | 390 | 1 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 1 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 1 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 1 | int retry_times = 0; | 397 | 1 | uint32_t duration_ms = 0; | 398 | 1 | std::string error_msg; | 399 | 1 | std::default_random_engine rng = make_random_engine(); | 400 | 1 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 1 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 1 | MetaServiceProxy* proxy; | 403 | 1 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 1 | while (true) { | 405 | 1 | std::shared_ptr<MetaService_Stub> stub; | 406 | 1 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 1 | brpc::Controller cntl; | 408 | 1 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 1 | } else { | 411 | 1 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 1 | } | 413 | 1 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 1 | res->Clear(); | 415 | 1 | int error_code = 0; | 416 | 1 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 1 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 1 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 1 | return Status::OK(); | 423 | 
1 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 1 | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_27UpdatePackedFileInfoRequestENS0_28UpdatePackedFileInfoResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_23GetClusterStatusRequestENS0_24GetClusterStatusResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 66 | MetaServiceMethod<Request, Response> method) { | 390 | 66 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 66 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 66 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 66 | int retry_times = 0; | 397 | 66 | uint32_t duration_ms = 0; | 398 | 66 | std::string error_msg; | 399 | 66 | std::default_random_engine rng = make_random_engine(); | 400 | 66 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 66 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 66 | MetaServiceProxy* proxy; | 403 | 66 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 66 | while (true) { | 405 | 66 | std::shared_ptr<MetaService_Stub> stub; | 406 | 66 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 66 | brpc::Controller cntl; | 408 | 66 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 66 | } else { | 411 | 66 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 66 | } | 413 | 66 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 66 | res->Clear(); | 415 | 66 | int error_code = 0; | 416 
| 66 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 66 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 66 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 66 | return Status::OK(); | 423 | 66 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 66 | } |
|
453 | | |
454 | | } // namespace |
455 | | |
456 | 337k | Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) { |
457 | 337k | VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id; |
458 | 337k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id, |
459 | 337k | tablet_meta); |
460 | 337k | GetTabletRequest req; |
461 | 337k | GetTabletResponse resp; |
462 | 337k | req.set_cloud_unique_id(config::cloud_unique_id); |
463 | 337k | req.set_tablet_id(tablet_id); |
464 | 337k | Status st = retry_rpc("get tablet meta", req, &resp, &MetaService_Stub::get_tablet); |
465 | 337k | if (!st.ok()) { |
466 | 100 | if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
467 | 100 | return Status::NotFound("failed to get tablet meta: {}", resp.status().msg()); |
468 | 100 | } |
469 | 0 | return st; |
470 | 100 | } |
471 | | |
472 | 337k | *tablet_meta = std::make_shared<TabletMeta>(); |
473 | 337k | (*tablet_meta) |
474 | 337k | ->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta()))); |
475 | 337k | VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id(); |
476 | 337k | return Status::OK(); |
477 | 337k | } |
478 | | |
479 | | Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options, |
480 | 134k | SyncRowsetStats* sync_stats) { |
481 | 134k | std::unique_lock lock {tablet->get_sync_meta_lock()}; |
482 | 134k | return sync_tablet_rowsets_unlocked(tablet, lock, options, sync_stats); |
483 | 134k | } |
484 | | |
485 | | Status CloudMetaMgr::_log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp, |
486 | | DeleteBitmap& delete_bitmap, int64_t old_max_version, |
487 | 51.3k | bool full_sync, int32_t read_version) { |
488 | 51.3k | if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() && |
489 | 51.3k | delete_bitmap.cardinality() > 0) { |
490 | 0 | int64_t tablet_id = tablet->tablet_id(); |
491 | 0 | std::vector<std::string> new_rowset_msgs; |
492 | 0 | std::vector<std::string> old_rowset_msgs; |
493 | 0 | std::unordered_set<RowsetId> new_rowset_ids; |
494 | 0 | int64_t new_max_version = resp.rowset_meta().rbegin()->end_version(); |
495 | 0 | for (const auto& rs : resp.rowset_meta()) { |
496 | 0 | RowsetId rowset_id; |
497 | 0 | rowset_id.init(rs.rowset_id_v2()); |
498 | 0 | new_rowset_ids.insert(rowset_id); |
499 | 0 | DeleteBitmap rowset_dbm(tablet_id); |
500 | 0 | delete_bitmap.subset({rowset_id, 0, 0}, |
501 | 0 | {rowset_id, std::numeric_limits<DeleteBitmap::SegmentId>::max(), |
502 | 0 | std::numeric_limits<DeleteBitmap::Version>::max()}, |
503 | 0 | &rowset_dbm); |
504 | 0 | size_t cardinality = rowset_dbm.cardinality(); |
505 | 0 | size_t count = rowset_dbm.get_delete_bitmap_count(); |
506 | 0 | if (cardinality > 0) { |
507 | 0 | new_rowset_msgs.push_back(fmt::format("({}[{}-{}],{},{})", rs.rowset_id_v2(), |
508 | 0 | rs.start_version(), rs.end_version(), count, |
509 | 0 | cardinality)); |
510 | 0 | } |
511 | 0 | } |
512 | |
|
513 | 0 | if (old_max_version > 0) { |
514 | 0 | std::vector<RowsetSharedPtr> old_rowsets; |
515 | 0 | RowsetIdUnorderedSet old_rowset_ids; |
516 | 0 | { |
517 | 0 | std::lock_guard<std::shared_mutex> rlock(tablet->get_header_lock()); |
518 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(old_max_version, &old_rowset_ids)); |
519 | 0 | old_rowsets = tablet->get_rowset_by_ids(&old_rowset_ids); |
520 | 0 | } |
521 | 0 | for (const auto& rs : old_rowsets) { |
522 | 0 | if (!new_rowset_ids.contains(rs->rowset_id())) { |
523 | 0 | DeleteBitmap rowset_dbm(tablet_id); |
524 | 0 | delete_bitmap.subset( |
525 | 0 | {rs->rowset_id(), 0, 0}, |
526 | 0 | {rs->rowset_id(), std::numeric_limits<DeleteBitmap::SegmentId>::max(), |
527 | 0 | std::numeric_limits<DeleteBitmap::Version>::max()}, |
528 | 0 | &rowset_dbm); |
529 | 0 | size_t cardinality = rowset_dbm.cardinality(); |
530 | 0 | size_t count = rowset_dbm.get_delete_bitmap_count(); |
531 | 0 | if (cardinality > 0) { |
532 | 0 | old_rowset_msgs.push_back( |
533 | 0 | fmt::format("({}{},{},{})", rs->rowset_id().to_string(), |
534 | 0 | rs->version().to_string(), count, cardinality)); |
535 | 0 | } |
536 | 0 | } |
537 | 0 | } |
538 | 0 | } |
539 | | |
540 | 0 | std::string tablet_info = fmt::format( |
541 | 0 | "tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(), |
542 | 0 | tablet->table_id(), tablet->index_id(), tablet->partition_id()); |
543 | 0 | LOG_INFO("[verbose] sync tablet delete bitmap " + tablet_info) |
544 | 0 | .tag("full_sync", full_sync) |
545 | 0 | .tag("read_version", read_version) |
546 | 0 | .tag("old_max_version", old_max_version) |
547 | 0 | .tag("new_max_version", new_max_version) |
548 | 0 | .tag("cumu_compaction_cnt", resp.stats().cumulative_compaction_cnt()) |
549 | 0 | .tag("base_compaction_cnt", resp.stats().base_compaction_cnt()) |
550 | 0 | .tag("cumu_point", resp.stats().cumulative_point()) |
551 | 0 | .tag("rowset_num", resp.rowset_meta().size()) |
552 | 0 | .tag("delete_bitmap_cardinality", delete_bitmap.cardinality()) |
553 | 0 | .tag("old_rowsets(rowset,count,cardinality)", |
554 | 0 | fmt::format("[{}]", fmt::join(old_rowset_msgs, ", "))) |
555 | 0 | .tag("new_rowsets(rowset,count,cardinality)", |
556 | 0 | fmt::format("[{}]", fmt::join(new_rowset_msgs, ", "))); |
557 | 0 | } |
558 | 51.3k | return Status::OK(); |
559 | 51.3k | } |
560 | | |
561 | | Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, |
562 | | std::unique_lock<bthread::Mutex>& lock, |
563 | | const SyncOptions& options, |
564 | 188k | SyncRowsetStats* sync_stats) { |
565 | 188k | using namespace std::chrono; |
566 | | |
567 | 188k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); |
568 | 188k | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.inject_error", { |
569 | 188k | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
570 | 188k | auto target_table_id = dp->param<int64_t>("table_id", -1); |
571 | 188k | if (target_tablet_id == tablet->tablet_id() || target_table_id == tablet->table_id()) { |
572 | 188k | return Status::InternalError( |
573 | 188k | "[sync_tablet_rowsets_unlocked] injected error for testing"); |
574 | 188k | } |
575 | 188k | }); |
576 | | |
577 | 188k | MetaServiceProxy* proxy; |
578 | 188k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); |
579 | 188k | std::string tablet_info = |
580 | 188k | fmt::format("tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(), |
581 | 188k | tablet->table_id(), tablet->index_id(), tablet->partition_id()); |
582 | 188k | int tried = 0; |
583 | 188k | while (true) { |
584 | 188k | std::shared_ptr<MetaService_Stub> stub; |
585 | 188k | RETURN_IF_ERROR(proxy->get(&stub)); |
586 | 188k | brpc::Controller cntl; |
587 | 188k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); |
588 | 188k | GetRowsetRequest req; |
589 | 188k | GetRowsetResponse resp; |
590 | | |
591 | 188k | int64_t tablet_id = tablet->tablet_id(); |
592 | 188k | int64_t table_id = tablet->table_id(); |
593 | 188k | int64_t index_id = tablet->index_id(); |
594 | 188k | req.set_cloud_unique_id(config::cloud_unique_id); |
595 | 188k | auto* idx = req.mutable_idx(); |
596 | 188k | idx->set_tablet_id(tablet_id); |
597 | 188k | idx->set_table_id(table_id); |
598 | 188k | idx->set_index_id(index_id); |
599 | 188k | idx->set_partition_id(tablet->partition_id()); |
600 | 188k | { |
601 | 188k | auto lock_start = std::chrono::steady_clock::now(); |
602 | 188k | std::shared_lock rlock(tablet->get_header_lock()); |
603 | 188k | if (sync_stats) { |
604 | 18.3k | sync_stats->meta_lock_wait_ns += |
605 | 18.3k | std::chrono::duration_cast<std::chrono::nanoseconds>( |
606 | 18.3k | std::chrono::steady_clock::now() - lock_start) |
607 | 18.3k | .count(); |
608 | 18.3k | } |
609 | 188k | if (options.full_sync) { |
610 | 2 | req.set_start_version(0); |
611 | 188k | } else { |
612 | 188k | req.set_start_version(tablet->max_version_unlocked() + 1); |
613 | 188k | } |
614 | 188k | req.set_base_compaction_cnt(tablet->base_compaction_cnt()); |
615 | 188k | req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); |
616 | 188k | req.set_full_compaction_cnt(tablet->full_compaction_cnt()); |
617 | 188k | req.set_cumulative_point(tablet->cumulative_layer_point()); |
618 | 188k | } |
619 | 188k | req.set_end_version(-1); |
620 | 188k | VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString(); |
621 | 188k | auto start = std::chrono::steady_clock::now(); |
622 | 188k | stub->get_rowset(&cntl, &req, &resp, nullptr); |
623 | 188k | auto end = std::chrono::steady_clock::now(); |
624 | 188k | int64_t latency = cntl.latency_us(); |
625 | 188k | _get_rowset_latency << latency; |
626 | 188k | int retry_times = config::meta_service_rpc_retry_times; |
627 | 188k | if (cntl.Failed()) { |
628 | 0 | proxy->set_unhealthy(); |
629 | 0 | if (tried++ < retry_times) { |
630 | 0 | auto rng = make_random_engine(); |
631 | 0 | std::uniform_int_distribution<uint32_t> u(20, 200); |
632 | 0 | std::uniform_int_distribution<uint32_t> u1(500, 1000); |
633 | 0 | uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); |
634 | 0 | bthread_usleep(duration_ms * 1000); |
635 | 0 | LOG_INFO("failed to get rowset meta, " + tablet_info) |
636 | 0 | .tag("reason", cntl.ErrorText()) |
637 | 0 | .tag("tried", tried) |
638 | 0 | .tag("sleep", duration_ms); |
639 | 0 | continue; |
640 | 0 | } |
641 | 0 | return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText()); |
642 | 0 | } |
643 | 188k | if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
644 | 0 | LOG(WARNING) << "failed to get rowset meta, err=" << resp.status().msg() << " " |
645 | 0 | << tablet_info; |
646 | 0 | return Status::NotFound("failed to get rowset meta: {}, {}", resp.status().msg(), |
647 | 0 | tablet_info); |
648 | 0 | } |
649 | 188k | if (resp.status().code() != MetaServiceCode::OK) { |
650 | 0 | LOG(WARNING) << " failed to get rowset meta, err=" << resp.status().msg() << " " |
651 | 0 | << tablet_info; |
652 | 0 | return Status::InternalError("failed to get rowset meta: {}, {}", resp.status().msg(), |
653 | 0 | tablet_info); |
654 | 0 | } |
655 | 188k | if (latency > 100 * 1000) { // 100ms |
656 | 127 | LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() |
657 | 127 | << ", latency=" << latency << "us" |
658 | 127 | << " " << tablet_info; |
659 | 188k | } else { |
660 | 188k | LOG_EVERY_N(INFO, 100) |
661 | 1.88k | << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() |
662 | 1.88k | << ", latency=" << latency << "us" |
663 | 1.88k | << " " << tablet_info; |
664 | 188k | } |
665 | | |
666 | 188k | int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
667 | 188k | tablet->last_sync_time_s = now; |
668 | | |
669 | 188k | if (sync_stats) { |
670 | 18.4k | sync_stats->get_remote_rowsets_rpc_ns += |
671 | 18.4k | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
672 | 18.4k | sync_stats->get_remote_rowsets_num += resp.rowset_meta().size(); |
673 | 18.4k | } |
674 | | |
675 | | // If is mow, the tablet has no delete bitmap in base rowsets. |
676 | | // So dont need to sync it. |
677 | 189k | if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() && |
678 | 188k | tablet->tablet_state() == TABLET_RUNNING) { |
679 | 51.3k | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block", |
680 | 51.3k | DBUG_BLOCK); |
681 | 51.3k | DeleteBitmap delete_bitmap(tablet_id); |
682 | 51.3k | int64_t old_max_version = req.start_version() - 1; |
683 | 51.3k | auto read_version = config::delete_bitmap_store_read_version; |
684 | 51.3k | auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), |
685 | 51.3k | resp.stats(), req.idx(), &delete_bitmap, |
686 | 51.3k | options.full_sync, sync_stats, read_version, false); |
687 | 51.3k | if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) { |
688 | 0 | LOG_INFO("rowset meta is expired, need to retry, " + tablet_info) |
689 | 0 | .tag("tried", tried) |
690 | 0 | .error(st); |
691 | 0 | continue; |
692 | 0 | } |
693 | 51.3k | if (!st.ok()) { |
694 | 0 | LOG_WARNING("failed to get delete bitmap, " + tablet_info).error(st); |
695 | 0 | return st; |
696 | 0 | } |
697 | 51.3k | tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); |
698 | 51.3k | RETURN_IF_ERROR(_log_mow_delete_bitmap(tablet, resp, delete_bitmap, old_max_version, |
699 | 51.3k | options.full_sync, read_version)); |
700 | 51.3k | RETURN_IF_ERROR( |
701 | 51.3k | _check_delete_bitmap_v2_correctness(tablet, req, resp, old_max_version)); |
702 | 51.3k | } |
703 | 188k | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", { |
704 | 188k | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
705 | 188k | if (target_tablet_id == tablet->tablet_id()) { |
706 | 188k | DBUG_BLOCK |
707 | 188k | } |
708 | 188k | }); |
709 | 188k | { |
710 | 188k | const auto& stats = resp.stats(); |
711 | 188k | auto lock_start = std::chrono::steady_clock::now(); |
712 | 188k | std::unique_lock wlock(tablet->get_header_lock()); |
713 | 188k | if (sync_stats) { |
714 | 18.4k | sync_stats->meta_lock_wait_ns += |
715 | 18.4k | std::chrono::duration_cast<std::chrono::nanoseconds>( |
716 | 18.4k | std::chrono::steady_clock::now() - lock_start) |
717 | 18.4k | .count(); |
718 | 18.4k | } |
719 | | |
720 | | // ATTN: we are facing following data race |
721 | | // |
722 | | // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8 |
723 | | // |
724 | | // BE-compaction-thread meta-service BE-query-thread |
725 | | // | | | |
726 | | // local | commit cumu-compaction | | |
727 | | // cc_cnt=0 | ---------------------------> | sync rowset (long rpc, local cc_cnt=0 ) | local |
728 | | // | | <----------------------------------------- | cc_cnt=0 |
729 | | // | | -. | |
730 | | // local | done cc_cnt=1 | \ | |
731 | | // cc_cnt=1 | <--------------------------- | \ | |
732 | | // | | \ returned with resp cc_cnt=0 (snapshot) | |
733 | | // | | '------------------------------------> | local |
734 | | // | | | cc_cnt=1 |
735 | | // | | | |
736 | | // | | | CHECK FAIL |
737 | | // | | | need retry |
738 | | // To get rid of just retry syncing tablet |
739 | 188k | if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() || |
740 | 189k | stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt()) |
741 | 0 | [[unlikely]] { |
742 | | // stale request, ignore |
743 | 0 | LOG_WARNING("stale get rowset meta request " + tablet_info) |
744 | 0 | .tag("resp_base_compaction_cnt", stats.base_compaction_cnt()) |
745 | 0 | .tag("base_compaction_cnt", tablet->base_compaction_cnt()) |
746 | 0 | .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt()) |
747 | 0 | .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt()) |
748 | 0 | .tag("tried", tried); |
749 | 0 | if (tried++ < 10) continue; |
750 | 0 | return Status::OK(); |
751 | 0 | } |
752 | 188k | std::vector<RowsetSharedPtr> rowsets; |
753 | 188k | rowsets.reserve(resp.rowset_meta().size()); |
754 | 188k | for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) { |
755 | 132k | VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id() |
756 | 206 | << ", version=[" << cloud_rs_meta_pb.start_version() << '-' |
757 | 206 | << cloud_rs_meta_pb.end_version() << ']'; |
758 | 132k | auto existed_rowset = tablet->get_rowset_by_version( |
759 | 132k | {cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()}); |
760 | 132k | if (existed_rowset && |
761 | 132k | existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) { |
762 | 0 | continue; // Same rowset, skip it |
763 | 0 | } |
764 | 132k | RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb); |
765 | 132k | auto rs_meta = std::make_shared<RowsetMeta>(); |
766 | 132k | rs_meta->init_from_pb(meta_pb); |
767 | 132k | RowsetSharedPtr rowset; |
768 | | // schema is nullptr implies using RowsetMeta.tablet_schema |
769 | 132k | Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); |
770 | 132k | if (!s.ok()) { |
771 | 0 | LOG_WARNING("create rowset").tag("status", s); |
772 | 0 | return s; |
773 | 0 | } |
774 | 132k | rowsets.push_back(std::move(rowset)); |
775 | 132k | } |
776 | 188k | if (!rowsets.empty()) { |
777 | | // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.: |
778 | | // BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2; |
779 | | // after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12]. |
780 | 125k | bool version_overlap = |
781 | 125k | tablet->max_version_unlocked() >= rowsets.front()->start_version(); |
782 | 125k | tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, |
783 | 125k | options.warmup_delta_data || |
784 | 125k | config::enable_warmup_immediately_on_new_rowset); |
785 | 125k | } |
786 | | |
787 | | // Fill version holes |
788 | 188k | int64_t partition_max_version = |
789 | 188k | resp.has_partition_max_version() ? resp.partition_max_version() : -1; |
790 | 188k | RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock)); |
791 | | |
792 | 188k | tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms(); |
793 | 188k | tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); |
794 | 188k | tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); |
795 | 188k | tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); |
796 | 188k | tablet->set_full_compaction_cnt(stats.full_compaction_cnt()); |
797 | 188k | tablet->set_cumulative_layer_point(stats.cumulative_point()); |
798 | 188k | tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), |
799 | 188k | stats.num_rows(), stats.data_size()); |
800 | | |
801 | | // Sync last active cluster info for compaction read-write separation |
802 | 188k | if (config::enable_compaction_rw_separation && stats.has_last_active_cluster_id()) { |
803 | 17.4k | tablet->set_last_active_cluster_info(stats.last_active_cluster_id(), |
804 | 17.4k | stats.last_active_time_ms()); |
805 | 17.4k | } |
806 | 188k | } |
807 | 0 | return Status::OK(); |
808 | 188k | } |
809 | 188k | } |
810 | | |
811 | | bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, |
812 | | std::ranges::range auto&& rs_metas, |
813 | 27.5k | DeleteBitmap* delete_bitmap) { |
814 | 27.5k | std::set<int64_t> txn_processed; |
815 | 27.5k | for (auto& rs_meta : rs_metas) { |
816 | 27.5k | auto txn_id = rs_meta.txn_id(); |
817 | 27.5k | if (txn_processed.find(txn_id) != txn_processed.end()) { |
818 | 0 | continue; |
819 | 0 | } |
820 | 27.5k | txn_processed.insert(txn_id); |
821 | 27.5k | DeleteBitmapPtr tmp_delete_bitmap; |
822 | 27.5k | std::shared_ptr<PublishStatus> publish_status = |
823 | 27.5k | std::make_shared<PublishStatus>(PublishStatus::INIT); |
824 | 27.5k | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
825 | 27.5k | Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap( |
826 | 27.5k | txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status); |
827 | | // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services. |
828 | | // If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can |
829 | | // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other |
830 | | // stats(version or compaction stats) |
831 | 27.5k | if (status.ok() && *publish_status == PublishStatus::SUCCEED) { |
832 | | // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap. |
833 | | // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON, |
834 | | // we should replace it with the rowset's real version |
835 | 440 | DCHECK(rs_meta.start_version() == rs_meta.end_version()); |
836 | 440 | int64_t rowset_version = rs_meta.start_version(); |
837 | 4.23k | for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) { |
838 | | // skip sentinel mark, which is used for delete bitmap correctness check |
839 | 4.23k | if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) { |
840 | 296 | delete_bitmap->merge({std::get<0>(delete_bitmap_key), |
841 | 296 | std::get<1>(delete_bitmap_key), rowset_version}, |
842 | 296 | bitmap_value); |
843 | 296 | } |
844 | 4.23k | } |
845 | 440 | engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, |
846 | 440 | tablet->tablet_id()); |
847 | 27.1k | } else { |
848 | 27.1k | LOG_EVERY_N(INFO, 20) |
849 | 1.35k | << "delete bitmap not found in cache, will sync rowset to get. tablet_id= " |
850 | 1.35k | << tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status; |
851 | 27.1k | return false; |
852 | 27.1k | } |
853 | 27.5k | } |
854 | 391 | return true; |
855 | 27.5k | } |
856 | | |
857 | | Status CloudMetaMgr::_get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req, |
858 | 27.4k | GetDeleteBitmapResponse& res) { |
859 | 18.4E | VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString(); |
860 | 27.4k | TEST_SYNC_POINT_CALLBACK("CloudMetaMgr::_get_delete_bitmap_from_ms", &req, &res); |
861 | | |
862 | 27.4k | auto st = retry_rpc("get delete bitmap", req, &res, &MetaService_Stub::get_delete_bitmap); |
863 | 27.4k | if (st.code() == ErrorCode::THRIFT_RPC_ERROR) { |
864 | 0 | return st; |
865 | 0 | } |
866 | | |
867 | 27.4k | if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
868 | 1 | return Status::NotFound("failed to get delete bitmap: {}", res.status().msg()); |
869 | 1 | } |
870 | | // The delete bitmap of stale rowsets will be removed when commit compaction job, |
871 | | // then delete bitmap of stale rowsets cannot be obtained. But the rowsets obtained |
872 | | // by sync_tablet_rowsets may include these stale rowsets. When this case happend, the |
873 | | // error code of ROWSETS_EXPIRED will be returned, we need to retry sync rowsets again. |
874 | | // |
875 | | // Be query thread meta-service Be compaction thread |
876 | | // | | | |
877 | | // | get rowset | | |
878 | | // |--------------------------->| | |
879 | | // | return get rowset | | |
880 | | // |<---------------------------| | |
881 | | // | | commit job | |
882 | | // | |<------------------------| |
883 | | // | | return commit job | |
884 | | // | |------------------------>| |
885 | | // | get delete bitmap | | |
886 | | // |--------------------------->| | |
887 | | // | return get delete bitmap | | |
888 | | // |<---------------------------| | |
889 | | // | | | |
890 | 27.4k | if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) { |
891 | 0 | return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}", |
892 | 0 | res.status().msg()); |
893 | 0 | } |
894 | 27.4k | if (res.status().code() != MetaServiceCode::OK) { |
895 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}", |
896 | 0 | res.status().msg()); |
897 | 0 | } |
898 | 27.4k | return Status::OK(); |
899 | 27.4k | } |
900 | | |
901 | | Status CloudMetaMgr::_get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req, |
902 | | GetDeleteBitmapResponse& res, |
903 | 27.0k | int64_t bytes_threadhold) { |
904 | 27.0k | std::unordered_set<std::string> finished_rowset_ids {}; |
905 | 27.0k | int count = 0; |
906 | 27.3k | do { |
907 | 27.3k | GetDeleteBitmapRequest cur_req; |
908 | 27.3k | GetDeleteBitmapResponse cur_res; |
909 | | |
910 | 27.3k | cur_req.set_cloud_unique_id(config::cloud_unique_id); |
911 | 27.3k | cur_req.set_tablet_id(req.tablet_id()); |
912 | 27.3k | cur_req.set_base_compaction_cnt(req.base_compaction_cnt()); |
913 | 27.3k | cur_req.set_cumulative_compaction_cnt(req.cumulative_compaction_cnt()); |
914 | 27.3k | cur_req.set_cumulative_point(req.cumulative_point()); |
915 | 27.3k | *(cur_req.mutable_idx()) = req.idx(); |
916 | 27.3k | cur_req.set_store_version(req.store_version()); |
917 | 27.4k | if (bytes_threadhold > 0) { |
918 | 27.4k | cur_req.set_dbm_bytes_threshold(bytes_threadhold); |
919 | 27.4k | } |
920 | 58.9k | for (int i = 0; i < req.rowset_ids_size(); i++) { |
921 | 31.5k | if (!finished_rowset_ids.contains(req.rowset_ids(i))) { |
922 | 30.6k | cur_req.add_rowset_ids(req.rowset_ids(i)); |
923 | 30.6k | cur_req.add_begin_versions(req.begin_versions(i)); |
924 | 30.6k | cur_req.add_end_versions(req.end_versions(i)); |
925 | 30.6k | } |
926 | 31.5k | } |
927 | | |
928 | 27.3k | RETURN_IF_ERROR(_get_delete_bitmap_from_ms(cur_req, cur_res)); |
929 | 27.3k | ++count; |
930 | | |
931 | | // v1 delete bitmap |
932 | 27.3k | res.mutable_rowset_ids()->MergeFrom(cur_res.rowset_ids()); |
933 | 27.3k | res.mutable_segment_ids()->MergeFrom(cur_res.segment_ids()); |
934 | 27.3k | res.mutable_versions()->MergeFrom(cur_res.versions()); |
935 | 27.3k | res.mutable_segment_delete_bitmaps()->MergeFrom(cur_res.segment_delete_bitmaps()); |
936 | | |
937 | | // v2 delete bitmap |
938 | 27.3k | res.mutable_delta_rowset_ids()->MergeFrom(cur_res.delta_rowset_ids()); |
939 | 27.3k | res.mutable_delete_bitmap_storages()->MergeFrom(cur_res.delete_bitmap_storages()); |
940 | | |
941 | 29.9k | for (const auto& rowset_id : cur_res.returned_rowset_ids()) { |
942 | 29.9k | finished_rowset_ids.insert(rowset_id); |
943 | 29.9k | } |
944 | | |
945 | 27.3k | bool has_more = cur_res.has_has_more() && cur_res.has_more(); |
946 | 27.3k | if (!has_more) { |
947 | 27.1k | break; |
948 | 27.1k | } |
949 | 161 | LOG_INFO("batch get delete bitmap, progress={}/{}", finished_rowset_ids.size(), |
950 | 161 | req.rowset_ids_size()) |
951 | 161 | .tag("tablet_id", req.tablet_id()) |
952 | 161 | .tag("cur_returned_rowsets", cur_res.returned_rowset_ids_size()) |
953 | 161 | .tag("rpc_count", count); |
954 | 161 | } while (finished_rowset_ids.size() < req.rowset_ids_size()); |
955 | 27.0k | return Status::OK(); |
956 | 27.0k | } |
957 | | |
958 | | Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, |
959 | | std::ranges::range auto&& rs_metas, |
960 | | const TabletStatsPB& stats, const TabletIndexPB& idx, |
961 | | DeleteBitmap* delete_bitmap, bool full_sync, |
962 | | SyncRowsetStats* sync_stats, int32_t read_version, |
963 | 51.3k | bool full_sync_v2) { |
964 | 51.3k | if (rs_metas.empty()) { |
965 | 23.7k | return Status::OK(); |
966 | 23.7k | } |
967 | | |
968 | 27.5k | if (!full_sync && config::enable_sync_tablet_delete_bitmap_by_cache && |
969 | 27.5k | sync_tablet_delete_bitmap_by_cache(tablet, rs_metas, delete_bitmap)) { |
970 | 405 | if (sync_stats) { |
971 | 76 | sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size(); |
972 | 76 | } |
973 | 405 | return Status::OK(); |
974 | 27.1k | } else { |
975 | 27.1k | DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id()); |
976 | 27.1k | *delete_bitmap = *new_delete_bitmap; |
977 | 27.1k | } |
978 | | |
979 | 27.1k | if (read_version == 2 && config::delete_bitmap_store_write_version == 1) { |
980 | 0 | return Status::InternalError( |
981 | 0 | "please set delete_bitmap_store_read_version to 1 or 3 because " |
982 | 0 | "delete_bitmap_store_write_version is 1"); |
983 | 27.1k | } else if (read_version == 1 && config::delete_bitmap_store_write_version == 2) { |
984 | 0 | return Status::InternalError( |
985 | 0 | "please set delete_bitmap_store_read_version to 2 or 3 because " |
986 | 0 | "delete_bitmap_store_write_version is 2"); |
987 | 0 | } |
988 | | |
989 | 27.1k | int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version()); |
990 | | // When there are many delete bitmaps that need to be synchronized, it |
991 | | // may take a longer time, especially when loading the tablet for the |
992 | | // first time, so set a relatively long timeout time. |
993 | 27.1k | GetDeleteBitmapRequest req; |
994 | 27.1k | GetDeleteBitmapResponse res; |
995 | 27.1k | req.set_cloud_unique_id(config::cloud_unique_id); |
996 | 27.1k | req.set_tablet_id(tablet->tablet_id()); |
997 | 27.1k | req.set_base_compaction_cnt(stats.base_compaction_cnt()); |
998 | 27.1k | req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); |
999 | 27.1k | req.set_cumulative_point(stats.cumulative_point()); |
1000 | 27.1k | *(req.mutable_idx()) = idx; |
1001 | 27.1k | req.set_store_version(read_version); |
1002 | | // New rowset sync all versions of delete bitmap |
1003 | 29.7k | for (const auto& rs_meta : rs_metas) { |
1004 | 29.7k | req.add_rowset_ids(rs_meta.rowset_id_v2()); |
1005 | 29.7k | req.add_begin_versions(0); |
1006 | 29.7k | req.add_end_versions(new_max_version); |
1007 | 29.7k | } |
1008 | | |
1009 | 27.1k | if (!full_sync_v2) { |
1010 | | // old rowset sync incremental versions of delete bitmap |
1011 | 27.1k | if (old_max_version > 0 && old_max_version < new_max_version) { |
1012 | 13 | RowsetIdUnorderedSet all_rs_ids; |
1013 | 13 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1014 | 87 | for (const auto& rs_id : all_rs_ids) { |
1015 | 87 | req.add_rowset_ids(rs_id.to_string()); |
1016 | 87 | req.add_begin_versions(old_max_version + 1); |
1017 | 87 | req.add_end_versions(new_max_version); |
1018 | 87 | } |
1019 | 13 | } |
1020 | 18.4E | } else { |
1021 | 18.4E | if (old_max_version > 0) { |
1022 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1023 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1024 | 0 | for (const auto& rs_id : all_rs_ids) { |
1025 | 0 | req.add_rowset_ids(rs_id.to_string()); |
1026 | 0 | req.add_begin_versions(0); |
1027 | 0 | req.add_end_versions(new_max_version); |
1028 | 0 | } |
1029 | 0 | } |
1030 | 18.4E | } |
1031 | 27.1k | if (sync_stats) { |
1032 | 9.23k | sync_stats->get_remote_delete_bitmap_rowsets_num += req.rowset_ids_size(); |
1033 | 9.23k | } |
1034 | | |
1035 | 27.1k | auto start = std::chrono::steady_clock::now(); |
1036 | 27.1k | if (config::enable_batch_get_delete_bitmap) { |
1037 | 27.1k | RETURN_IF_ERROR(_get_delete_bitmap_from_ms_by_batch( |
1038 | 27.1k | req, res, config::get_delete_bitmap_bytes_threshold)); |
1039 | 27.1k | } else { |
1040 | 24 | RETURN_IF_ERROR(_get_delete_bitmap_from_ms(req, res)); |
1041 | 24 | } |
1042 | 27.1k | auto end = std::chrono::steady_clock::now(); |
1043 | | |
1044 | | // v1 delete bitmap |
1045 | 27.1k | const auto& rowset_ids = res.rowset_ids(); |
1046 | 27.1k | const auto& segment_ids = res.segment_ids(); |
1047 | 27.1k | const auto& vers = res.versions(); |
1048 | 27.1k | const auto& delete_bitmaps = res.segment_delete_bitmaps(); |
1049 | 27.1k | if (rowset_ids.size() != segment_ids.size() || rowset_ids.size() != vers.size() || |
1050 | 27.2k | rowset_ids.size() != delete_bitmaps.size()) { |
1051 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1052 | 0 | "get delete bitmap data wrong," |
1053 | 0 | "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}", |
1054 | 0 | rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size()); |
1055 | 0 | } |
1056 | 27.4k | for (int i = 0; i < rowset_ids.size(); i++) { |
1057 | 324 | RowsetId rst_id; |
1058 | 324 | rst_id.init(rowset_ids[i]); |
1059 | 324 | delete_bitmap->merge( |
1060 | 324 | {rst_id, segment_ids[i], vers[i]}, |
1061 | 324 | roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length())); |
1062 | 324 | } |
1063 | | // v2 delete bitmap |
1064 | 27.1k | const auto& delta_rowset_ids = res.delta_rowset_ids(); |
1065 | 27.1k | const auto& delete_bitmap_storages = res.delete_bitmap_storages(); |
1066 | 27.1k | if (delta_rowset_ids.size() != delete_bitmap_storages.size()) { |
1067 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1068 | 0 | "get delete bitmap data wrong, delta_rowset_ids.size={}, " |
1069 | 0 | "delete_bitmap_storages.size={}", |
1070 | 0 | delta_rowset_ids.size(), delete_bitmap_storages.size()); |
1071 | 0 | } |
1072 | 27.1k | int64_t remote_delete_bitmap_bytes = 0; |
1073 | 27.1k | RETURN_IF_ERROR(_read_tablet_delete_bitmap_v2(tablet, old_max_version, rs_metas, delete_bitmap, |
1074 | 27.1k | res, remote_delete_bitmap_bytes, full_sync_v2)); |
1075 | | |
1076 | 27.1k | if (sync_stats) { |
1077 | 9.25k | sync_stats->get_remote_delete_bitmap_rpc_ns += |
1078 | 9.25k | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
1079 | 9.25k | sync_stats->get_remote_delete_bitmap_key_count += |
1080 | 9.25k | delete_bitmaps.size() + delete_bitmap_storages.size(); |
1081 | 9.25k | for (const auto& dbm : delete_bitmaps) { |
1082 | 310 | sync_stats->get_remote_delete_bitmap_bytes += dbm.length(); |
1083 | 310 | } |
1084 | 9.25k | sync_stats->get_remote_delete_bitmap_bytes += remote_delete_bitmap_bytes; |
1085 | 9.25k | } |
1086 | 27.1k | int64_t latency = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); |
1087 | 27.1k | if (latency > 100 * 1000) { // 100ms |
1088 | 15 | LOG(INFO) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" << rowset_ids.size() |
1089 | 15 | << ", delete_bitmaps.size()=" << delete_bitmaps.size() |
1090 | 15 | << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size() |
1091 | 15 | << ", latency=" << latency << "us, read_version=" << read_version; |
1092 | 27.1k | } else { |
1093 | 27.1k | LOG_EVERY_N(INFO, 100) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" |
1094 | 271 | << rowset_ids.size() |
1095 | 271 | << ", delete_bitmaps.size()=" << delete_bitmaps.size() |
1096 | 271 | << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size() |
1097 | 271 | << ", latency=" << latency << "us, read_version=" << read_version; |
1098 | 27.1k | } |
1099 | 27.1k | return Status::OK(); |
1100 | 27.1k | } |
1101 | | |
1102 | | Status CloudMetaMgr::_check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req, |
1103 | | GetRowsetResponse& resp, |
1104 | 51.3k | int64_t old_max_version) { |
1105 | 51.3k | if (!config::enable_delete_bitmap_store_v2_check_correctness || |
1106 | 51.3k | config::delete_bitmap_store_write_version == 1 || resp.rowset_meta().empty()) { |
1107 | 51.3k | return Status::OK(); |
1108 | 51.3k | } |
1109 | 18.4E | int64_t tablet_id = tablet->tablet_id(); |
1110 | 18.4E | int64_t new_max_version = std::max(old_max_version, resp.rowset_meta().rbegin()->end_version()); |
1111 | | // rowset_id, num_segments |
1112 | 18.4E | std::vector<std::pair<RowsetId, int64_t>> all_rowsets; |
1113 | 18.4E | std::map<std::string, std::string> rowset_to_resource; |
1114 | 18.4E | for (const auto& rs_meta : resp.rowset_meta()) { |
1115 | 0 | RowsetId rowset_id; |
1116 | 0 | rowset_id.init(rs_meta.rowset_id_v2()); |
1117 | 0 | all_rowsets.emplace_back(std::make_pair(rowset_id, rs_meta.num_segments())); |
1118 | 0 | rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id(); |
1119 | 0 | } |
1120 | 18.4E | if (old_max_version > 0) { |
1121 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1122 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1123 | 0 | for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) { |
1124 | 0 | all_rowsets.emplace_back(std::make_pair(rowset->rowset_id(), rowset->num_segments())); |
1125 | 0 | rowset_to_resource[rowset->rowset_id().to_string()] = |
1126 | 0 | rowset->rowset_meta()->resource_id(); |
1127 | 0 | } |
1128 | 0 | } |
1129 | | |
1130 | 18.4E | auto compare_delete_bitmap = [&](DeleteBitmap* delete_bitmap, int version) { |
1131 | 0 | bool success = true; |
1132 | 0 | for (auto& [rs_id, num_segments] : all_rowsets) { |
1133 | 0 | for (int seg_id = 0; seg_id < num_segments; ++seg_id) { |
1134 | 0 | DeleteBitmap::BitmapKey key = {rs_id, seg_id, new_max_version}; |
1135 | 0 | auto dm1 = tablet->tablet_meta()->delete_bitmap().get_agg(key); |
1136 | 0 | auto dm2 = delete_bitmap->get_agg_without_cache(key); |
1137 | 0 | if (*dm1 != *dm2) { |
1138 | 0 | success = false; |
1139 | 0 | LOG(WARNING) << "failed to check delete bitmap correctness by v" |
1140 | 0 | << std::to_string(version) << ", tablet_id=" << tablet->tablet_id() |
1141 | 0 | << ", rowset_id=" << rs_id.to_string() << ", segment_id=" << seg_id |
1142 | 0 | << ", max_version=" << new_max_version |
1143 | 0 | << ". size1=" << dm1->cardinality() |
1144 | 0 | << ", size2=" << dm2->cardinality(); |
1145 | 0 | } |
1146 | 0 | } |
1147 | 0 | } |
1148 | 0 | if (success) { |
1149 | 0 | LOG(INFO) << "succeed to check delete bitmap correctness by v" |
1150 | 0 | << std::to_string(version) << ", tablet_id=" << tablet->tablet_id() |
1151 | 0 | << ", max_version=" << new_max_version; |
1152 | 0 | } |
1153 | 0 | }; |
1154 | | |
1155 | 18.4E | DeleteBitmap full_delete_bitmap(tablet_id); |
1156 | 18.4E | auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), resp.stats(), |
1157 | 18.4E | req.idx(), &full_delete_bitmap, false, nullptr, 2, true); |
1158 | 18.4E | if (!st.ok()) { |
1159 | 0 | LOG_WARNING("failed to check delete bitmap correctness by v2") |
1160 | 0 | .tag("tablet", tablet->tablet_id()) |
1161 | 0 | .error(st); |
1162 | 18.4E | } else { |
1163 | 18.4E | compare_delete_bitmap(&full_delete_bitmap, 2); |
1164 | 18.4E | } |
1165 | 18.4E | return Status::OK(); |
1166 | 18.4E | } |
1167 | | |
1168 | | Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version, |
1169 | | std::ranges::range auto&& rs_metas, |
1170 | | DeleteBitmap* delete_bitmap, |
1171 | | GetDeleteBitmapResponse& res, |
1172 | | int64_t& remote_delete_bitmap_bytes, |
1173 | 27.1k | bool full_sync_v2) { |
1174 | 27.1k | if (res.delta_rowset_ids().empty()) { |
1175 | 27.1k | return Status::OK(); |
1176 | 27.1k | } |
1177 | 18.4E | const auto& rowset_ids = res.delta_rowset_ids(); |
1178 | 18.4E | const auto& delete_bitmap_storages = res.delete_bitmap_storages(); |
1179 | 18.4E | RowsetIdUnorderedSet all_rs_ids; |
1180 | 18.4E | std::map<std::string, std::string> rowset_to_resource; |
1181 | 18.4E | if (old_max_version > 0) { |
1182 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1183 | 0 | if (full_sync_v2) { |
1184 | 0 | for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) { |
1185 | 0 | rowset_to_resource[rowset->rowset_id().to_string()] = |
1186 | 0 | rowset->rowset_meta()->resource_id(); |
1187 | 0 | } |
1188 | 0 | } |
1189 | 0 | } |
1190 | 18.4E | for (const auto& rs_meta : rs_metas) { |
1191 | 0 | RowsetId rs_id; |
1192 | 0 | rs_id.init(rs_meta.rowset_id_v2()); |
1193 | 0 | all_rs_ids.emplace(rs_id); |
1194 | 0 | rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id(); |
1195 | 0 | } |
1196 | 18.4E | if (config::enable_mow_verbose_log) { |
1197 | 0 | LOG(INFO) << "read delete bitmap for tablet_id=" << tablet->tablet_id() |
1198 | 0 | << ", old_max_version=" << old_max_version |
1199 | 0 | << ", new rowset num=" << rs_metas.size() |
1200 | 0 | << ", rowset has delete bitmap num=" << rowset_ids.size() |
1201 | 0 | << ". all rowset num=" << all_rs_ids.size(); |
1202 | 0 | } |
1203 | | |
1204 | 18.4E | std::mutex result_mtx; |
1205 | 18.4E | Status result; |
1206 | 18.4E | auto merge_delete_bitmap = [&](const std::string& rowset_id, DeleteBitmapPB& dbm) { |
1207 | 0 | if (dbm.rowset_ids_size() != dbm.segment_ids_size() || |
1208 | 0 | dbm.rowset_ids_size() != dbm.versions_size() || |
1209 | 0 | dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) { |
1210 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1211 | 0 | "get delete bitmap data wrong, rowset_id={}" |
1212 | 0 | "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}", |
1213 | 0 | rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), dbm.versions_size(), |
1214 | 0 | dbm.segment_delete_bitmaps_size()); |
1215 | 0 | } |
1216 | 0 | if (config::enable_mow_verbose_log) { |
1217 | 0 | LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id() |
1218 | 0 | << ", rowset_id=" << rowset_id |
1219 | 0 | << ", delete_bitmap num=" << dbm.segment_delete_bitmaps_size(); |
1220 | 0 | } |
1221 | 0 | std::lock_guard lock(result_mtx); |
1222 | 0 | for (int j = 0; j < dbm.rowset_ids_size(); j++) { |
1223 | 0 | RowsetId rst_id; |
1224 | 0 | rst_id.init(dbm.rowset_ids(j)); |
1225 | 0 | if (!all_rs_ids.contains(rst_id)) { |
1226 | 0 | LOG(INFO) << "skip merge delete bitmap for tablet_id=" << tablet->tablet_id() |
1227 | 0 | << ", rowset_id=" << rowset_id << ", unused rowset_id=" << rst_id; |
1228 | 0 | continue; |
1229 | 0 | } |
1230 | 0 | delete_bitmap->merge( |
1231 | 0 | {rst_id, dbm.segment_ids(j), dbm.versions(j)}, |
1232 | 0 | roaring::Roaring::readSafe(dbm.segment_delete_bitmaps(j).data(), |
1233 | 0 | dbm.segment_delete_bitmaps(j).length())); |
1234 | 0 | remote_delete_bitmap_bytes += dbm.segment_delete_bitmaps(j).length(); |
1235 | 0 | } |
1236 | 0 | return Status::OK(); |
1237 | 0 | }; |
1238 | 18.4E | auto get_delete_bitmap_from_file = [&](const std::string& rowset_id, |
1239 | 18.4E | const DeleteBitmapStoragePB& storage) { |
1240 | 0 | if (config::enable_mow_verbose_log) { |
1241 | 0 | LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id() |
1242 | 0 | << ", rowset_id=" << rowset_id << " from file" |
1243 | 0 | << ", is_packed=" << storage.has_packed_slice_location(); |
1244 | 0 | } |
1245 | 0 | if (rowset_to_resource.find(rowset_id) == rowset_to_resource.end()) { |
1246 | 0 | return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}", |
1247 | 0 | tablet->tablet_id(), rowset_id); |
1248 | 0 | } |
1249 | 0 | auto resource_id = rowset_to_resource[rowset_id]; |
1250 | 0 | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1251 | 0 | auto storage_resource = engine.get_storage_resource(resource_id); |
1252 | 0 | if (!storage_resource) { |
1253 | 0 | return Status::InternalError("vault id not found, maybe not sync, vault id {}", |
1254 | 0 | resource_id); |
1255 | 0 | } |
1256 | | |
1257 | | // Use packed file reader if packed_slice_location is present |
1258 | 0 | std::unique_ptr<DeleteBitmapFileReader> reader; |
1259 | 0 | if (storage.has_packed_slice_location() && |
1260 | 0 | !storage.packed_slice_location().packed_file_path().empty()) { |
1261 | 0 | reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id, |
1262 | 0 | storage_resource, |
1263 | 0 | storage.packed_slice_location()); |
1264 | 0 | } else { |
1265 | 0 | reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id, |
1266 | 0 | storage_resource); |
1267 | 0 | } |
1268 | |
|
1269 | 0 | RETURN_IF_ERROR(reader->init()); |
1270 | 0 | DeleteBitmapPB dbm; |
1271 | 0 | RETURN_IF_ERROR(reader->read(dbm)); |
1272 | 0 | RETURN_IF_ERROR(reader->close()); |
1273 | 0 | return merge_delete_bitmap(rowset_id, dbm); |
1274 | 0 | }; |
1275 | 18.4E | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1276 | 18.4E | std::unique_ptr<ThreadPoolToken> token = engine.sync_delete_bitmap_thread_pool().new_token( |
1277 | 18.4E | ThreadPool::ExecutionMode::CONCURRENT); |
1278 | 18.4E | bthread::CountdownEvent wait {rowset_ids.size()}; |
1279 | 18.4E | for (int i = 0; i < rowset_ids.size(); i++) { |
1280 | 0 | auto& rowset_id = rowset_ids[i]; |
1281 | 0 | if (delete_bitmap_storages[i].store_in_fdb()) { |
1282 | 0 | wait.signal(); |
1283 | 0 | DeleteBitmapPB dbm = delete_bitmap_storages[i].delete_bitmap(); |
1284 | 0 | RETURN_IF_ERROR(merge_delete_bitmap(rowset_id, dbm)); |
1285 | 0 | } else { |
1286 | 0 | const auto& storage = delete_bitmap_storages[i]; |
1287 | 0 | auto submit_st = token->submit_func([&, rowset_id, storage]() { |
1288 | 0 | auto status = get_delete_bitmap_from_file(rowset_id, storage); |
1289 | 0 | if (!status.ok()) { |
1290 | 0 | LOG(WARNING) << "failed to get delete bitmap for tablet_id=" |
1291 | 0 | << tablet->tablet_id() << ", rowset_id=" << rowset_id |
1292 | 0 | << " from file, st=" << status.to_string(); |
1293 | 0 | std::lock_guard lock(result_mtx); |
1294 | 0 | if (result.ok()) { |
1295 | 0 | result = status; |
1296 | 0 | } |
1297 | 0 | } |
1298 | 0 | wait.signal(); |
1299 | 0 | }); |
1300 | 0 | RETURN_IF_ERROR(submit_st); |
1301 | 0 | } |
1302 | 0 | } |
1303 | | // wait for all finished |
1304 | 18.4E | wait.wait(); |
1305 | 18.4E | token->wait(); |
1306 | 18.4E | return result; |
1307 | 18.4E | } |
1308 | | |
1309 | | Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, |
1310 | 74.4k | RowsetMetaSharedPtr* existed_rs_meta) { |
1311 | 74.4k | VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() |
1312 | 67 | << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); |
1313 | 74.4k | { |
1314 | 74.4k | Status ret_st; |
1315 | 74.4k | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st); |
1316 | 74.4k | } |
1317 | 74.4k | CreateRowsetRequest req; |
1318 | 74.4k | CreateRowsetResponse resp; |
1319 | 74.4k | req.set_cloud_unique_id(config::cloud_unique_id); |
1320 | 74.4k | req.set_txn_id(rs_meta.txn_id()); |
1321 | 74.4k | req.set_tablet_job_id(job_id); |
1322 | | |
1323 | 74.4k | RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true); |
1324 | 74.4k | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta)); |
1325 | | |
1326 | 74.4k | Status st = retry_rpc("prepare rowset", req, &resp, &MetaService_Stub::prepare_rowset); |
1327 | 74.4k | if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { |
1328 | 0 | if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { |
1329 | 0 | RowsetMetaPB doris_rs_meta_tmp = |
1330 | 0 | cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); |
1331 | 0 | *existed_rs_meta = std::make_shared<RowsetMeta>(); |
1332 | 0 | (*existed_rs_meta)->init_from_pb(doris_rs_meta_tmp); |
1333 | 0 | } |
1334 | 0 | return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg()); |
1335 | 0 | } |
1336 | 74.4k | return st; |
1337 | 74.4k | } |
1338 | | |
1339 | | Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, |
1340 | 74.2k | RowsetMetaSharedPtr* existed_rs_meta) { |
1341 | 74.2k | VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() |
1342 | 102 | << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); |
1343 | 74.2k | { |
1344 | 74.2k | Status ret_st; |
1345 | 74.2k | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st); |
1346 | 74.2k | } |
1347 | 74.2k | check_table_size_correctness(rs_meta); |
1348 | 74.2k | CreateRowsetRequest req; |
1349 | 74.2k | CreateRowsetResponse resp; |
1350 | 74.2k | req.set_cloud_unique_id(config::cloud_unique_id); |
1351 | 74.2k | req.set_txn_id(rs_meta.txn_id()); |
1352 | 74.2k | req.set_tablet_job_id(job_id); |
1353 | | |
1354 | 74.2k | RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); |
1355 | 74.2k | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); |
1356 | 74.2k | Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset); |
1357 | 74.2k | if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { |
1358 | 0 | if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { |
1359 | 0 | RowsetMetaPB doris_rs_meta = |
1360 | 0 | cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); |
1361 | 0 | *existed_rs_meta = std::make_shared<RowsetMeta>(); |
1362 | 0 | (*existed_rs_meta)->init_from_pb(doris_rs_meta); |
1363 | 0 | } |
1364 | 0 | return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg()); |
1365 | 0 | } |
1366 | 74.2k | int64_t timeout_ms = -1; |
1367 | | // if the `job_id` is not empty, it means this rowset was produced by a compaction job. |
1368 | 74.2k | if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) { |
1369 | | // 1. assume the download speed is 100MB/s |
1370 | | // 2. we double the download time as timeout for safety |
1371 | | // 3. for small rowsets, the timeout we calculate maybe quite small, so we need a min_time_out |
1372 | 0 | const double speed_mbps = 100.0; // 100MB/s |
1373 | 0 | const double safety_factor = 2.0; |
1374 | 0 | timeout_ms = std::min( |
1375 | 0 | std::max(static_cast<int64_t>(static_cast<double>(rs_meta.total_disk_size()) / |
1376 | 0 | (speed_mbps * 1024 * 1024) * safety_factor * 1000), |
1377 | 0 | config::warm_up_rowset_sync_wait_min_timeout_ms), |
1378 | 0 | config::warm_up_rowset_sync_wait_max_timeout_ms); |
1379 | 0 | LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id |
1380 | 0 | << ", with timeout: " << timeout_ms << " ms"; |
1381 | 0 | } |
1382 | 74.2k | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
1383 | 74.2k | manager.warm_up_rowset(rs_meta, timeout_ms); |
1384 | 74.2k | return st; |
1385 | 74.2k | } |
1386 | | |
1387 | 34.8k | void CloudMetaMgr::cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time) { |
1388 | | // For load-generated rowsets (job_id is empty), add to pending rowset manager |
1389 | | // so FE can notify BE to promote them later |
1390 | | |
1391 | | // TODO(bobhan1): copy rs_meta? |
1392 | 34.8k | int64_t txn_id = rs_meta->txn_id(); |
1393 | 34.8k | int64_t tablet_id = rs_meta->tablet_id(); |
1394 | 34.8k | ExecEnv::GetInstance()->storage_engine().to_cloud().committed_rs_mgr().add_committed_rowset( |
1395 | 34.8k | txn_id, tablet_id, std::move(rs_meta), expiration_time); |
1396 | 34.8k | } |
1397 | | |
1398 | 19 | Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) { |
1399 | 19 | VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id() |
1400 | 0 | << ", rowset_id: " << rs_meta.rowset_id(); |
1401 | 19 | CreateRowsetRequest req; |
1402 | 19 | CreateRowsetResponse resp; |
1403 | 19 | req.set_cloud_unique_id(config::cloud_unique_id); |
1404 | | |
1405 | | // Variant schema maybe updated, so we need to update the schema as well. |
1406 | | // The updated rowset meta after `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap` |
1407 | | // will be lost in `update_tmp_rowset` if skip_schema.So in order to keep the latest schema we should keep schema in update_tmp_rowset |
1408 | | // for variant type |
1409 | 19 | bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0; |
1410 | 19 | RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema); |
1411 | 19 | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); |
1412 | 19 | Status st = |
1413 | 19 | retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset); |
1414 | 19 | if (!st.ok() && resp.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND) { |
1415 | 0 | return Status::InternalError("failed to update committed rowset: {}", resp.status().msg()); |
1416 | 0 | } |
1417 | 19 | return st; |
1418 | 19 | } |
1419 | | |
1420 | | // async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP |
1421 | | static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, |
1422 | | const std::string& label, CommitTxnResponse& res, |
1423 | 7.48k | const std::vector<int64_t>& tablet_ids) { |
1424 | 7.48k | std::string protobufBytes; |
1425 | 7.48k | if (txn_id != -1) { |
1426 | 0 | res.SerializeToString(&protobufBytes); |
1427 | 0 | } |
1428 | 7.48k | auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func( |
1429 | 7.49k | [db_id, txn_id, label, protobufBytes, tablet_ids]() -> Status { |
1430 | 7.49k | TReportCommitTxnResultRequest request; |
1431 | 7.49k | TStatus result; |
1432 | | |
1433 | 7.49k | if (txn_id != -1 && protobufBytes.length() <= 0) { |
1434 | 0 | LOG(WARNING) << "protobufBytes: " << protobufBytes.length(); |
1435 | 0 | return Status::OK(); // nobody cares the return status |
1436 | 0 | } |
1437 | | |
1438 | 7.49k | request.__set_dbId(db_id); |
1439 | 7.49k | request.__set_txnId(txn_id); |
1440 | 7.49k | request.__set_label(label); |
1441 | 7.49k | request.__set_payload(protobufBytes); |
1442 | 7.49k | request.__set_tabletIds(tablet_ids); |
1443 | | |
1444 | 7.49k | Status status; |
1445 | 7.49k | int64_t duration_ns = 0; |
1446 | 7.49k | TNetworkAddress master_addr = |
1447 | 7.49k | ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
1448 | 7.49k | if (master_addr.hostname.empty() || master_addr.port == 0) { |
1449 | 0 | status = Status::Error<SERVICE_UNAVAILABLE>( |
1450 | 0 | "Have not get FE Master heartbeat yet"); |
1451 | 7.49k | } else { |
1452 | 7.49k | SCOPED_RAW_TIMER(&duration_ns); |
1453 | | |
1454 | 7.49k | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
1455 | 7.49k | master_addr.hostname, master_addr.port, |
1456 | 7.49k | [&request, &result](FrontendServiceConnection& client) { |
1457 | 7.49k | client->reportCommitTxnResult(result, request); |
1458 | 7.49k | })); |
1459 | | |
1460 | 7.49k | status = Status::create<false>(result); |
1461 | 7.49k | } |
1462 | 7.49k | g_cloud_commit_txn_resp_redirect_latency << duration_ns / 1000; |
1463 | | |
1464 | 7.49k | if (!status.ok()) { |
1465 | 0 | LOG(WARNING) << "TableStats report RPC to FE failed, errmsg=" << status |
1466 | 0 | << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label; |
1467 | 0 | return Status::OK(); // nobody cares the return status |
1468 | 7.49k | } else { |
1469 | 7.49k | LOG(INFO) << "TableStats report RPC to FE success, msg=" << status |
1470 | 7.49k | << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label; |
1471 | 7.49k | return Status::OK(); |
1472 | 7.49k | } |
1473 | 7.49k | }); |
1474 | 7.48k | if (!st.ok()) { |
1475 | 0 | LOG(WARNING) << "TableStats report to FE task submission failed: " << st.to_string(); |
1476 | 0 | } |
1477 | 7.48k | } |
1478 | | |
1479 | 0 | Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { |
1480 | 0 | VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1481 | 0 | << ", label: " << ctx.label << ", is_2pc: " << is_2pc; |
1482 | 0 | { |
1483 | 0 | Status ret_st; |
1484 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st); |
1485 | 0 | } |
1486 | 0 | CommitTxnRequest req; |
1487 | 0 | CommitTxnResponse res; |
1488 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1489 | 0 | req.set_db_id(ctx.db_id); |
1490 | 0 | req.set_txn_id(ctx.txn_id); |
1491 | 0 | req.set_is_2pc(is_2pc); |
1492 | 0 | req.set_enable_txn_lazy_commit(config::enable_cloud_txn_lazy_commit); |
1493 | 0 | auto st = retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn); |
1494 | |
|
1495 | 0 | if (st.ok()) { |
1496 | 0 | std::vector<int64_t> tablet_ids; |
1497 | 0 | for (auto& commit_info : ctx.commit_infos) { |
1498 | 0 | tablet_ids.emplace_back(commit_info.tabletId); |
1499 | 0 | } |
1500 | 0 | send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res, tablet_ids); |
1501 | 0 | } |
1502 | |
|
1503 | 0 | return st; |
1504 | 0 | } |
1505 | | |
1506 | 487 | Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { |
1507 | 487 | VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1508 | 0 | << ", label: " << ctx.label; |
1509 | 487 | { |
1510 | 487 | Status ret_st; |
1511 | 487 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st); |
1512 | 487 | } |
1513 | 487 | AbortTxnRequest req; |
1514 | 487 | AbortTxnResponse res; |
1515 | 487 | req.set_cloud_unique_id(config::cloud_unique_id); |
1516 | 487 | req.set_reason(std::string(ctx.status.msg().substr(0, 1024))); |
1517 | 487 | if (ctx.db_id > 0 && !ctx.label.empty()) { |
1518 | 479 | req.set_db_id(ctx.db_id); |
1519 | 479 | req.set_label(ctx.label); |
1520 | 479 | } else if (ctx.txn_id > 0) { |
1521 | 8 | req.set_txn_id(ctx.txn_id); |
1522 | 8 | } else { |
1523 | 0 | LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id |
1524 | 0 | << " txn_id=" << ctx.txn_id << " label=" << ctx.label; |
1525 | 0 | return Status::InternalError<false>("failed to abort txn"); |
1526 | 0 | } |
1527 | 487 | return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn); |
1528 | 487 | } |
1529 | | |
1530 | 33 | Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { |
1531 | 33 | VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1532 | 0 | << ", label: " << ctx.label; |
1533 | 33 | { |
1534 | 33 | Status ret_st; |
1535 | 33 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st); |
1536 | 33 | } |
1537 | 33 | PrecommitTxnRequest req; |
1538 | 33 | PrecommitTxnResponse res; |
1539 | 33 | req.set_cloud_unique_id(config::cloud_unique_id); |
1540 | 33 | req.set_db_id(ctx.db_id); |
1541 | 33 | req.set_txn_id(ctx.txn_id); |
1542 | 33 | return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn); |
1543 | 33 | } |
1544 | | |
1545 | 0 | Status CloudMetaMgr::prepare_restore_job(const TabletMetaPB& tablet_meta) { |
1546 | 0 | VLOG_DEBUG << "prepare restore job, tablet_id: " << tablet_meta.tablet_id(); |
1547 | 0 | RestoreJobRequest req; |
1548 | 0 | RestoreJobResponse resp; |
1549 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1550 | 0 | req.set_tablet_id(tablet_meta.tablet_id()); |
1551 | 0 | req.set_expiration(config::snapshot_expire_time_sec); |
1552 | 0 | req.set_action(RestoreJobRequest::PREPARE); |
1553 | |
|
1554 | 0 | doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), std::move(tablet_meta)); |
1555 | 0 | return retry_rpc("prepare restore job", req, &resp, &MetaService_Stub::prepare_restore_job); |
1556 | 0 | } |
1557 | | |
1558 | 0 | Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) { |
1559 | 0 | VLOG_DEBUG << "commit restore job, tablet_id: " << tablet_id; |
1560 | 0 | RestoreJobRequest req; |
1561 | 0 | RestoreJobResponse resp; |
1562 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1563 | 0 | req.set_tablet_id(tablet_id); |
1564 | 0 | req.set_action(RestoreJobRequest::COMMIT); |
1565 | 0 | req.set_store_version(config::delete_bitmap_store_write_version); |
1566 | |
|
1567 | 0 | return retry_rpc("commit restore job", req, &resp, &MetaService_Stub::commit_restore_job); |
1568 | 0 | } |
1569 | | |
1570 | 0 | Status CloudMetaMgr::finish_restore_job(const int64_t tablet_id, bool is_completed) { |
1571 | 0 | VLOG_DEBUG << "finish restore job, tablet_id: " << tablet_id |
1572 | 0 | << ", is_completed: " << is_completed; |
1573 | 0 | RestoreJobRequest req; |
1574 | 0 | RestoreJobResponse resp; |
1575 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1576 | 0 | req.set_tablet_id(tablet_id); |
1577 | 0 | req.set_action(is_completed ? RestoreJobRequest::COMPLETE : RestoreJobRequest::ABORT); |
1578 | |
|
1579 | 0 | return retry_rpc("finish restore job", req, &resp, &MetaService_Stub::finish_restore_job); |
1580 | 0 | } |
1581 | | |
1582 | 255 | Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) { |
1583 | 255 | GetObjStoreInfoRequest req; |
1584 | 255 | GetObjStoreInfoResponse resp; |
1585 | 255 | req.set_cloud_unique_id(config::cloud_unique_id); |
1586 | 255 | Status s = |
1587 | 255 | retry_rpc("get storage vault info", req, &resp, &MetaService_Stub::get_obj_store_info); |
1588 | 255 | if (!s.ok()) { |
1589 | 0 | return s; |
1590 | 0 | } |
1591 | | |
1592 | 255 | *is_vault_mode = resp.enable_storage_vault(); |
1593 | | |
1594 | 255 | auto add_obj_store = [&vault_infos](const auto& obj_store) { |
1595 | 255 | vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store), |
1596 | 255 | StorageVaultPB_PathFormat {}); |
1597 | 255 | }; |
1598 | | |
1599 | 255 | std::ranges::for_each(resp.obj_info(), add_obj_store); |
1600 | 255 | std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) { |
1601 | 0 | if (vault.has_hdfs_info()) { |
1602 | 0 | vault_infos->emplace_back(vault.id(), vault.hdfs_info(), vault.path_format()); |
1603 | 0 | } |
1604 | 0 | if (vault.has_obj_info()) { |
1605 | 0 | add_obj_store(vault.obj_info()); |
1606 | 0 | } |
1607 | 0 | }); |
1608 | | |
1609 | | // desensitization, hide secret |
1610 | 510 | for (int i = 0; i < resp.obj_info_size(); ++i) { |
1611 | 255 | resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx"); |
1612 | 255 | } |
1613 | 255 | for (int i = 0; i < resp.storage_vault_size(); ++i) { |
1614 | 0 | auto* j = resp.mutable_storage_vault(i); |
1615 | 0 | if (!j->has_obj_info()) continue; |
1616 | 0 | j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx"); |
1617 | 0 | } |
1618 | | |
1619 | 509 | for (int i = 0; i < resp.obj_info_size(); ++i) { |
1620 | 254 | resp.mutable_obj_info(i)->set_ak(hide_access_key(resp.obj_info(i).sk())); |
1621 | 254 | } |
1622 | 255 | for (int i = 0; i < resp.storage_vault_size(); ++i) { |
1623 | 0 | auto* j = resp.mutable_storage_vault(i); |
1624 | 0 | if (!j->has_obj_info()) continue; |
1625 | 0 | j->mutable_obj_info()->set_sk(hide_access_key(j->obj_info().sk())); |
1626 | 0 | } |
1627 | | |
1628 | 255 | LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode |
1629 | 255 | << " response=" << resp.ShortDebugString(); |
1630 | 255 | return Status::OK(); |
1631 | 255 | } |
1632 | | |
1633 | 18.5k | Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) { |
1634 | 18.4E | VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString(); |
1635 | 18.5k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res); |
1636 | | |
1637 | 18.5k | StartTabletJobRequest req; |
1638 | 18.5k | req.mutable_job()->CopyFrom(job); |
1639 | 18.5k | req.set_cloud_unique_id(config::cloud_unique_id); |
1640 | 18.5k | return retry_rpc("start tablet job", req, res, &MetaService_Stub::start_tablet_job); |
1641 | 18.5k | } |
1642 | | |
1643 | 16.7k | Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) { |
1644 | 18.4E | VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString(); |
1645 | 16.7k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res); |
1646 | 16.7k | DBUG_EXECUTE_IF("CloudMetaMgr::commit_tablet_job.fail", { |
1647 | 16.7k | return Status::InternalError<false>("inject CloudMetaMgr::commit_tablet_job.fail"); |
1648 | 16.7k | }); |
1649 | | |
1650 | 16.7k | FinishTabletJobRequest req; |
1651 | 16.7k | req.mutable_job()->CopyFrom(job); |
1652 | 16.7k | req.set_action(FinishTabletJobRequest::COMMIT); |
1653 | 16.7k | req.set_cloud_unique_id(config::cloud_unique_id); |
1654 | 16.7k | auto st = retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job); |
1655 | 16.7k | if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { |
1656 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1657 | 0 | "txn conflict when commit tablet job {}", job.ShortDebugString()); |
1658 | 0 | } |
1659 | | |
1660 | 16.9k | if (st.ok() && !job.compaction().empty() && job.has_idx()) { |
1661 | 7.49k | CommitTxnResponse commit_txn_resp; |
1662 | 7.49k | std::vector<int64_t> tablet_ids = {job.idx().tablet_id()}; |
1663 | 7.49k | send_stats_to_fe_async(-1, -1, "", commit_txn_resp, tablet_ids); |
1664 | 7.49k | } |
1665 | | |
1666 | 16.7k | return st; |
1667 | 16.7k | } |
1668 | | |
1669 | 53 | Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) { |
1670 | 53 | VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString(); |
1671 | 53 | FinishTabletJobRequest req; |
1672 | 53 | FinishTabletJobResponse res; |
1673 | 53 | req.mutable_job()->CopyFrom(job); |
1674 | 53 | req.set_action(FinishTabletJobRequest::ABORT); |
1675 | 53 | req.set_cloud_unique_id(config::cloud_unique_id); |
1676 | 53 | return retry_rpc("abort tablet job", req, &res, &MetaService_Stub::finish_tablet_job); |
1677 | 53 | } |
1678 | | |
1679 | 179 | Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) { |
1680 | 179 | VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString(); |
1681 | 179 | FinishTabletJobRequest req; |
1682 | 179 | FinishTabletJobResponse res; |
1683 | 179 | req.mutable_job()->CopyFrom(job); |
1684 | 179 | req.set_action(FinishTabletJobRequest::LEASE); |
1685 | 179 | req.set_cloud_unique_id(config::cloud_unique_id); |
1686 | 179 | return retry_rpc("lease tablet job", req, &res, &MetaService_Stub::finish_tablet_job); |
1687 | 179 | } |
1688 | | |
1689 | | static void add_delete_bitmap(DeleteBitmapPB& delete_bitmap_pb, const DeleteBitmap::BitmapKey& key, |
1690 | 0 | roaring::Roaring& bitmap) { |
1691 | 0 | delete_bitmap_pb.add_rowset_ids(std::get<0>(key).to_string()); |
1692 | 0 | delete_bitmap_pb.add_segment_ids(std::get<1>(key)); |
1693 | 0 | delete_bitmap_pb.add_versions(std::get<2>(key)); |
1694 | | // To save space, convert array and bitmap containers to run containers |
1695 | 0 | bitmap.runOptimize(); |
1696 | 0 | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1697 | 0 | bitmap.write(bitmap_data.data()); |
1698 | 0 | *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1699 | 0 | } |
1700 | | |
1701 | | static Status store_delete_bitmap(std::string& rowset_id, DeleteBitmapPB& delete_bitmap_pb, |
1702 | | int64_t tablet_id, |
1703 | | std::optional<StorageResource> storage_resource, |
1704 | 0 | UpdateDeleteBitmapRequest& req, int64_t txn_id) { |
1705 | 0 | if (config::enable_mow_verbose_log) { |
1706 | 0 | std::stringstream ss; |
1707 | 0 | for (int i = 0; i < delete_bitmap_pb.rowset_ids_size(); i++) { |
1708 | 0 | ss << "{rid=" << delete_bitmap_pb.rowset_ids(i) |
1709 | 0 | << ", sid=" << delete_bitmap_pb.segment_ids(i) |
1710 | 0 | << ", ver=" << delete_bitmap_pb.versions(i) << "}, "; |
1711 | 0 | } |
1712 | 0 | LOG(INFO) << "handle one rowset delete bitmap for tablet_id: " << tablet_id |
1713 | 0 | << ", rowset_id: " << rowset_id |
1714 | 0 | << ", delete_bitmap num: " << delete_bitmap_pb.rowset_ids_size() |
1715 | 0 | << ", size: " << delete_bitmap_pb.ByteSizeLong() << ", keys=[" << ss.str() |
1716 | 0 | << "]"; |
1717 | 0 | } |
1718 | 0 | if (delete_bitmap_pb.rowset_ids_size() == 0) { |
1719 | 0 | return Status::OK(); |
1720 | 0 | } |
1721 | 0 | DeleteBitmapStoragePB delete_bitmap_storage; |
1722 | 0 | if (config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 && |
1723 | 0 | delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb) { |
1724 | | // Enable packed file only for load (txn_id > 0) |
1725 | 0 | bool enable_packed = config::enable_packed_file && txn_id > 0; |
1726 | 0 | DeleteBitmapFileWriter file_writer(tablet_id, rowset_id, storage_resource, enable_packed, |
1727 | 0 | txn_id); |
1728 | 0 | RETURN_IF_ERROR(file_writer.init()); |
1729 | 0 | RETURN_IF_ERROR(file_writer.write(delete_bitmap_pb)); |
1730 | 0 | RETURN_IF_ERROR(file_writer.close()); |
1731 | 0 | delete_bitmap_pb.Clear(); |
1732 | 0 | delete_bitmap_storage.set_store_in_fdb(false); |
1733 | | |
1734 | | // Store packed slice location if file was written to packed file |
1735 | 0 | if (file_writer.is_packed()) { |
1736 | 0 | io::PackedSliceLocation loc; |
1737 | 0 | RETURN_IF_ERROR(file_writer.get_packed_slice_location(&loc)); |
1738 | 0 | auto* packed_loc = delete_bitmap_storage.mutable_packed_slice_location(); |
1739 | 0 | packed_loc->set_packed_file_path(loc.packed_file_path); |
1740 | 0 | packed_loc->set_offset(loc.offset); |
1741 | 0 | packed_loc->set_size(loc.size); |
1742 | 0 | packed_loc->set_packed_file_size(loc.packed_file_size); |
1743 | 0 | } |
1744 | 0 | } else { |
1745 | 0 | delete_bitmap_storage.set_store_in_fdb(true); |
1746 | 0 | *(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb); |
1747 | 0 | } |
1748 | 0 | req.add_delta_rowset_ids(rowset_id); |
1749 | 0 | *(req.add_delete_bitmap_storages()) = std::move(delete_bitmap_storage); |
1750 | 0 | return Status::OK(); |
1751 | 0 | } |
1752 | | |
1753 | | Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, |
1754 | | int64_t initiator, DeleteBitmap* delete_bitmap, |
1755 | | DeleteBitmap* delete_bitmap_v2, std::string rowset_id, |
1756 | | std::optional<StorageResource> storage_resource, |
1757 | | int64_t store_version, int64_t txn_id, |
1758 | 26.9k | bool is_explicit_txn, int64_t next_visible_version) { |
1759 | 26.9k | VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id(); |
1760 | 26.9k | if (config::enable_mow_verbose_log) { |
1761 | 0 | std::stringstream ss; |
1762 | 0 | ss << "start update delete bitmap for tablet_id: " << tablet.tablet_id() |
1763 | 0 | << ", rowset_id: " << rowset_id |
1764 | 0 | << ", delete_bitmap num: " << delete_bitmap->delete_bitmap.size() |
1765 | 0 | << ", store_version: " << store_version << ", lock_id=" << lock_id |
1766 | 0 | << ", initiator=" << initiator; |
1767 | 0 | if (store_version == 2 || store_version == 3) { |
1768 | 0 | ss << ", delete_bitmap v2 num: " << delete_bitmap_v2->delete_bitmap.size(); |
1769 | 0 | } |
1770 | 0 | LOG(INFO) << ss.str(); |
1771 | 0 | } |
1772 | 26.9k | UpdateDeleteBitmapRequest req; |
1773 | 26.9k | UpdateDeleteBitmapResponse res; |
1774 | 26.9k | req.set_cloud_unique_id(config::cloud_unique_id); |
1775 | 26.9k | req.set_table_id(tablet.table_id()); |
1776 | 26.9k | req.set_partition_id(tablet.partition_id()); |
1777 | 26.9k | req.set_tablet_id(tablet.tablet_id()); |
1778 | 26.9k | req.set_lock_id(lock_id); |
1779 | 26.9k | req.set_initiator(initiator); |
1780 | 26.9k | req.set_is_explicit_txn(is_explicit_txn); |
1781 | 26.9k | if (txn_id > 0) { |
1782 | 21.7k | req.set_txn_id(txn_id); |
1783 | 21.7k | } |
1784 | 26.9k | if (next_visible_version > 0) { |
1785 | 21.9k | req.set_next_visible_version(next_visible_version); |
1786 | 21.9k | } |
1787 | 26.9k | req.set_store_version(store_version); |
1788 | | |
1789 | 26.9k | bool write_v1 = store_version == 1 || store_version == 3; |
1790 | 26.9k | bool write_v2 = store_version == 2 || store_version == 3; |
1791 | | // write v1 kvs |
1792 | 26.9k | if (write_v1) { |
1793 | 26.8k | for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { |
1794 | 10.4k | req.add_rowset_ids(std::get<0>(key).to_string()); |
1795 | 10.4k | req.add_segment_ids(std::get<1>(key)); |
1796 | 10.4k | req.add_versions(std::get<2>(key)); |
1797 | | // To save space, convert array and bitmap containers to run containers |
1798 | 10.4k | bitmap.runOptimize(); |
1799 | 10.4k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1800 | 10.4k | bitmap.write(bitmap_data.data()); |
1801 | 10.4k | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1802 | 10.4k | } |
1803 | 26.8k | } |
1804 | | |
1805 | | // write v2 kvs |
1806 | 26.9k | if (write_v2) { |
1807 | 0 | if (config::enable_mow_verbose_log) { |
1808 | 0 | LOG(INFO) << "update delete bitmap for tablet_id: " << tablet.tablet_id() |
1809 | 0 | << ", rowset_id: " << rowset_id |
1810 | 0 | << ", delete_bitmap num: " << delete_bitmap_v2->delete_bitmap.size() |
1811 | 0 | << ", lock_id=" << lock_id << ", initiator=" << initiator; |
1812 | 0 | } |
1813 | 0 | if (rowset_id.empty()) { |
1814 | 0 | std::string pre_rowset_id = ""; |
1815 | 0 | std::string cur_rowset_id = ""; |
1816 | 0 | DeleteBitmapPB delete_bitmap_pb; |
1817 | 0 | for (auto it = delete_bitmap_v2->delete_bitmap.begin(); |
1818 | 0 | it != delete_bitmap_v2->delete_bitmap.end(); ++it) { |
1819 | 0 | auto& key = it->first; |
1820 | 0 | auto& bitmap = it->second; |
1821 | 0 | cur_rowset_id = std::get<0>(key).to_string(); |
1822 | 0 | if (cur_rowset_id != pre_rowset_id) { |
1823 | 0 | if (!pre_rowset_id.empty() && delete_bitmap_pb.rowset_ids_size() > 0) { |
1824 | 0 | RETURN_IF_ERROR(store_delete_bitmap(pre_rowset_id, delete_bitmap_pb, |
1825 | 0 | tablet.tablet_id(), storage_resource, |
1826 | 0 | req, txn_id)); |
1827 | 0 | } |
1828 | 0 | pre_rowset_id = cur_rowset_id; |
1829 | 0 | DCHECK_EQ(delete_bitmap_pb.rowset_ids_size(), 0); |
1830 | 0 | DCHECK_EQ(delete_bitmap_pb.segment_ids_size(), 0); |
1831 | 0 | DCHECK_EQ(delete_bitmap_pb.versions_size(), 0); |
1832 | 0 | DCHECK_EQ(delete_bitmap_pb.segment_delete_bitmaps_size(), 0); |
1833 | 0 | } |
1834 | 0 | add_delete_bitmap(delete_bitmap_pb, key, bitmap); |
1835 | 0 | } |
1836 | 0 | if (delete_bitmap_pb.rowset_ids_size() > 0) { |
1837 | 0 | DCHECK(!cur_rowset_id.empty()); |
1838 | 0 | RETURN_IF_ERROR(store_delete_bitmap(cur_rowset_id, delete_bitmap_pb, |
1839 | 0 | tablet.tablet_id(), storage_resource, req, |
1840 | 0 | txn_id)); |
1841 | 0 | } |
1842 | 0 | } else { |
1843 | 0 | DeleteBitmapPB delete_bitmap_pb; |
1844 | 0 | for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) { |
1845 | 0 | add_delete_bitmap(delete_bitmap_pb, key, bitmap); |
1846 | 0 | } |
1847 | 0 | RETURN_IF_ERROR(store_delete_bitmap(rowset_id, delete_bitmap_pb, tablet.tablet_id(), |
1848 | 0 | storage_resource, req, txn_id)); |
1849 | 0 | } |
1850 | 0 | DCHECK_EQ(req.delta_rowset_ids_size(), req.delete_bitmap_storages_size()); |
1851 | 0 | } |
1852 | 26.9k | DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", { |
1853 | 26.9k | LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id(); |
1854 | 26.9k | auto count = dp->param<int>("count", 30000); |
1855 | 26.9k | if (!delete_bitmap->delete_bitmap.empty()) { |
1856 | 26.9k | auto& key = delete_bitmap->delete_bitmap.begin()->first; |
1857 | 26.9k | auto& bitmap = delete_bitmap->delete_bitmap.begin()->second; |
1858 | 26.9k | for (int i = 1000; i < (1000 + count); i++) { |
1859 | 26.9k | req.add_rowset_ids(std::get<0>(key).to_string()); |
1860 | 26.9k | req.add_segment_ids(std::get<1>(key)); |
1861 | 26.9k | req.add_versions(i); |
1862 | | // To save space, convert array and bitmap containers to run containers |
1863 | 26.9k | bitmap.runOptimize(); |
1864 | 26.9k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1865 | 26.9k | bitmap.write(bitmap_data.data()); |
1866 | 26.9k | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1867 | 26.9k | } |
1868 | 26.9k | } |
1869 | 26.9k | }); |
1870 | 26.9k | DBUG_EXECUTE_IF("CloudMetaMgr::test_update_delete_bitmap_fail", { |
1871 | 26.9k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
1872 | 26.9k | "test update delete bitmap failed, tablet_id: {}, lock_id: {}", tablet.tablet_id(), |
1873 | 26.9k | lock_id); |
1874 | 26.9k | }); |
1875 | 26.9k | auto st = retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap); |
1876 | 26.9k | if (config::enable_update_delete_bitmap_kv_check_core && |
1877 | 26.9k | res.status().code() == MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV) { |
1878 | 0 | auto& msg = res.status().msg(); |
1879 | 0 | LOG_WARNING(msg); |
1880 | 0 | CHECK(false) << msg; |
1881 | 0 | } |
1882 | 26.9k | if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) { |
1883 | 19 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1884 | 19 | "lock expired when update delete bitmap, tablet_id: {}, lock_id: {}, initiator: " |
1885 | 19 | "{}, error_msg: {}", |
1886 | 19 | tablet.tablet_id(), lock_id, initiator, res.status().msg()); |
1887 | 19 | } |
1888 | 26.9k | return st; |
1889 | 26.9k | } |
1890 | | |
1891 | | Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock( |
1892 | | const CloudTablet& tablet, DeleteBitmap* delete_bitmap, |
1893 | | std::map<std::string, int64_t>& rowset_to_versions, int64_t pre_rowset_agg_start_version, |
1894 | 3.47k | int64_t pre_rowset_agg_end_version) { |
1895 | 3.47k | if (config::delete_bitmap_store_write_version == 2) { |
1896 | 0 | VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2"; |
1897 | 0 | return Status::OK(); |
1898 | 0 | } |
1899 | 3.47k | LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id() |
1900 | 3.47k | << ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size(); |
1901 | 3.47k | UpdateDeleteBitmapRequest req; |
1902 | 3.47k | UpdateDeleteBitmapResponse res; |
1903 | 3.47k | req.set_cloud_unique_id(config::cloud_unique_id); |
1904 | 3.47k | req.set_table_id(tablet.table_id()); |
1905 | 3.47k | req.set_partition_id(tablet.partition_id()); |
1906 | 3.47k | req.set_tablet_id(tablet.tablet_id()); |
1907 | | // use a fake lock id to resolve compatibility issues |
1908 | 3.47k | req.set_lock_id(-3); |
1909 | 3.47k | req.set_without_lock(true); |
1910 | 3.47k | for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { |
1911 | 1.12k | req.add_rowset_ids(std::get<0>(key).to_string()); |
1912 | 1.12k | req.add_segment_ids(std::get<1>(key)); |
1913 | 1.12k | req.add_versions(std::get<2>(key)); |
1914 | 1.12k | if (pre_rowset_agg_end_version > 0) { |
1915 | 1.12k | DCHECK(rowset_to_versions.find(std::get<0>(key).to_string()) != |
1916 | 0 | rowset_to_versions.end()) |
1917 | 0 | << "rowset_to_versions not found for key=" << std::get<0>(key).to_string(); |
1918 | 1.12k | req.add_pre_rowset_versions(rowset_to_versions[std::get<0>(key).to_string()]); |
1919 | 1.12k | } |
1920 | 1.12k | DCHECK(pre_rowset_agg_end_version <= 0 || pre_rowset_agg_end_version == std::get<2>(key)) |
1921 | 0 | << "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version |
1922 | 0 | << " not equal to version=" << std::get<2>(key); |
1923 | | // To save space, convert array and bitmap containers to run containers |
1924 | 1.12k | bitmap.runOptimize(); |
1925 | 1.12k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1926 | 1.12k | bitmap.write(bitmap_data.data()); |
1927 | 1.12k | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1928 | 1.12k | } |
1929 | 3.47k | if (pre_rowset_agg_start_version > 0 && pre_rowset_agg_end_version > 0) { |
1930 | 3.47k | req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version); |
1931 | 3.47k | req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version); |
1932 | 3.47k | } |
1933 | 3.47k | return retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap); |
1934 | 3.47k | } |
1935 | | |
1936 | | Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, |
1937 | 4.75k | int64_t initiator) { |
1938 | 4.75k | DBUG_EXECUTE_IF("get_delete_bitmap_update_lock.inject_fail", { |
1939 | 4.75k | auto p = dp->param("percent", 0.01); |
1940 | 4.75k | std::mt19937 gen {std::random_device {}()}; |
1941 | 4.75k | std::bernoulli_distribution inject_fault {p}; |
1942 | 4.75k | if (inject_fault(gen)) { |
1943 | 4.75k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
1944 | 4.75k | "injection error when get get_delete_bitmap_update_lock, " |
1945 | 4.75k | "tablet_id={}, lock_id={}, initiator={}", |
1946 | 4.75k | tablet.tablet_id(), lock_id, initiator); |
1947 | 4.75k | } |
1948 | 4.75k | }); |
1949 | 4.75k | VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() |
1950 | 1 | << ",lock_id:" << lock_id; |
1951 | 4.75k | GetDeleteBitmapUpdateLockRequest req; |
1952 | 4.75k | GetDeleteBitmapUpdateLockResponse res; |
1953 | 4.75k | req.set_cloud_unique_id(config::cloud_unique_id); |
1954 | 4.75k | req.set_table_id(tablet.table_id()); |
1955 | 4.75k | req.set_lock_id(lock_id); |
1956 | 4.75k | req.set_initiator(initiator); |
1957 | | // set expiration time for compaction and schema_change |
1958 | 4.75k | req.set_expiration(config::delete_bitmap_lock_expiration_seconds); |
1959 | 4.75k | int retry_times = 0; |
1960 | 4.75k | Status st; |
1961 | 4.75k | std::default_random_engine rng = make_random_engine(); |
1962 | 4.75k | std::uniform_int_distribution<uint32_t> u(500, 2000); |
1963 | 4.75k | uint64_t backoff_sleep_time_ms {0}; |
1964 | 5.28k | do { |
1965 | 5.28k | bool test_conflict = false; |
1966 | 5.28k | st = retry_rpc("get delete bitmap update lock", req, &res, |
1967 | 5.28k | &MetaService_Stub::get_delete_bitmap_update_lock); |
1968 | 5.28k | DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict", |
1969 | 5.28k | { test_conflict = true; }); |
1970 | 5.28k | if (!test_conflict && res.status().code() != MetaServiceCode::LOCK_CONFLICT) { |
1971 | 4.76k | break; |
1972 | 4.76k | } |
1973 | | |
1974 | 526 | uint32_t duration_ms = u(rng); |
1975 | 526 | LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req) |
1976 | 526 | << " retry_times=" << retry_times << " sleep=" << duration_ms |
1977 | 526 | << "ms : " << res.status().msg(); |
1978 | 526 | auto start = std::chrono::steady_clock::now(); |
1979 | 526 | bthread_usleep(duration_ms * 1000); |
1980 | 526 | auto end = std::chrono::steady_clock::now(); |
1981 | 526 | backoff_sleep_time_ms += duration_cast<std::chrono::milliseconds>(end - start).count(); |
1982 | 526 | } while (++retry_times <= config::get_delete_bitmap_lock_max_retry_times); |
1983 | 0 | g_cloud_be_mow_get_dbm_lock_backoff_sleep_time << backoff_sleep_time_ms; |
1984 | 4.75k | DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", { |
1985 | 4.75k | auto p = dp->param("percent", 0.01); |
1986 | | // 100s > Config.calculate_delete_bitmap_task_timeout_seconds = 60s |
1987 | 4.75k | auto sleep_time = dp->param("sleep", 15); |
1988 | 4.75k | std::mt19937 gen {std::random_device {}()}; |
1989 | 4.75k | std::bernoulli_distribution inject_fault {p}; |
1990 | 4.75k | if (inject_fault(gen)) { |
1991 | 4.75k | LOG_INFO("injection sleep for {} seconds, tablet_id={}", sleep_time, |
1992 | 4.75k | tablet.tablet_id()); |
1993 | 4.75k | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
1994 | 4.75k | } |
1995 | 4.75k | }); |
1996 | 4.75k | if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { |
1997 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1998 | 0 | "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, " |
1999 | 0 | "initiator {}", |
2000 | 0 | tablet.table_id(), lock_id, initiator); |
2001 | 4.75k | } else if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) { |
2002 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
2003 | 0 | "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, " |
2004 | 0 | "initiator {}", |
2005 | 0 | tablet.table_id(), lock_id, initiator); |
2006 | 0 | } |
2007 | 4.75k | return st; |
2008 | 4.75k | } |
2009 | | |
2010 | | void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, |
2011 | 21 | int64_t initiator, int64_t tablet_id) { |
2012 | 21 | LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id |
2013 | 21 | << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id; |
2014 | 21 | RemoveDeleteBitmapUpdateLockRequest req; |
2015 | 21 | RemoveDeleteBitmapUpdateLockResponse res; |
2016 | 21 | req.set_cloud_unique_id(config::cloud_unique_id); |
2017 | 21 | req.set_table_id(table_id); |
2018 | 21 | req.set_tablet_id(tablet_id); |
2019 | 21 | req.set_lock_id(lock_id); |
2020 | 21 | req.set_initiator(initiator); |
2021 | 21 | auto st = retry_rpc("remove delete bitmap update lock", req, &res, |
2022 | 21 | &MetaService_Stub::remove_delete_bitmap_update_lock); |
2023 | 21 | if (!st.ok()) { |
2024 | 19 | LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id |
2025 | 19 | << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id |
2026 | 19 | << ",st=" << st.to_string(); |
2027 | 19 | } |
2028 | 21 | } |
2029 | | |
2030 | 74.0k | void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) { |
2031 | 74.1k | if (!config::enable_table_size_correctness_check) { |
2032 | 74.1k | return; |
2033 | 74.1k | } |
2034 | 18.4E | int64_t total_segment_size = get_segment_file_size(rs_meta); |
2035 | 18.4E | int64_t total_inverted_index_size = get_inverted_index_file_size(rs_meta); |
2036 | 18.4E | if (rs_meta.data_disk_size() != total_segment_size || |
2037 | 18.4E | rs_meta.index_disk_size() != total_inverted_index_size || |
2038 | 18.4E | rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) { |
2039 | 0 | LOG(WARNING) << "[Cloud table table size check failed]:" |
2040 | 0 | << " tablet id: " << rs_meta.tablet_id() |
2041 | 0 | << ", rowset id:" << rs_meta.rowset_id() |
2042 | 0 | << ", rowset data disk size:" << rs_meta.data_disk_size() |
2043 | 0 | << ", rowset real data disk size:" << total_segment_size |
2044 | 0 | << ", rowset index disk size:" << rs_meta.index_disk_size() |
2045 | 0 | << ", rowset real index disk size:" << total_inverted_index_size |
2046 | 0 | << ", rowset total disk size:" << rs_meta.total_disk_size() |
2047 | 0 | << ", rowset segment path:" |
2048 | 0 | << StorageResource().remote_segment_path(rs_meta.tablet_id(), |
2049 | 0 | rs_meta.rowset_id().to_string(), 0); |
2050 | 0 | DCHECK(false); |
2051 | 0 | } |
2052 | 18.4E | } |
2053 | | |
2054 | 0 | int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) { |
2055 | 0 | int64_t total_segment_size = 0; |
2056 | 0 | const auto fs = rs_meta.fs(); |
2057 | 0 | if (!fs) { |
2058 | 0 | LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id(); |
2059 | 0 | } |
2060 | 0 | for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) { |
2061 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2062 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2063 | 0 | int64_t segment_file_size = 0; |
2064 | 0 | auto st = fs->file_size(segment_path, &segment_file_size); |
2065 | 0 | if (!st.ok()) { |
2066 | 0 | segment_file_size = 0; |
2067 | 0 | if (st.is<NOT_FOUND>()) { |
2068 | 0 | LOG(INFO) << "cloud table size correctness check get segment size 0 because " |
2069 | 0 | "file not exist! msg:" |
2070 | 0 | << st.msg() << ", segment path:" << segment_path; |
2071 | 0 | } else { |
2072 | 0 | LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:" |
2073 | 0 | << st.msg() << ", segment path:" << segment_path; |
2074 | 0 | } |
2075 | 0 | } |
2076 | 0 | total_segment_size += segment_file_size; |
2077 | 0 | } |
2078 | 0 | return total_segment_size; |
2079 | 0 | } |
2080 | | |
2081 | 0 | int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) { |
2082 | 0 | int64_t total_inverted_index_size = 0; |
2083 | 0 | const auto fs = rs_meta.fs(); |
2084 | 0 | if (!fs) { |
2085 | 0 | LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id(); |
2086 | 0 | } |
2087 | 0 | if (rs_meta.tablet_schema()->get_inverted_index_storage_format() == |
2088 | 0 | InvertedIndexStorageFormatPB::V1) { |
2089 | 0 | const auto& indices = rs_meta.tablet_schema()->inverted_indexes(); |
2090 | 0 | for (auto& index : indices) { |
2091 | 0 | for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) { |
2092 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2093 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2094 | 0 | int64_t file_size = 0; |
2095 | |
|
2096 | 0 | std::string inverted_index_file_path = |
2097 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
2098 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path), |
2099 | 0 | index->index_id(), index->get_index_suffix()); |
2100 | 0 | auto st = fs->file_size(inverted_index_file_path, &file_size); |
2101 | 0 | if (!st.ok()) { |
2102 | 0 | file_size = 0; |
2103 | 0 | if (st.is<NOT_FOUND>()) { |
2104 | 0 | LOG(INFO) << "cloud table size correctness check get inverted index v1 " |
2105 | 0 | "0 because file not exist! msg:" |
2106 | 0 | << st.msg() |
2107 | 0 | << ", inverted index path:" << inverted_index_file_path; |
2108 | 0 | } else { |
2109 | 0 | LOG(WARNING) |
2110 | 0 | << "cloud table size correctness check get inverted index v1 " |
2111 | 0 | "size failed! msg:" |
2112 | 0 | << st.msg() << ", inverted index path:" << inverted_index_file_path; |
2113 | 0 | } |
2114 | 0 | } |
2115 | 0 | total_inverted_index_size += file_size; |
2116 | 0 | } |
2117 | 0 | } |
2118 | 0 | } else { |
2119 | 0 | for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) { |
2120 | 0 | int64_t file_size = 0; |
2121 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2122 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2123 | |
|
2124 | 0 | std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
2125 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)); |
2126 | 0 | auto st = fs->file_size(inverted_index_file_path, &file_size); |
2127 | 0 | if (!st.ok()) { |
2128 | 0 | file_size = 0; |
2129 | 0 | if (st.is<NOT_FOUND>()) { |
2130 | 0 | LOG(INFO) << "cloud table size correctness check get inverted index v2 " |
2131 | 0 | "0 because file not exist! msg:" |
2132 | 0 | << st.msg() << ", inverted index path:" << inverted_index_file_path; |
2133 | 0 | } else { |
2134 | 0 | LOG(WARNING) << "cloud table size correctness check get inverted index v2 " |
2135 | 0 | "size failed! msg:" |
2136 | 0 | << st.msg() |
2137 | 0 | << ", inverted index path:" << inverted_index_file_path; |
2138 | 0 | } |
2139 | 0 | } |
2140 | 0 | total_inverted_index_size += file_size; |
2141 | 0 | } |
2142 | 0 | } |
2143 | 0 | return total_inverted_index_size; |
2144 | 0 | } |
2145 | | |
2146 | | Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version, |
2147 | 187k | std::unique_lock<std::shared_mutex>& wlock) { |
2148 | 187k | if (max_version <= 0) { |
2149 | 116k | return Status::OK(); |
2150 | 116k | } |
2151 | | |
2152 | 71.5k | Versions existing_versions; |
2153 | 265k | for (const auto& [_, rs] : tablet->tablet_meta()->all_rs_metas()) { |
2154 | 265k | existing_versions.emplace_back(rs->version()); |
2155 | 265k | } |
2156 | | |
2157 | | // If there are no existing versions, it may be a new tablet for restore, so skip filling holes. |
2158 | 71.5k | if (existing_versions.empty()) { |
2159 | 1 | return Status::OK(); |
2160 | 1 | } |
2161 | | |
2162 | 71.5k | std::vector<RowsetSharedPtr> hole_rowsets; |
2163 | | // sort the existing versions in ascending order |
2164 | 71.5k | std::sort(existing_versions.begin(), existing_versions.end(), |
2165 | 541k | [](const Version& a, const Version& b) { |
2166 | | // simple because 2 versions are certainly not overlapping |
2167 | 541k | return a.first < b.first; |
2168 | 541k | }); |
2169 | | |
2170 | | // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which calls |
2171 | | // fill_version_holes. For schema change tablets (TABLET_NOTREADY state), we selectively skip hole |
2172 | | // filling for versions <= alter_version to prevent: |
2173 | | // 1. Abnormal compaction score calculations for schema change tablets |
2174 | | // 2. Unexpected -235 errors during load operations |
2175 | | // This allows schema change to proceed normally while still permitting hole filling for versions |
2176 | | // beyond the alter_version threshold. |
2177 | 71.5k | bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY; |
2178 | 71.5k | if (is_schema_change_tablet && tablet->alter_version() <= 1) { |
2179 | 4.80k | LOG(INFO) << "Skip version hole filling for new schema change tablet " |
2180 | 4.80k | << tablet->tablet_id() << " with alter_version " << tablet->alter_version(); |
2181 | 4.80k | return Status::OK(); |
2182 | 4.80k | } |
2183 | | |
2184 | 66.7k | int64_t last_version = -1; |
2185 | 260k | for (const Version& version : existing_versions) { |
2186 | 260k | VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": [" |
2187 | 1 | << version.first << ", " << version.second << "]"; |
2188 | | // missing versions are those that are not in the existing_versions |
2189 | 260k | if (version.first > last_version + 1) { |
2190 | | // there is a hole between versions |
2191 | 167 | auto prev_non_hole_rowset = tablet->get_rowset_by_version(version); |
2192 | 860 | for (int64_t ver = last_version + 1; ver < version.first; ++ver) { |
2193 | | // Skip hole filling for versions <= alter_version during schema change |
2194 | 693 | if (is_schema_change_tablet && ver <= tablet->alter_version()) { |
2195 | 425 | continue; |
2196 | 425 | } |
2197 | 268 | RowsetSharedPtr hole_rowset; |
2198 | 268 | RETURN_IF_ERROR(create_empty_rowset_for_hole( |
2199 | 268 | tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); |
2200 | 268 | hole_rowsets.push_back(hole_rowset); |
2201 | 268 | } |
2202 | 167 | LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 |
2203 | 167 | << " to " << version.first - 1 << " for tablet " << tablet->tablet_id() |
2204 | 167 | << (is_schema_change_tablet |
2205 | 167 | ? (", schema change tablet skipped filling versions <= " + |
2206 | 16 | std::to_string(tablet->alter_version())) |
2207 | 167 | : ""); |
2208 | 167 | } |
2209 | 260k | last_version = version.second; |
2210 | 260k | } |
2211 | | |
2212 | 66.7k | if (last_version + 1 <= max_version) { |
2213 | 7.80k | LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to " |
2214 | 7.80k | << max_version << " for tablet " << tablet->tablet_id() |
2215 | 7.80k | << (is_schema_change_tablet |
2216 | 7.80k | ? (", schema change tablet skipped filling versions <= " + |
2217 | 5 | std::to_string(tablet->alter_version())) |
2218 | 7.80k | : ""); |
2219 | | // there is a hole after the last existing version |
2220 | 18.3k | for (; last_version + 1 <= max_version; ++last_version) { |
2221 | | // Skip hole filling for versions <= alter_version during schema change |
2222 | 10.5k | if (is_schema_change_tablet && last_version + 1 <= tablet->alter_version()) { |
2223 | 215 | continue; |
2224 | 215 | } |
2225 | 10.3k | RowsetSharedPtr hole_rowset; |
2226 | 10.3k | auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back()); |
2227 | 10.3k | RETURN_IF_ERROR(create_empty_rowset_for_hole( |
2228 | 10.3k | tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); |
2229 | 10.3k | hole_rowsets.push_back(hole_rowset); |
2230 | 10.3k | } |
2231 | 7.80k | } |
2232 | | |
2233 | 66.7k | if (!hole_rowsets.empty()) { |
2234 | 7.90k | size_t hole_count = hole_rowsets.size(); |
2235 | 7.90k | tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false); |
2236 | 7.90k | g_cloud_version_hole_filled_count << hole_count; |
2237 | 7.90k | } |
2238 | 66.7k | return Status::OK(); |
2239 | 66.7k | } |
2240 | | |
2241 | | Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version, |
2242 | | RowsetMetaSharedPtr prev_rowset_meta, |
2243 | 133k | RowsetSharedPtr* rowset) { |
2244 | | // Create a RowsetMeta for the empty rowset |
2245 | 133k | auto rs_meta = std::make_shared<RowsetMeta>(); |
2246 | | |
2247 | | // Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id) |
2248 | 133k | RowsetId hole_rowset_id; |
2249 | 133k | hole_rowset_id.init(2, 0, tablet->tablet_id(), version); |
2250 | 133k | rs_meta->set_rowset_id(hole_rowset_id); |
2251 | | |
2252 | | // Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id) |
2253 | 133k | PUniqueId load_id; |
2254 | 133k | load_id.set_hi(tablet->tablet_id()); |
2255 | 133k | load_id.set_lo(version); |
2256 | 133k | rs_meta->set_load_id(load_id); |
2257 | | |
2258 | | // Copy schema and other metadata from template |
2259 | 133k | rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema()); |
2260 | 133k | rs_meta->set_rowset_type(prev_rowset_meta->rowset_type()); |
2261 | 133k | rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash()); |
2262 | 133k | rs_meta->set_resource_id(prev_rowset_meta->resource_id()); |
2263 | | |
2264 | | // Basic tablet information |
2265 | 133k | rs_meta->set_tablet_id(tablet->tablet_id()); |
2266 | 133k | rs_meta->set_index_id(tablet->index_id()); |
2267 | 133k | rs_meta->set_partition_id(tablet->partition_id()); |
2268 | 133k | rs_meta->set_tablet_uid(tablet->tablet_uid()); |
2269 | 133k | rs_meta->set_version(Version(version, version)); |
2270 | 133k | rs_meta->set_txn_id(version); |
2271 | | |
2272 | 133k | rs_meta->set_num_rows(0); |
2273 | 133k | rs_meta->set_total_disk_size(0); |
2274 | 133k | rs_meta->set_data_disk_size(0); |
2275 | 133k | rs_meta->set_index_disk_size(0); |
2276 | 133k | rs_meta->set_empty(true); |
2277 | 133k | rs_meta->set_num_segments(0); |
2278 | 133k | rs_meta->set_segments_overlap(NONOVERLAPPING); |
2279 | 133k | rs_meta->set_rowset_state(VISIBLE); |
2280 | 133k | rs_meta->set_creation_time(UnixSeconds()); |
2281 | 133k | rs_meta->set_newest_write_timestamp(UnixSeconds()); |
2282 | | |
2283 | 133k | Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset); |
2284 | 133k | if (!s.ok()) { |
2285 | 0 | LOG_WARNING("Failed to create empty rowset for hole") |
2286 | 0 | .tag("tablet_id", tablet->tablet_id()) |
2287 | 0 | .tag("version", version) |
2288 | 0 | .error(s); |
2289 | 0 | return s; |
2290 | 0 | } |
2291 | 133k | (*rowset)->set_hole_rowset(true); |
2292 | | |
2293 | 133k | return Status::OK(); |
2294 | 133k | } |
2295 | | |
2296 | 2 | Status CloudMetaMgr::list_snapshot(std::vector<SnapshotInfoPB>& snapshots) { |
2297 | 2 | ListSnapshotRequest req; |
2298 | 2 | ListSnapshotResponse res; |
2299 | 2 | req.set_cloud_unique_id(config::cloud_unique_id); |
2300 | 2 | req.set_include_aborted(true); |
2301 | 2 | RETURN_IF_ERROR(retry_rpc("list snapshot", req, &res, &MetaService_Stub::list_snapshot)); |
2302 | 0 | for (auto& snapshot : res.snapshots()) { |
2303 | 0 | snapshots.emplace_back(snapshot); |
2304 | 0 | } |
2305 | 0 | return Status::OK(); |
2306 | 2 | } |
2307 | | |
2308 | | Status CloudMetaMgr::get_snapshot_properties(SnapshotSwitchStatus& switch_status, |
2309 | | int64_t& max_reserved_snapshots, |
2310 | 1 | int64_t& snapshot_interval_seconds) { |
2311 | 1 | GetInstanceRequest req; |
2312 | 1 | GetInstanceResponse res; |
2313 | 1 | req.set_cloud_unique_id(config::cloud_unique_id); |
2314 | 1 | RETURN_IF_ERROR( |
2315 | 1 | retry_rpc("get snapshot properties", req, &res, &MetaService_Stub::get_instance)); |
2316 | 1 | switch_status = res.instance().has_snapshot_switch_status() |
2317 | 1 | ? res.instance().snapshot_switch_status() |
2318 | 1 | : SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED; |
2319 | 1 | max_reserved_snapshots = |
2320 | 1 | res.instance().has_max_reserved_snapshot() ? res.instance().max_reserved_snapshot() : 0; |
2321 | 1 | snapshot_interval_seconds = res.instance().has_snapshot_interval_seconds() |
2322 | 1 | ? res.instance().snapshot_interval_seconds() |
2323 | 1 | : 3600; |
2324 | 1 | return Status::OK(); |
2325 | 1 | } |
2326 | | |
2327 | | Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path, |
2328 | 0 | const cloud::PackedFileInfoPB& packed_file_info) { |
2329 | 0 | VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with " |
2330 | 0 | << packed_file_info.total_slice_num() << " small files" |
2331 | 0 | << ", total bytes: " << packed_file_info.total_slice_bytes(); |
2332 | | |
2333 | | // Create request |
2334 | 0 | cloud::UpdatePackedFileInfoRequest req; |
2335 | 0 | cloud::UpdatePackedFileInfoResponse resp; |
2336 | | |
2337 | | // Set required fields |
2338 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
2339 | 0 | req.set_packed_file_path(packed_file_path); |
2340 | 0 | *req.mutable_packed_file_info() = packed_file_info; |
2341 | | |
2342 | | // Make RPC call using retry pattern |
2343 | 0 | return retry_rpc("update packed file info", req, &resp, |
2344 | 0 | &cloud::MetaService_Stub::update_packed_file_info); |
2345 | 0 | } |
2346 | | |
2347 | | Status CloudMetaMgr::get_cluster_status( |
2348 | | std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result, |
2349 | 66 | std::string* my_cluster_id) { |
2350 | 66 | GetClusterStatusRequest req; |
2351 | 66 | GetClusterStatusResponse resp; |
2352 | 66 | req.add_cloud_unique_ids(config::cloud_unique_id); |
2353 | | |
2354 | 66 | Status s = retry_rpc("get cluster status", req, &resp, &MetaService_Stub::get_cluster_status); |
2355 | 66 | if (!s.ok()) { |
2356 | 0 | return s; |
2357 | 0 | } |
2358 | | |
2359 | 66 | result->clear(); |
2360 | 66 | for (const auto& detail : resp.details()) { |
2361 | 66 | for (const auto& cluster : detail.clusters()) { |
2362 | | // Store cluster status and mtime (mtime is in seconds from MS, convert to ms). |
2363 | | // If mtime is not set, use current time as a conservative default |
2364 | | // to avoid immediate takeover due to elapsed being huge. |
2365 | 66 | int64_t mtime_ms = cluster.has_mtime() ? cluster.mtime() * 1000 : UnixMillis(); |
2366 | 66 | (*result)[cluster.cluster_id()] = {static_cast<int32_t>(cluster.cluster_status()), |
2367 | 66 | mtime_ms}; |
2368 | 66 | } |
2369 | 66 | } |
2370 | | |
2371 | 66 | if (my_cluster_id && resp.has_requester_cluster_id()) { |
2372 | 1 | *my_cluster_id = resp.requester_cluster_id(); |
2373 | 1 | } |
2374 | | |
2375 | 66 | return Status::OK(); |
2376 | 66 | } |
2377 | | |
2378 | | #include "common/compile_check_end.h" |
2379 | | } // namespace doris::cloud |