be/src/cloud/cloud_meta_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include "cloud/cloud_meta_mgr.h" |
18 | | |
19 | | #include <brpc/channel.h> |
20 | | #include <brpc/controller.h> |
21 | | #include <brpc/errno.pb.h> |
22 | | #include <bthread/bthread.h> |
23 | | #include <bthread/condition_variable.h> |
24 | | #include <bthread/mutex.h> |
25 | | #include <gen_cpp/FrontendService.h> |
26 | | #include <gen_cpp/HeartbeatService_types.h> |
27 | | #include <gen_cpp/PlanNodes_types.h> |
28 | | #include <gen_cpp/Types_types.h> |
29 | | #include <gen_cpp/cloud.pb.h> |
30 | | #include <gen_cpp/olap_file.pb.h> |
31 | | #include <glog/logging.h> |
32 | | |
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <memory>
#include <mutex>
#include <random>
#include <shared_mutex>
#include <string>
#include <type_traits>
#include <vector>
44 | | |
45 | | #include "cloud/cloud_storage_engine.h" |
46 | | #include "cloud/cloud_tablet.h" |
47 | | #include "cloud/cloud_warm_up_manager.h" |
48 | | #include "cloud/config.h" |
49 | | #include "cloud/delete_bitmap_file_reader.h" |
50 | | #include "cloud/delete_bitmap_file_writer.h" |
51 | | #include "cloud/pb_convert.h" |
52 | | #include "common/config.h" |
53 | | #include "common/logging.h" |
54 | | #include "common/status.h" |
55 | | #include "cpp/sync_point.h" |
56 | | #include "io/fs/obj_storage_client.h" |
57 | | #include "load/stream_load/stream_load_context.h" |
58 | | #include "runtime/exec_env.h" |
59 | | #include "storage/olap_common.h" |
60 | | #include "storage/rowset/rowset.h" |
61 | | #include "storage/rowset/rowset_factory.h" |
62 | | #include "storage/rowset/rowset_fwd.h" |
63 | | #include "storage/storage_engine.h" |
64 | | #include "storage/tablet/tablet_meta.h" |
65 | | #include "util/client_cache.h" |
66 | | #include "util/network_util.h" |
67 | | #include "util/s3_util.h" |
68 | | #include "util/thrift_rpc_helper.h" |
69 | | |
70 | | namespace doris::cloud { |
71 | | #include "common/compile_check_begin.h" |
72 | | using namespace ErrorCode; |
73 | | |
// Entry point handed to bthread_start_background (and invoked inline as a fallback).
// `arg` must point to a heap-allocated std::function<void()>; this function takes ownership,
// runs the callable once and frees it. Ownership is held by a unique_ptr so the closure is
// released even if the callable throws.
void* run_bthread_work(void* arg) {
    std::unique_ptr<std::function<void()>> work(static_cast<std::function<void()>*>(arg));
    (*work)();
    return nullptr;
}
80 | | |
81 | 364k | Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) { |
82 | 364k | if (tasks.empty()) { |
83 | 2.25k | return Status::OK(); |
84 | 2.25k | } |
85 | | |
86 | 362k | bthread::Mutex lock; |
87 | 362k | bthread::ConditionVariable cond; |
88 | 362k | Status status; // Guard by lock |
89 | 362k | int count = 0; // Guard by lock |
90 | | |
91 | 1.61M | for (const auto& task : tasks) { |
92 | 1.61M | { |
93 | 1.61M | std::unique_lock lk(lock); |
94 | | // Wait until there are available slots |
95 | 1.72M | while (status.ok() && count >= concurrency) { |
96 | 110k | cond.wait(lk); |
97 | 110k | } |
98 | 1.61M | if (!status.ok()) { |
99 | 2 | break; |
100 | 2 | } |
101 | | |
102 | | // Increase running task count |
103 | 1.61M | ++count; |
104 | 1.61M | } |
105 | | |
106 | | // dispatch task into bthreads |
107 | 1.61M | auto* fn = new std::function<void()>([&, &task = task] { |
108 | 1.61M | auto st = task(); |
109 | 1.61M | { |
110 | 1.61M | std::lock_guard lk(lock); |
111 | 1.61M | --count; |
112 | 1.61M | if (!st.ok()) { |
113 | 2 | std::swap(st, status); |
114 | 2 | } |
115 | 1.61M | cond.notify_one(); |
116 | 1.61M | } |
117 | 1.61M | }); |
118 | | |
119 | 1.61M | bthread_t bthread_id; |
120 | 1.61M | if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) { |
121 | 0 | run_bthread_work(fn); |
122 | 0 | } |
123 | 1.61M | } |
124 | | |
125 | | // Wait until all running tasks have done |
126 | 362k | { |
127 | 362k | std::unique_lock lk(lock); |
128 | 1.06M | while (count > 0) { |
129 | 705k | cond.wait(lk); |
130 | 705k | } |
131 | 362k | } |
132 | | |
133 | 362k | return status; |
134 | 364k | } |
135 | | |
136 | | Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency, |
137 | 306k | std::future<Status>* fut) { |
138 | | // std::function will cause `copy`, we need to use heap memory to avoid copy ctor called |
139 | 306k | auto prom = std::make_shared<std::promise<Status>>(); |
140 | 306k | *fut = prom->get_future(); |
141 | 306k | std::function<void()>* fn = new std::function<void()>( |
142 | 308k | [tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable { |
143 | 308k | p->set_value(bthread_fork_join(tasks, concurrency)); |
144 | 308k | }); |
145 | | |
146 | 306k | bthread_t bthread_id; |
147 | 306k | if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) { |
148 | 0 | delete fn; |
149 | 0 | return Status::InternalError<false>("failed to create bthread"); |
150 | 0 | } |
151 | 306k | return Status::OK(); |
152 | 306k | } |
153 | | |
154 | | namespace { |
155 | | constexpr int kBrpcRetryTimes = 3; |
156 | | |
157 | | bvar::LatencyRecorder _get_rowset_latency("doris_cloud_meta_mgr_get_rowset"); |
158 | | bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency"); |
159 | | bvar::Adder<uint64_t> g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count"); |
160 | | bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window( |
161 | | "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30); |
162 | | bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time( |
163 | | "cloud_be_mow_get_dbm_lock_backoff_sleep_time"); |
164 | | bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count"); |
165 | | |
166 | | class MetaServiceProxy { |
167 | | public: |
168 | 1.25M | static Status get_proxy(MetaServiceProxy** proxy) { |
169 | | // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function. |
170 | 1.25M | std::shared_ptr<MetaService_Stub> stub; |
171 | 1.25M | return get_pooled_client(&stub, proxy); |
172 | 1.25M | } |
173 | | |
174 | 0 | void set_unhealthy() { |
175 | 0 | std::unique_lock lock(_mutex); |
176 | 0 | maybe_unhealthy = true; |
177 | 0 | } |
178 | | |
179 | 2.49M | bool need_reconn(long now) { |
180 | 2.49M | return maybe_unhealthy && ((now - last_reconn_time_ms.front()) > |
181 | 0 | config::meta_service_rpc_reconnect_interval_ms); |
182 | 2.49M | } |
183 | | |
184 | 2.49M | Status get(std::shared_ptr<MetaService_Stub>* stub) { |
185 | 2.49M | using namespace std::chrono; |
186 | | |
187 | 2.49M | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
188 | 2.49M | { |
189 | 2.49M | std::shared_lock lock(_mutex); |
190 | 2.50M | if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) { |
191 | 2.50M | _last_access_at_ms.store(now, std::memory_order_relaxed); |
192 | 2.50M | *stub = _stub; |
193 | 2.50M | return Status::OK(); |
194 | 2.50M | } |
195 | 2.49M | } |
196 | | |
197 | 18.4E | auto channel = std::make_unique<brpc::Channel>(); |
198 | 18.4E | Status s = init_channel(channel.get()); |
199 | 18.4E | if (!s.ok()) [[unlikely]] { |
200 | 0 | return s; |
201 | 0 | } |
202 | | |
203 | 18.4E | *stub = std::make_shared<MetaService_Stub>(channel.release(), |
204 | 18.4E | google::protobuf::Service::STUB_OWNS_CHANNEL); |
205 | | |
206 | 18.4E | long deadline = now; |
207 | | // connection age only works without list endpoint. |
208 | 18.4E | if (config::meta_service_connection_age_base_seconds > 0) { |
209 | 1.92k | std::default_random_engine rng(static_cast<uint32_t>(now)); |
210 | 1.92k | std::uniform_int_distribution<> uni( |
211 | 1.92k | config::meta_service_connection_age_base_seconds, |
212 | 1.92k | config::meta_service_connection_age_base_seconds * 2); |
213 | 1.92k | deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count(); |
214 | 1.92k | } |
215 | | |
216 | | // Last one WIN |
217 | 18.4E | std::unique_lock lock(_mutex); |
218 | 18.4E | _last_access_at_ms.store(now, std::memory_order_relaxed); |
219 | 18.4E | _deadline_ms = deadline; |
220 | 18.4E | _stub = *stub; |
221 | | |
222 | 18.4E | last_reconn_time_ms.push(now); |
223 | 18.4E | last_reconn_time_ms.pop(); |
224 | 18.4E | maybe_unhealthy = false; |
225 | | |
226 | 18.4E | return Status::OK(); |
227 | 18.4E | } |
228 | | |
229 | | private: |
230 | 2.50M | static bool is_meta_service_endpoint_list() { |
231 | 2.50M | return config::meta_service_endpoint.find(',') != std::string::npos; |
232 | 2.50M | } |
233 | | |
234 | | /** |
235 | | * This function initializes a pool of `MetaServiceProxy` objects and selects one using |
236 | | * round-robin. It returns a client stub via the selected proxy. |
237 | | * |
238 | | * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved. |
239 | | * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`. |
240 | | * |
241 | | * @return Status Returns `Status::OK()` on success or an error status on failure. |
242 | | */ |
243 | | static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub, |
244 | 1.25M | MetaServiceProxy** proxy) { |
245 | 1.25M | static std::once_flag proxies_flag; |
246 | 1.25M | static size_t num_proxies = 1; |
247 | 1.25M | static std::atomic<size_t> index(0); |
248 | 1.25M | static std::unique_ptr<MetaServiceProxy[]> proxies; |
249 | 1.25M | if (config::meta_service_endpoint.empty()) { |
250 | 21 | return Status::InvalidArgument( |
251 | 21 | "Meta service endpoint is empty. Please configure manually or wait for " |
252 | 21 | "heartbeat to obtain."); |
253 | 21 | } |
254 | 1.25M | std::call_once( |
255 | 1.25M | proxies_flag, +[]() { |
256 | 1 | if (config::meta_service_connection_pooled) { |
257 | 1 | num_proxies = config::meta_service_connection_pool_size; |
258 | 1 | } |
259 | 1 | proxies = std::make_unique<MetaServiceProxy[]>(num_proxies); |
260 | 1 | }); |
261 | | |
262 | 1.25M | for (size_t i = 0; i + 1 < num_proxies; ++i) { |
263 | 1.25M | size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; |
264 | 1.25M | Status s = proxies[next_index].get(stub); |
265 | 1.25M | if (proxy != nullptr) { |
266 | 1.25M | *proxy = &(proxies[next_index]); |
267 | 1.25M | } |
268 | 1.25M | if (s.ok()) return Status::OK(); |
269 | 1.25M | } |
270 | | |
271 | 3.63k | size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies; |
272 | 3.63k | if (proxy != nullptr) { |
273 | 0 | *proxy = &(proxies[next_index]); |
274 | 0 | } |
275 | 3.63k | return proxies[next_index].get(stub); |
276 | 1.25M | } |
277 | | |
278 | 1.91k | static Status init_channel(brpc::Channel* channel) { |
279 | 1.91k | static std::atomic<size_t> index = 1; |
280 | | |
281 | 1.91k | const char* load_balancer_name = nullptr; |
282 | 1.91k | std::string endpoint; |
283 | 1.91k | if (is_meta_service_endpoint_list()) { |
284 | 0 | endpoint = fmt::format("list://{}", config::meta_service_endpoint); |
285 | 0 | load_balancer_name = "random"; |
286 | 1.91k | } else { |
287 | 1.91k | std::string ip; |
288 | 1.91k | uint16_t port; |
289 | 1.91k | Status s = get_meta_service_ip_and_port(&ip, &port); |
290 | 1.91k | if (!s.ok()) { |
291 | 0 | LOG(WARNING) << "fail to get meta service ip and port: " << s; |
292 | 0 | return s; |
293 | 0 | } |
294 | | |
295 | 1.91k | endpoint = get_host_port(ip, port); |
296 | 1.91k | } |
297 | | |
298 | 1.91k | brpc::ChannelOptions options; |
299 | 1.91k | options.connection_group = |
300 | 1.91k | fmt::format("ms_{}", index.fetch_add(1, std::memory_order_relaxed)); |
301 | 1.91k | if (channel->Init(endpoint.c_str(), load_balancer_name, &options) != 0) { |
302 | 0 | return Status::InvalidArgument("failed to init brpc channel, endpoint: {}", endpoint); |
303 | 0 | } |
304 | 1.91k | return Status::OK(); |
305 | 1.91k | } |
306 | | |
307 | 1.91k | static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) { |
308 | 1.91k | std::string parsed_host; |
309 | 1.91k | if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) { |
310 | 0 | return Status::InvalidArgument("invalid meta service endpoint: {}", |
311 | 0 | config::meta_service_endpoint); |
312 | 0 | } |
313 | 1.91k | if (is_valid_ip(parsed_host)) { |
314 | 1.91k | *ip = std::move(parsed_host); |
315 | 1.91k | return Status::OK(); |
316 | 1.91k | } |
317 | 1 | return hostname_to_ip(parsed_host, *ip); |
318 | 1.91k | } |
319 | | |
320 | 2.50M | bool is_idle_timeout(long now) { |
321 | 2.50M | auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms; |
322 | | // idle timeout only works without list endpoint. |
323 | 2.50M | return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 && |
324 | 2.50M | _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now; |
325 | 2.50M | } |
326 | | |
327 | | std::shared_mutex _mutex; |
328 | | std::atomic<long> _last_access_at_ms {0}; |
329 | | long _deadline_ms {0}; |
330 | | std::shared_ptr<MetaService_Stub> _stub; |
331 | | |
332 | | std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}}; |
333 | | bool maybe_unhealthy = false; |
334 | | }; |
335 | | |
// is_any<T, Ts...>::value is true exactly when T is the same type as one of Ts; an empty Ts
// yields false.
template <typename T, typename... Ts>
struct is_any : std::bool_constant<(std::is_same_v<T, Ts> || ...)> {};

// Shorthand for is_any<T, Ts...>::value.
template <typename T, typename... Ts>
constexpr bool is_any_v = is_any<T, Ts...>::value;
341 | | |
342 | | template <typename Request> |
343 | 667 | static std::string debug_info(const Request& req) { |
344 | 667 | if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) { |
345 | 0 | return fmt::format(" txn_id={}", req.txn_id()); |
346 | 0 | } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) { |
347 | 0 | return fmt::format(" tablet_id={}", req.job().idx().tablet_id()); |
348 | 0 | } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) { |
349 | 0 | return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id()); |
350 | 667 | } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) { |
351 | 667 | return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id()); |
352 | 667 | } else if constexpr (is_any_v<Request, GetTabletRequest>) { |
353 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
354 | | } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest, |
355 | 0 | GetInstanceRequest, GetClusterStatusRequest>) { |
356 | 0 | return ""; |
357 | 0 | } else if constexpr (is_any_v<Request, CreateRowsetRequest>) { |
358 | 0 | return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); |
359 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) { |
360 | | return fmt::format(" tablet_id={}", req.tablet_id()); |
361 | 0 | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) { |
362 | 0 | return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), |
363 | 0 | req.tablet_id(), req.lock_id()); |
364 | 0 | } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) { |
365 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
366 | | } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) { |
367 | | return fmt::format(" index_id={}", req.index_id()); |
368 | 0 | } else if constexpr (is_any_v<Request, RestoreJobRequest>) { |
369 | 0 | return fmt::format(" tablet_id={}", req.tablet_id()); |
370 | 0 | } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) { |
371 | 0 | return fmt::format(" packed_file_path={}", req.packed_file_path()); |
372 | | } else { |
373 | | static_assert(!sizeof(Request)); |
374 | | } |
375 | 667 | } Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16GetTabletRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19CreateRowsetRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16CommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_15AbortTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19PrecommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_17RestoreJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetObjStoreInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_21StartTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22FinishTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_25UpdateDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ 
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_32GetDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Line | Count | Source | 343 | 667 | static std::string debug_info(const Request& req) { | 344 | | if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) { | 345 | | return fmt::format(" txn_id={}", req.txn_id()); | 346 | | } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) { | 347 | | return fmt::format(" tablet_id={}", req.job().idx().tablet_id()); | 348 | | } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) { | 349 | | return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id()); | 350 | 667 | } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) { | 351 | 667 | return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id()); | 352 | | } else if constexpr (is_any_v<Request, GetTabletRequest>) { | 353 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 354 | | } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest, | 355 | | GetInstanceRequest, GetClusterStatusRequest>) { | 356 | | return ""; | 357 | | } else if constexpr (is_any_v<Request, CreateRowsetRequest>) { | 358 | | return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id()); | 359 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) { | 360 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 361 | | } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) { | 362 | | return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(), | 363 | | req.tablet_id(), req.lock_id()); | 364 | | } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) { | 365 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 366 | | } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) { | 367 | | return fmt::format(" 
index_id={}", req.index_id()); | 368 | | } else if constexpr (is_any_v<Request, RestoreJobRequest>) { | 369 | | return fmt::format(" tablet_id={}", req.tablet_id()); | 370 | | } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) { | 371 | | return fmt::format(" packed_file_path={}", req.packed_file_path()); | 372 | | } else { | 373 | | static_assert(!sizeof(Request)); | 374 | | } | 375 | 667 | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_35RemoveDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19ListSnapshotRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_18GetInstanceRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_27UpdatePackedFileInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_23GetClusterStatusRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ |
376 | | |
// Returns a default_random_engine seeded from the monotonic clock; the tick count is
// truncated to 32 bits to form the seed.
inline std::default_random_engine make_random_engine() {
    const auto ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    std::default_random_engine engine(static_cast<uint32_t>(ticks));
    return engine;
}
381 | | |
382 | | template <typename Request, typename Response> |
383 | | using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*, |
384 | | const Request*, Response*, |
385 | | ::google::protobuf::Closure*); |
386 | | |
387 | | template <typename Request, typename Response> |
388 | | Status retry_rpc(std::string_view op_name, const Request& req, Response* res, |
389 | 893k | MetaServiceMethod<Request, Response> method) { |
390 | 893k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); |
391 | 893k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); |
392 | | |
393 | | // Applies only to the current file, and all req are non-const, but passed as const types. |
394 | 893k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); |
395 | | |
396 | 893k | int retry_times = 0; |
397 | 893k | uint32_t duration_ms = 0; |
398 | 893k | std::string error_msg; |
399 | 893k | std::default_random_engine rng = make_random_engine(); |
400 | 893k | std::uniform_int_distribution<uint32_t> u(20, 200); |
401 | 893k | std::uniform_int_distribution<uint32_t> u2(500, 1000); |
402 | 893k | MetaServiceProxy* proxy; |
403 | 893k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); |
404 | 893k | while (true) { |
405 | 891k | std::shared_ptr<MetaService_Stub> stub; |
406 | 891k | RETURN_IF_ERROR(proxy->get(&stub)); |
407 | 891k | brpc::Controller cntl; |
408 | 891k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { |
409 | 92.4k | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); |
410 | 799k | } else { |
411 | 799k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); |
412 | 799k | } |
413 | 891k | cntl.set_max_retry(kBrpcRetryTimes); |
414 | 891k | res->Clear(); |
415 | 891k | int error_code = 0; |
416 | 891k | (stub.get()->*method)(&cntl, &req, res, nullptr); |
417 | 891k | if (cntl.Failed()) [[unlikely]] { |
418 | 0 | error_msg = cntl.ErrorText(); |
419 | 0 | error_code = cntl.ErrorCode(); |
420 | 0 | proxy->set_unhealthy(); |
421 | 894k | } else if (res->status().code() == MetaServiceCode::OK) { |
422 | 894k | return Status::OK(); |
423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { |
424 | 18 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, |
425 | 18 | res->status().msg()); |
426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { |
427 | 1.17k | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, |
428 | 1.17k | res->status().msg()); |
429 | 18.4E | } else { |
430 | 18.4E | error_msg = res->status().msg(); |
431 | 18.4E | } |
432 | | |
433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { |
434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; |
435 | 0 | } |
436 | | |
437 | 18.4E | ++retry_times; |
438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || |
439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && |
440 | 0 | error_code == brpc::ERPCTIMEDOUT) || |
441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && |
442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { |
443 | 0 | break; |
444 | 0 | } |
445 | | |
446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); |
447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times |
448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); |
449 | 18.4E | bthread_usleep(duration_ms * 1000); |
450 | 18.4E | } |
451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); |
452 | 893k | } cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16GetTabletRequestENS0_17GetTabletResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 331k | MetaServiceMethod<Request, Response> method) { | 390 | 331k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 331k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 331k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 331k | int retry_times = 0; | 397 | 331k | uint32_t duration_ms = 0; | 398 | 331k | std::string error_msg; | 399 | 331k | std::default_random_engine rng = make_random_engine(); | 400 | 331k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 331k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 331k | MetaServiceProxy* proxy; | 403 | 331k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 331k | while (true) { | 405 | 330k | std::shared_ptr<MetaService_Stub> stub; | 406 | 330k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 330k | brpc::Controller cntl; | 408 | 330k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 330k | } else { | 411 | 330k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 330k | } | 413 | 330k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 330k | res->Clear(); | 415 | 330k | int error_code = 0; | 416 | 330k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 330k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 330k | } else if 
(res->status().code() == MetaServiceCode::OK) { | 422 | 330k | return Status::OK(); | 423 | 330k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 228 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 100 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 100 | res->status().msg()); | 429 | 128 | } else { | 430 | 128 | error_msg = res->status().msg(); | 431 | 128 | } | 432 | | | 433 | 128 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 128 | ++retry_times; | 438 | 128 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 128 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 128 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 128 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 128 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 128 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 128 | bthread_usleep(duration_ms * 1000); | 450 | 128 | } | 451 | 317 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 331k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetDeleteBitmapRequestENS0_23GetDeleteBitmapResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 28.7k | MetaServiceMethod<Request, Response> method) { | 390 | 28.7k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 28.7k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 28.7k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 28.7k | int retry_times = 0; | 397 | 28.7k | uint32_t duration_ms = 0; | 398 | 28.7k | std::string error_msg; | 399 | 28.7k | std::default_random_engine rng = make_random_engine(); | 400 | 28.7k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 28.7k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 28.7k | MetaServiceProxy* proxy; | 403 | 28.7k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 28.7k | while (true) { | 405 | 28.7k | std::shared_ptr<MetaService_Stub> stub; | 406 | 28.7k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 28.7k | brpc::Controller cntl; | 408 | 28.7k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 28.7k | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 28.7k | } else { | 411 | 18 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 18 | } | 413 | 28.7k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 28.7k | res->Clear(); | 415 | 28.7k | int error_code = 0; | 416 | 28.7k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 28.7k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 
421 | 28.7k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 28.7k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 28.7k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19CreateRowsetRequestENS0_20CreateRowsetResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 403k | MetaServiceMethod<Request, Response> method) { | 390 | 403k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 403k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 403k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 403k | int retry_times = 0; | 397 | 403k | uint32_t duration_ms = 0; | 398 | 403k | std::string error_msg; | 399 | 403k | std::default_random_engine rng = make_random_engine(); | 400 | 403k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 403k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 403k | MetaServiceProxy* proxy; | 403 | 403k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 403k | while (true) { | 405 | 402k | std::shared_ptr<MetaService_Stub> stub; | 406 | 402k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 402k | brpc::Controller cntl; | 408 | 403k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 402k | } else { | 411 | 402k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 402k | } | 413 | 402k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 402k | res->Clear(); | 415 | 402k | int error_code = 0; | 416 | 402k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 402k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 405k | } else if 
(res->status().code() == MetaServiceCode::OK) { | 422 | 405k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 403k | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16CommitTxnRequestENS0_17CommitTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_15AbortTxnRequestENS0_16AbortTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 487 | MetaServiceMethod<Request, Response> method) { | 390 | 487 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 487 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 487 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 487 | int retry_times = 0; | 397 | 487 | uint32_t duration_ms = 0; | 398 | 487 | std::string error_msg; | 399 | 487 | std::default_random_engine rng = make_random_engine(); | 400 | 487 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 487 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 487 | MetaServiceProxy* proxy; | 403 | 487 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 487 | while (true) { | 405 | 487 | std::shared_ptr<MetaService_Stub> stub; | 406 | 487 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 487 | brpc::Controller cntl; | 408 | 487 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 487 | } else { | 411 | 487 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 487 | } | 413 | 487 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 487 | res->Clear(); | 415 | 487 | int error_code = 0; | 416 | 487 | 
(stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 487 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 487 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 478 | return Status::OK(); | 423 | 478 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 9 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 9 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 9 | res->status().msg()); | 429 | 9 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 487 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19PrecommitTxnRequestENS0_20PrecommitTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 33 | MetaServiceMethod<Request, Response> method) { | 390 | 33 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 33 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 33 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 33 | int retry_times = 0; | 397 | 33 | uint32_t duration_ms = 0; | 398 | 33 | std::string error_msg; | 399 | 33 | std::default_random_engine rng = make_random_engine(); | 400 | 33 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 33 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 33 | MetaServiceProxy* proxy; | 403 | 33 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 33 | while (true) { | 405 | 33 | std::shared_ptr<MetaService_Stub> stub; | 406 | 33 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 33 | brpc::Controller cntl; | 408 | 33 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 33 | } else { | 411 | 33 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 33 | } | 413 | 33 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 33 | res->Clear(); | 415 | 33 | int error_code = 0; | 416 | 33 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 33 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 33 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 33 | 
return Status::OK(); | 423 | 33 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 33 | } |
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_17RestoreJobRequestENS0_18RestoreJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetObjStoreInfoRequestENS0_23GetObjStoreInfoResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 126 | MetaServiceMethod<Request, Response> method) { | 390 | 126 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 126 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 126 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 126 | int retry_times = 0; | 397 | 126 | uint32_t duration_ms = 0; | 398 | 126 | std::string error_msg; | 399 | 126 | std::default_random_engine rng = make_random_engine(); | 400 | 126 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 126 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 126 | MetaServiceProxy* proxy; | 403 | 126 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 126 | while (true) { | 405 | 126 | std::shared_ptr<MetaService_Stub> stub; | 406 | 126 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 126 | brpc::Controller cntl; | 408 | 126 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 126 | } else { | 411 | 126 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 126 | } | 413 | 126 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 126 | res->Clear(); | 415 | 126 | int error_code = 0; | 416 
| 126 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 126 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 126 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 126 | return Status::OK(); | 423 | 126 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 126 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_21StartTabletJobRequestENS0_22StartTabletJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 23.1k | MetaServiceMethod<Request, Response> method) { | 390 | 23.1k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 23.1k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 23.1k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 23.1k | int retry_times = 0; | 397 | 23.1k | uint32_t duration_ms = 0; | 398 | 23.1k | std::string error_msg; | 399 | 23.1k | std::default_random_engine rng = make_random_engine(); | 400 | 23.1k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 23.1k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 23.1k | MetaServiceProxy* proxy; | 403 | 23.1k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 23.2k | while (true) { | 405 | 23.2k | std::shared_ptr<MetaService_Stub> stub; | 406 | 23.2k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 23.2k | brpc::Controller cntl; | 408 | 23.2k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 23.2k | } else { | 411 | 23.2k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 23.2k | } | 413 | 23.2k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 23.2k | res->Clear(); | 415 | 23.2k | int error_code = 0; | 416 | 23.2k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 23.2k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 
421 | 23.2k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 22.9k | return Status::OK(); | 423 | 22.9k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 260 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 260 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 260 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 23.1k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22FinishTabletJobRequestENS0_23FinishTabletJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 21.8k | MetaServiceMethod<Request, Response> method) { | 390 | 21.8k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 21.8k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 21.8k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 21.8k | int retry_times = 0; | 397 | 21.8k | uint32_t duration_ms = 0; | 398 | 21.8k | std::string error_msg; | 399 | 21.8k | std::default_random_engine rng = make_random_engine(); | 400 | 21.8k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 21.8k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 21.8k | MetaServiceProxy* proxy; | 403 | 21.8k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 21.8k | while (true) { | 405 | 21.8k | std::shared_ptr<MetaService_Stub> stub; | 406 | 21.8k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 21.8k | brpc::Controller cntl; | 408 | 21.9k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 21.8k | } else { | 411 | 21.8k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 21.8k | } | 413 | 21.8k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 21.8k | res->Clear(); | 415 | 21.8k | int error_code = 0; | 416 | 21.8k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 21.8k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 
421 | 21.9k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 21.9k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 18 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 18 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 88 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 88 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 21.8k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_25UpdateDeleteBitmapRequestENS0_26UpdateDeleteBitmapResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 63.4k | MetaServiceMethod<Request, Response> method) { | 390 | 63.4k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 63.4k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 63.4k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 63.4k | int retry_times = 0; | 397 | 63.4k | uint32_t duration_ms = 0; | 398 | 63.4k | std::string error_msg; | 399 | 63.4k | std::default_random_engine rng = make_random_engine(); | 400 | 63.4k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 63.4k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 63.4k | MetaServiceProxy* proxy; | 403 | 63.4k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 63.4k | while (true) { | 405 | 63.4k | std::shared_ptr<MetaService_Stub> stub; | 406 | 63.4k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 63.4k | brpc::Controller cntl; | 408 | 63.7k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 63.7k | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 18.4E | } else { | 411 | 18.4E | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 18.4E | } | 413 | 63.4k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 63.4k | res->Clear(); | 415 | 63.4k | int error_code = 0; | 416 | 63.4k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 63.4k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | 
proxy->set_unhealthy(); | 421 | 64.1k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 64.1k | return Status::OK(); | 423 | 18.4E | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 18.4E | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 25 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 25 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 63.4k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_32GetDeleteBitmapUpdateLockRequestENS0_33GetDeleteBitmapUpdateLockResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 5.91k | MetaServiceMethod<Request, Response> method) { | 390 | 5.91k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 5.91k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 5.91k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 5.91k | int retry_times = 0; | 397 | 5.91k | uint32_t duration_ms = 0; | 398 | 5.91k | std::string error_msg; | 399 | 5.91k | std::default_random_engine rng = make_random_engine(); | 400 | 5.91k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 5.91k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 5.91k | MetaServiceProxy* proxy; | 403 | 5.91k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 5.91k | while (true) { | 405 | 5.91k | std::shared_ptr<MetaService_Stub> stub; | 406 | 5.91k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 5.91k | brpc::Controller cntl; | 408 | 5.91k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 5.91k | } else { | 411 | 5.91k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 5.91k | } | 413 | 5.91k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 5.91k | res->Clear(); | 415 | 5.91k | int error_code = 0; | 416 | 5.91k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 5.91k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | 
proxy->set_unhealthy(); | 421 | 5.91k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 5.25k | return Status::OK(); | 423 | 5.25k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 667 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 667 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 667 | res->status().msg()); | 429 | 18.4E | } else { | 430 | 18.4E | error_msg = res->status().msg(); | 431 | 18.4E | } | 432 | | | 433 | 18.4E | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | | | 437 | 18.4E | ++retry_times; | 438 | 18.4E | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 18.4E | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 18.4E | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 18.4E | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 18.4E | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 18.4E | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 18.4E | bthread_usleep(duration_ms * 1000); | 450 | 18.4E | } | 451 | 18.4E | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 5.91k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_35RemoveDeleteBitmapUpdateLockRequestENS0_36RemoveDeleteBitmapUpdateLockResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 23 | MetaServiceMethod<Request, Response> method) { | 390 | 23 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 23 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 23 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 23 | int retry_times = 0; | 397 | 23 | uint32_t duration_ms = 0; | 398 | 23 | std::string error_msg; | 399 | 23 | std::default_random_engine rng = make_random_engine(); | 400 | 23 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 23 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 23 | MetaServiceProxy* proxy; | 403 | 23 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 23 | while (true) { | 405 | 23 | std::shared_ptr<MetaService_Stub> stub; | 406 | 23 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 23 | brpc::Controller cntl; | 408 | 23 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 23 | } else { | 411 | 23 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 23 | } | 413 | 23 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 23 | res->Clear(); | 415 | 23 | int error_code = 0; | 416 | 23 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 23 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 23 | } else if (res->status().code() == 
MetaServiceCode::OK) { | 422 | 2 | return Status::OK(); | 423 | 21 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 21 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 21 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 21 | res->status().msg()); | 429 | 21 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 23 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19ListSnapshotRequestENS0_20ListSnapshotResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 2 | MetaServiceMethod<Request, Response> method) { | 390 | 2 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 2 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 2 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 2 | int retry_times = 0; | 397 | 2 | uint32_t duration_ms = 0; | 398 | 2 | std::string error_msg; | 399 | 2 | std::default_random_engine rng = make_random_engine(); | 400 | 2 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 2 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 2 | MetaServiceProxy* proxy; | 403 | 2 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 2 | while (true) { | 405 | 2 | std::shared_ptr<MetaService_Stub> stub; | 406 | 2 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 2 | brpc::Controller cntl; | 408 | 2 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 2 | } else { | 411 | 2 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 2 | } | 413 | 2 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 2 | res->Clear(); | 415 | 2 | int error_code = 0; | 416 | 2 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 2 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 2 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 0 | return Status::OK(); | 423 
| 2 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 2 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 2 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 2 | res->status().msg()); | 429 | 2 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 2 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_18GetInstanceRequestENS0_19GetInstanceResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 1 | MetaServiceMethod<Request, Response> method) { | 390 | 1 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 1 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 1 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 1 | int retry_times = 0; | 397 | 1 | uint32_t duration_ms = 0; | 398 | 1 | std::string error_msg; | 399 | 1 | std::default_random_engine rng = make_random_engine(); | 400 | 1 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 1 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 1 | MetaServiceProxy* proxy; | 403 | 1 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 1 | while (true) { | 405 | 1 | std::shared_ptr<MetaService_Stub> stub; | 406 | 1 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 1 | brpc::Controller cntl; | 408 | 1 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 1 | } else { | 411 | 1 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 1 | } | 413 | 1 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 1 | res->Clear(); | 415 | 1 | int error_code = 0; | 416 | 1 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 1 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 1 | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 1 | return Status::OK(); | 423 | 
1 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 1 | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_27UpdatePackedFileInfoRequestENS0_28UpdatePackedFileInfoResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 14.6k | MetaServiceMethod<Request, Response> method) { | 390 | 14.6k | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 14.6k | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 14.6k | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 14.6k | int retry_times = 0; | 397 | 14.6k | uint32_t duration_ms = 0; | 398 | 14.6k | std::string error_msg; | 399 | 14.6k | std::default_random_engine rng = make_random_engine(); | 400 | 14.6k | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 14.6k | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 14.6k | MetaServiceProxy* proxy; | 403 | 14.6k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 14.6k | while (true) { | 405 | 14.6k | std::shared_ptr<MetaService_Stub> stub; | 406 | 14.6k | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 14.6k | brpc::Controller cntl; | 408 | 14.6k | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 14.6k | } else { | 411 | 14.6k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 14.6k | } | 413 | 14.6k | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 14.6k | res->Clear(); | 415 | 14.6k | int error_code = 0; | 416 | 14.6k | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 14.6k | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | 
proxy->set_unhealthy(); | 421 | 14.6k | } else if (res->status().code() == MetaServiceCode::OK) { | 422 | 14.6k | return Status::OK(); | 423 | 14.6k | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 14.6k | } |
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_23GetClusterStatusRequestENS0_24GetClusterStatusResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE Line | Count | Source | 389 | 76 | MetaServiceMethod<Request, Response> method) { | 390 | 76 | static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); | 391 | 76 | static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); | 392 | | | 393 | | // Applies only to the current file, and all req are non-const, but passed as const types. | 394 | 76 | const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint()); | 395 | | | 396 | 76 | int retry_times = 0; | 397 | 76 | uint32_t duration_ms = 0; | 398 | 76 | std::string error_msg; | 399 | 76 | std::default_random_engine rng = make_random_engine(); | 400 | 76 | std::uniform_int_distribution<uint32_t> u(20, 200); | 401 | 76 | std::uniform_int_distribution<uint32_t> u2(500, 1000); | 402 | 76 | MetaServiceProxy* proxy; | 403 | 76 | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); | 404 | 76 | while (true) { | 405 | 76 | std::shared_ptr<MetaService_Stub> stub; | 406 | 76 | RETURN_IF_ERROR(proxy->get(&stub)); | 407 | 76 | brpc::Controller cntl; | 408 | 76 | if (op_name == "get delete bitmap" || op_name == "update delete bitmap") { | 409 | 0 | cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms); | 410 | 76 | } else { | 411 | 76 | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); | 412 | 76 | } | 413 | 76 | cntl.set_max_retry(kBrpcRetryTimes); | 414 | 76 | res->Clear(); | 415 | 76 | int error_code = 0; | 416 | 76 | (stub.get()->*method)(&cntl, &req, res, nullptr); | 417 | 76 | if (cntl.Failed()) [[unlikely]] { | 418 | 0 | error_msg = cntl.ErrorText(); | 419 | 0 | error_code = cntl.ErrorCode(); | 420 | 0 | proxy->set_unhealthy(); | 421 | 76 | } else if (res->status().code() == MetaServiceCode::OK) { | 
422 | 76 | return Status::OK(); | 423 | 76 | } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) { | 424 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name, | 425 | 0 | res->status().msg()); | 426 | 0 | } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { | 427 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, | 428 | 0 | res->status().msg()); | 429 | 0 | } else { | 430 | 0 | error_msg = res->status().msg(); | 431 | 0 | } | 432 | | | 433 | 0 | if (error_code == brpc::ERPCTIMEDOUT) { | 434 | 0 | g_cloud_meta_mgr_rpc_timeout_count << 1; | 435 | 0 | } | 436 | |
| 437 | 0 | ++retry_times; | 438 | 0 | if (retry_times > config::meta_service_rpc_retry_times || | 439 | 0 | (retry_times > config::meta_service_rpc_timeout_retry_times && | 440 | 0 | error_code == brpc::ERPCTIMEDOUT) || | 441 | 0 | (retry_times > config::meta_service_conflict_error_retry_times && | 442 | 0 | res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) { | 443 | 0 | break; | 444 | 0 | } | 445 | | | 446 | 0 | duration_ms = retry_times <= 100 ? u(rng) : u2(rng); | 447 | 0 | LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times | 448 | 0 | << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); | 449 | 0 | bthread_usleep(duration_ms * 1000); | 450 | 0 | } | 451 | 0 | return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); | 452 | 76 | } |
|
453 | | |
454 | | } // namespace |
455 | | |
456 | 331k | Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) { |
457 | 18.4E | VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id; |
458 | 331k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id, |
459 | 331k | tablet_meta); |
460 | 331k | GetTabletRequest req; |
461 | 331k | GetTabletResponse resp; |
462 | 331k | req.set_cloud_unique_id(config::cloud_unique_id); |
463 | 331k | req.set_tablet_id(tablet_id); |
464 | 331k | Status st = retry_rpc("get tablet meta", req, &resp, &MetaService_Stub::get_tablet); |
465 | 331k | if (!st.ok()) { |
466 | 100 | if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
467 | 100 | return Status::NotFound("failed to get tablet meta: {}", resp.status().msg()); |
468 | 100 | } |
469 | 0 | return st; |
470 | 100 | } |
471 | | |
472 | 330k | *tablet_meta = std::make_shared<TabletMeta>(); |
473 | 330k | (*tablet_meta) |
474 | 330k | ->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta()))); |
475 | 330k | VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id(); |
476 | 330k | return Status::OK(); |
477 | 331k | } |
478 | | |
479 | | Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options, |
480 | 141k | SyncRowsetStats* sync_stats) { |
481 | 141k | std::unique_lock lock {tablet->get_sync_meta_lock()}; |
482 | 141k | return sync_tablet_rowsets_unlocked(tablet, lock, options, sync_stats); |
483 | 141k | } |
484 | | |
485 | | Status CloudMetaMgr::_log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp, |
486 | | DeleteBitmap& delete_bitmap, int64_t old_max_version, |
487 | 143k | bool full_sync, int32_t read_version) { |
488 | 143k | if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() && |
489 | 143k | delete_bitmap.cardinality() > 0) { |
490 | 0 | int64_t tablet_id = tablet->tablet_id(); |
491 | 0 | std::vector<std::string> new_rowset_msgs; |
492 | 0 | std::vector<std::string> old_rowset_msgs; |
493 | 0 | std::unordered_set<RowsetId> new_rowset_ids; |
494 | 0 | int64_t new_max_version = resp.rowset_meta().rbegin()->end_version(); |
495 | 0 | for (const auto& rs : resp.rowset_meta()) { |
496 | 0 | RowsetId rowset_id; |
497 | 0 | rowset_id.init(rs.rowset_id_v2()); |
498 | 0 | new_rowset_ids.insert(rowset_id); |
499 | 0 | DeleteBitmap rowset_dbm(tablet_id); |
500 | 0 | delete_bitmap.subset({rowset_id, 0, 0}, |
501 | 0 | {rowset_id, std::numeric_limits<DeleteBitmap::SegmentId>::max(), |
502 | 0 | std::numeric_limits<DeleteBitmap::Version>::max()}, |
503 | 0 | &rowset_dbm); |
504 | 0 | size_t cardinality = rowset_dbm.cardinality(); |
505 | 0 | size_t count = rowset_dbm.get_delete_bitmap_count(); |
506 | 0 | if (cardinality > 0) { |
507 | 0 | new_rowset_msgs.push_back(fmt::format("({}[{}-{}],{},{})", rs.rowset_id_v2(), |
508 | 0 | rs.start_version(), rs.end_version(), count, |
509 | 0 | cardinality)); |
510 | 0 | } |
511 | 0 | } |
512 | |
|
513 | 0 | if (old_max_version > 0) { |
514 | 0 | std::vector<RowsetSharedPtr> old_rowsets; |
515 | 0 | RowsetIdUnorderedSet old_rowset_ids; |
516 | 0 | { |
517 | 0 | std::lock_guard<std::shared_mutex> rlock(tablet->get_header_lock()); |
518 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(old_max_version, &old_rowset_ids)); |
519 | 0 | old_rowsets = tablet->get_rowset_by_ids(&old_rowset_ids); |
520 | 0 | } |
521 | 0 | for (const auto& rs : old_rowsets) { |
522 | 0 | if (!new_rowset_ids.contains(rs->rowset_id())) { |
523 | 0 | DeleteBitmap rowset_dbm(tablet_id); |
524 | 0 | delete_bitmap.subset( |
525 | 0 | {rs->rowset_id(), 0, 0}, |
526 | 0 | {rs->rowset_id(), std::numeric_limits<DeleteBitmap::SegmentId>::max(), |
527 | 0 | std::numeric_limits<DeleteBitmap::Version>::max()}, |
528 | 0 | &rowset_dbm); |
529 | 0 | size_t cardinality = rowset_dbm.cardinality(); |
530 | 0 | size_t count = rowset_dbm.get_delete_bitmap_count(); |
531 | 0 | if (cardinality > 0) { |
532 | 0 | old_rowset_msgs.push_back( |
533 | 0 | fmt::format("({}{},{},{})", rs->rowset_id().to_string(), |
534 | 0 | rs->version().to_string(), count, cardinality)); |
535 | 0 | } |
536 | 0 | } |
537 | 0 | } |
538 | 0 | } |
539 | | |
540 | 0 | std::string tablet_info = fmt::format( |
541 | 0 | "tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(), |
542 | 0 | tablet->table_id(), tablet->index_id(), tablet->partition_id()); |
543 | 0 | LOG_INFO("[verbose] sync tablet delete bitmap " + tablet_info) |
544 | 0 | .tag("full_sync", full_sync) |
545 | 0 | .tag("read_version", read_version) |
546 | 0 | .tag("old_max_version", old_max_version) |
547 | 0 | .tag("new_max_version", new_max_version) |
548 | 0 | .tag("cumu_compaction_cnt", resp.stats().cumulative_compaction_cnt()) |
549 | 0 | .tag("base_compaction_cnt", resp.stats().base_compaction_cnt()) |
550 | 0 | .tag("cumu_point", resp.stats().cumulative_point()) |
551 | 0 | .tag("rowset_num", resp.rowset_meta().size()) |
552 | 0 | .tag("delete_bitmap_cardinality", delete_bitmap.cardinality()) |
553 | 0 | .tag("old_rowsets(rowset,count,cardinality)", |
554 | 0 | fmt::format("[{}]", fmt::join(old_rowset_msgs, ", "))) |
555 | 0 | .tag("new_rowsets(rowset,count,cardinality)", |
556 | 0 | fmt::format("[{}]", fmt::join(new_rowset_msgs, ", "))); |
557 | 0 | } |
558 | 143k | return Status::OK(); |
559 | 143k | } |
560 | | |
561 | | Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet, |
562 | | std::unique_lock<bthread::Mutex>& lock, |
563 | | const SyncOptions& options, |
564 | 361k | SyncRowsetStats* sync_stats) { |
565 | 361k | using namespace std::chrono; |
566 | | |
567 | 361k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); |
568 | | |
569 | 361k | MetaServiceProxy* proxy; |
570 | 361k | RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy)); |
571 | 361k | std::string tablet_info = |
572 | 361k | fmt::format("tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(), |
573 | 361k | tablet->table_id(), tablet->index_id(), tablet->partition_id()); |
574 | 361k | int tried = 0; |
575 | 361k | while (true) { |
576 | 359k | std::shared_ptr<MetaService_Stub> stub; |
577 | 359k | RETURN_IF_ERROR(proxy->get(&stub)); |
578 | 359k | brpc::Controller cntl; |
579 | 359k | cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); |
580 | 359k | GetRowsetRequest req; |
581 | 359k | GetRowsetResponse resp; |
582 | | |
583 | 359k | int64_t tablet_id = tablet->tablet_id(); |
584 | 359k | int64_t table_id = tablet->table_id(); |
585 | 359k | int64_t index_id = tablet->index_id(); |
586 | 359k | req.set_cloud_unique_id(config::cloud_unique_id); |
587 | 359k | auto* idx = req.mutable_idx(); |
588 | 359k | idx->set_tablet_id(tablet_id); |
589 | 359k | idx->set_table_id(table_id); |
590 | 359k | idx->set_index_id(index_id); |
591 | 359k | idx->set_partition_id(tablet->partition_id()); |
592 | 359k | { |
593 | 359k | std::shared_lock rlock(tablet->get_header_lock()); |
594 | 359k | if (options.full_sync) { |
595 | 2 | req.set_start_version(0); |
596 | 359k | } else { |
597 | 359k | req.set_start_version(tablet->max_version_unlocked() + 1); |
598 | 359k | } |
599 | 359k | req.set_base_compaction_cnt(tablet->base_compaction_cnt()); |
600 | 359k | req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); |
601 | 359k | req.set_full_compaction_cnt(tablet->full_compaction_cnt()); |
602 | 359k | req.set_cumulative_point(tablet->cumulative_layer_point()); |
603 | 359k | } |
604 | 359k | req.set_end_version(-1); |
605 | 359k | VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString(); |
606 | 359k | auto start = std::chrono::steady_clock::now(); |
607 | 359k | stub->get_rowset(&cntl, &req, &resp, nullptr); |
608 | 359k | auto end = std::chrono::steady_clock::now(); |
609 | 359k | int64_t latency = cntl.latency_us(); |
610 | 359k | _get_rowset_latency << latency; |
611 | 359k | int retry_times = config::meta_service_rpc_retry_times; |
612 | 359k | if (cntl.Failed()) { |
613 | 0 | proxy->set_unhealthy(); |
614 | 0 | if (tried++ < retry_times) { |
615 | 0 | auto rng = make_random_engine(); |
616 | 0 | std::uniform_int_distribution<uint32_t> u(20, 200); |
617 | 0 | std::uniform_int_distribution<uint32_t> u1(500, 1000); |
618 | 0 | uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); |
619 | 0 | bthread_usleep(duration_ms * 1000); |
620 | 0 | LOG_INFO("failed to get rowset meta, " + tablet_info) |
621 | 0 | .tag("reason", cntl.ErrorText()) |
622 | 0 | .tag("tried", tried) |
623 | 0 | .tag("sleep", duration_ms); |
624 | 0 | continue; |
625 | 0 | } |
626 | 0 | return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText()); |
627 | 0 | } |
628 | 359k | if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
629 | 0 | LOG(WARNING) << "failed to get rowset meta, err=" << resp.status().msg() << " " |
630 | 0 | << tablet_info; |
631 | 0 | return Status::NotFound("failed to get rowset meta: {}, {}", resp.status().msg(), |
632 | 0 | tablet_info); |
633 | 0 | } |
634 | 359k | if (resp.status().code() != MetaServiceCode::OK) { |
635 | 0 | LOG(WARNING) << " failed to get rowset meta, err=" << resp.status().msg() << " " |
636 | 0 | << tablet_info; |
637 | 0 | return Status::InternalError("failed to get rowset meta: {}, {}", resp.status().msg(), |
638 | 0 | tablet_info); |
639 | 0 | } |
640 | 359k | if (latency > 100 * 1000) { // 100ms |
641 | 1.42k | LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() |
642 | 1.42k | << ", latency=" << latency << "us" |
643 | 1.42k | << " " << tablet_info; |
644 | 358k | } else { |
645 | 358k | LOG_EVERY_N(INFO, 100) |
646 | 3.59k | << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size() |
647 | 3.59k | << ", latency=" << latency << "us" |
648 | 3.59k | << " " << tablet_info; |
649 | 358k | } |
650 | | |
651 | 359k | int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); |
652 | 359k | tablet->last_sync_time_s = now; |
653 | | |
654 | 359k | if (sync_stats) { |
655 | 120k | sync_stats->get_remote_rowsets_rpc_ns += |
656 | 120k | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
657 | 120k | sync_stats->get_remote_rowsets_num += resp.rowset_meta().size(); |
658 | 120k | } |
659 | | |
660 | | // If is mow, the tablet has no delete bitmap in base rowsets. |
661 | | // So dont need to sync it. |
662 | 361k | if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() && |
663 | 359k | tablet->tablet_state() == TABLET_RUNNING) { |
664 | 143k | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block", |
665 | 143k | DBUG_BLOCK); |
666 | 143k | DeleteBitmap delete_bitmap(tablet_id); |
667 | 143k | int64_t old_max_version = req.start_version() - 1; |
668 | 143k | auto read_version = config::delete_bitmap_store_read_version; |
669 | 143k | auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), |
670 | 143k | resp.stats(), req.idx(), &delete_bitmap, |
671 | 143k | options.full_sync, sync_stats, read_version, false); |
672 | 143k | if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) { |
673 | 0 | LOG_INFO("rowset meta is expired, need to retry, " + tablet_info) |
674 | 0 | .tag("tried", tried) |
675 | 0 | .error(st); |
676 | 0 | continue; |
677 | 0 | } |
678 | 143k | if (!st.ok()) { |
679 | 0 | LOG_WARNING("failed to get delete bitmap, " + tablet_info).error(st); |
680 | 0 | return st; |
681 | 0 | } |
682 | 143k | tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap); |
683 | 143k | RETURN_IF_ERROR(_log_mow_delete_bitmap(tablet, resp, delete_bitmap, old_max_version, |
684 | 143k | options.full_sync, read_version)); |
685 | 143k | RETURN_IF_ERROR( |
686 | 143k | _check_delete_bitmap_v2_correctness(tablet, req, resp, old_max_version)); |
687 | 143k | } |
688 | 359k | DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", { |
689 | 359k | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
690 | 359k | if (target_tablet_id == tablet->tablet_id()) { |
691 | 359k | DBUG_BLOCK |
692 | 359k | } |
693 | 359k | }); |
694 | 359k | { |
695 | 359k | const auto& stats = resp.stats(); |
696 | 359k | std::unique_lock wlock(tablet->get_header_lock()); |
697 | | |
698 | | // ATTN: we are facing following data race |
699 | | // |
700 | | // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8 |
701 | | // |
702 | | // BE-compaction-thread meta-service BE-query-thread |
703 | | // | | | |
704 | | // local | commit cumu-compaction | | |
705 | | // cc_cnt=0 | ---------------------------> | sync rowset (long rpc, local cc_cnt=0 ) | local |
706 | | // | | <----------------------------------------- | cc_cnt=0 |
707 | | // | | -. | |
708 | | // local | done cc_cnt=1 | \ | |
709 | | // cc_cnt=1 | <--------------------------- | \ | |
710 | | // | | \ returned with resp cc_cnt=0 (snapshot) | |
711 | | // | | '------------------------------------> | local |
712 | | // | | | cc_cnt=1 |
713 | | // | | | |
714 | | // | | | CHECK FAIL |
715 | | // | | | need retry |
716 | | // To get rid of just retry syncing tablet |
717 | 359k | if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() || |
718 | 361k | stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt()) |
719 | 34 | [[unlikely]] { |
720 | | // stale request, ignore |
721 | 34 | LOG_WARNING("stale get rowset meta request " + tablet_info) |
722 | 34 | .tag("resp_base_compaction_cnt", stats.base_compaction_cnt()) |
723 | 34 | .tag("base_compaction_cnt", tablet->base_compaction_cnt()) |
724 | 34 | .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt()) |
725 | 34 | .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt()) |
726 | 34 | .tag("tried", tried); |
727 | 34 | if (tried++ < 10) continue; |
728 | 0 | return Status::OK(); |
729 | 34 | } |
730 | 359k | std::vector<RowsetSharedPtr> rowsets; |
731 | 359k | rowsets.reserve(resp.rowset_meta().size()); |
732 | 359k | for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) { |
733 | 323k | VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id() |
734 | 74 | << ", version=[" << cloud_rs_meta_pb.start_version() << '-' |
735 | 74 | << cloud_rs_meta_pb.end_version() << ']'; |
736 | 323k | auto existed_rowset = tablet->get_rowset_by_version( |
737 | 323k | {cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()}); |
738 | 323k | if (existed_rowset && |
739 | 323k | existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) { |
740 | 83 | continue; // Same rowset, skip it |
741 | 83 | } |
742 | 323k | RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb); |
743 | 323k | auto rs_meta = std::make_shared<RowsetMeta>(); |
744 | 323k | rs_meta->init_from_pb(meta_pb); |
745 | 323k | RowsetSharedPtr rowset; |
746 | | // schema is nullptr implies using RowsetMeta.tablet_schema |
747 | 323k | Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset); |
748 | 323k | if (!s.ok()) { |
749 | 0 | LOG_WARNING("create rowset").tag("status", s); |
750 | 0 | return s; |
751 | 0 | } |
752 | 323k | rowsets.push_back(std::move(rowset)); |
753 | 323k | } |
754 | 359k | if (!rowsets.empty()) { |
755 | | // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.: |
756 | | // BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2; |
757 | | // after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12]. |
758 | 270k | bool version_overlap = |
759 | 270k | tablet->max_version_unlocked() >= rowsets.front()->start_version(); |
760 | 270k | tablet->add_rowsets(std::move(rowsets), version_overlap, wlock, |
761 | 270k | options.warmup_delta_data || |
762 | 270k | config::enable_warmup_immediately_on_new_rowset); |
763 | 270k | } |
764 | | |
765 | | // Fill version holes |
766 | 359k | int64_t partition_max_version = |
767 | 18.4E | resp.has_partition_max_version() ? resp.partition_max_version() : -1; |
768 | 359k | RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock)); |
769 | | |
770 | 359k | tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms(); |
771 | 359k | tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms(); |
772 | 359k | tablet->set_base_compaction_cnt(stats.base_compaction_cnt()); |
773 | 359k | tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); |
774 | 359k | tablet->set_full_compaction_cnt(stats.full_compaction_cnt()); |
775 | 359k | tablet->set_cumulative_layer_point(stats.cumulative_point()); |
776 | 359k | tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(), |
777 | 359k | stats.num_rows(), stats.data_size()); |
778 | | |
779 | | // Sync last active cluster info for compaction read-write separation |
780 | 360k | if (config::enable_compaction_rw_separation && stats.has_last_active_cluster_id()) { |
781 | 80.8k | tablet->set_last_active_cluster_info(stats.last_active_cluster_id(), |
782 | 80.8k | stats.last_active_time_ms()); |
783 | 80.8k | } |
784 | 359k | } |
785 | 0 | return Status::OK(); |
786 | 359k | } |
787 | 361k | } |
788 | | |
789 | | bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version, |
790 | | std::ranges::range auto&& rs_metas, |
791 | 81.6k | DeleteBitmap* delete_bitmap) { |
792 | 81.6k | std::set<int64_t> txn_processed; |
793 | 81.7k | for (auto& rs_meta : rs_metas) { |
794 | 81.7k | auto txn_id = rs_meta.txn_id(); |
795 | 81.7k | if (txn_processed.find(txn_id) != txn_processed.end()) { |
796 | 0 | continue; |
797 | 0 | } |
798 | 81.7k | txn_processed.insert(txn_id); |
799 | 81.7k | DeleteBitmapPtr tmp_delete_bitmap; |
800 | 81.7k | std::shared_ptr<PublishStatus> publish_status = |
801 | 81.7k | std::make_shared<PublishStatus>(PublishStatus::INIT); |
802 | 81.7k | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
803 | 81.7k | Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap( |
804 | 81.7k | txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status); |
805 | | // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services. |
806 | | // If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can |
807 | | // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other |
808 | | // stats(version or compaction stats) |
809 | 81.7k | if (status.ok() && *publish_status == PublishStatus::SUCCEED) { |
810 | | // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap. |
811 | | // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON, |
812 | | // we should replace it with the rowset's real version |
813 | 53.3k | DCHECK(rs_meta.start_version() == rs_meta.end_version()); |
814 | 53.3k | int64_t rowset_version = rs_meta.start_version(); |
815 | 240k | for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) { |
816 | | // skip sentinel mark, which is used for delete bitmap correctness check |
817 | 240k | if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) { |
818 | 9.88k | delete_bitmap->merge({std::get<0>(delete_bitmap_key), |
819 | 9.88k | std::get<1>(delete_bitmap_key), rowset_version}, |
820 | 9.88k | bitmap_value); |
821 | 9.88k | } |
822 | 240k | } |
823 | 53.3k | engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, |
824 | 53.3k | tablet->tablet_id()); |
825 | 53.3k | } else { |
826 | 28.4k | LOG_EVERY_N(INFO, 20) |
827 | 1.41k | << "delete bitmap not found in cache, will sync rowset to get. tablet_id= " |
828 | 1.41k | << tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status; |
829 | 28.4k | return false; |
830 | 28.4k | } |
831 | 81.7k | } |
832 | 53.2k | return true; |
833 | 81.6k | } |
834 | | |
835 | | Status CloudMetaMgr::_get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req, |
836 | 28.7k | GetDeleteBitmapResponse& res) { |
837 | 18.4E | VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString(); |
838 | 28.7k | TEST_SYNC_POINT_CALLBACK("CloudMetaMgr::_get_delete_bitmap_from_ms", &req, &res); |
839 | | |
840 | 28.7k | auto st = retry_rpc("get delete bitmap", req, &res, &MetaService_Stub::get_delete_bitmap); |
841 | 28.7k | if (st.code() == ErrorCode::THRIFT_RPC_ERROR) { |
842 | 0 | return st; |
843 | 0 | } |
844 | | |
845 | 28.7k | if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) { |
846 | 1 | return Status::NotFound("failed to get delete bitmap: {}", res.status().msg()); |
847 | 1 | } |
848 | | // The delete bitmap of stale rowsets will be removed when commit compaction job, |
849 | | // then delete bitmap of stale rowsets cannot be obtained. But the rowsets obtained |
850 | | // by sync_tablet_rowsets may include these stale rowsets. When this case happend, the |
851 | | // error code of ROWSETS_EXPIRED will be returned, we need to retry sync rowsets again. |
852 | | // |
853 | | // Be query thread meta-service Be compaction thread |
854 | | // | | | |
855 | | // | get rowset | | |
856 | | // |--------------------------->| | |
857 | | // | return get rowset | | |
858 | | // |<---------------------------| | |
859 | | // | | commit job | |
860 | | // | |<------------------------| |
861 | | // | | return commit job | |
862 | | // | |------------------------>| |
863 | | // | get delete bitmap | | |
864 | | // |--------------------------->| | |
865 | | // | return get delete bitmap | | |
866 | | // |<---------------------------| | |
867 | | // | | | |
868 | 28.7k | if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) { |
869 | 0 | return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}", |
870 | 0 | res.status().msg()); |
871 | 0 | } |
872 | 28.7k | if (res.status().code() != MetaServiceCode::OK) { |
873 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}", |
874 | 0 | res.status().msg()); |
875 | 0 | } |
876 | 28.7k | return Status::OK(); |
877 | 28.7k | } |
878 | | |
879 | | Status CloudMetaMgr::_get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req, |
880 | | GetDeleteBitmapResponse& res, |
881 | 28.4k | int64_t bytes_threadhold) { |
882 | 28.4k | std::unordered_set<std::string> finished_rowset_ids {}; |
883 | 28.4k | int count = 0; |
884 | 28.7k | do { |
885 | 28.7k | GetDeleteBitmapRequest cur_req; |
886 | 28.7k | GetDeleteBitmapResponse cur_res; |
887 | | |
888 | 28.7k | cur_req.set_cloud_unique_id(config::cloud_unique_id); |
889 | 28.7k | cur_req.set_tablet_id(req.tablet_id()); |
890 | 28.7k | cur_req.set_base_compaction_cnt(req.base_compaction_cnt()); |
891 | 28.7k | cur_req.set_cumulative_compaction_cnt(req.cumulative_compaction_cnt()); |
892 | 28.7k | cur_req.set_cumulative_point(req.cumulative_point()); |
893 | 28.7k | *(cur_req.mutable_idx()) = req.idx(); |
894 | 28.7k | cur_req.set_store_version(req.store_version()); |
895 | 28.7k | if (bytes_threadhold > 0) { |
896 | 28.7k | cur_req.set_dbm_bytes_threshold(bytes_threadhold); |
897 | 28.7k | } |
898 | 65.6k | for (int i = 0; i < req.rowset_ids_size(); i++) { |
899 | 36.9k | if (!finished_rowset_ids.contains(req.rowset_ids(i))) { |
900 | 35.8k | cur_req.add_rowset_ids(req.rowset_ids(i)); |
901 | 35.8k | cur_req.add_begin_versions(req.begin_versions(i)); |
902 | 35.8k | cur_req.add_end_versions(req.end_versions(i)); |
903 | 35.8k | } |
904 | 36.9k | } |
905 | | |
906 | 28.7k | RETURN_IF_ERROR(_get_delete_bitmap_from_ms(cur_req, cur_res)); |
907 | 28.7k | ++count; |
908 | | |
909 | | // v1 delete bitmap |
910 | 28.7k | res.mutable_rowset_ids()->MergeFrom(cur_res.rowset_ids()); |
911 | 28.7k | res.mutable_segment_ids()->MergeFrom(cur_res.segment_ids()); |
912 | 28.7k | res.mutable_versions()->MergeFrom(cur_res.versions()); |
913 | 28.7k | res.mutable_segment_delete_bitmaps()->MergeFrom(cur_res.segment_delete_bitmaps()); |
914 | | |
915 | | // v2 delete bitmap |
916 | 28.7k | res.mutable_delta_rowset_ids()->MergeFrom(cur_res.delta_rowset_ids()); |
917 | 28.7k | res.mutable_delete_bitmap_storages()->MergeFrom(cur_res.delete_bitmap_storages()); |
918 | | |
919 | 34.8k | for (const auto& rowset_id : cur_res.returned_rowset_ids()) { |
920 | 34.8k | finished_rowset_ids.insert(rowset_id); |
921 | 34.8k | } |
922 | | |
923 | 28.7k | bool has_more = cur_res.has_has_more() && cur_res.has_more(); |
924 | 28.7k | if (!has_more) { |
925 | 28.4k | break; |
926 | 28.4k | } |
927 | 210 | LOG_INFO("batch get delete bitmap, progress={}/{}", finished_rowset_ids.size(), |
928 | 210 | req.rowset_ids_size()) |
929 | 210 | .tag("tablet_id", req.tablet_id()) |
930 | 210 | .tag("cur_returned_rowsets", cur_res.returned_rowset_ids_size()) |
931 | 210 | .tag("rpc_count", count); |
932 | 210 | } while (finished_rowset_ids.size() < req.rowset_ids_size()); |
933 | 28.4k | return Status::OK(); |
934 | 28.4k | } |
935 | | |
936 | | Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, |
937 | | std::ranges::range auto&& rs_metas, |
938 | | const TabletStatsPB& stats, const TabletIndexPB& idx, |
939 | | DeleteBitmap* delete_bitmap, bool full_sync, |
940 | | SyncRowsetStats* sync_stats, int32_t read_version, |
941 | 142k | bool full_sync_v2) { |
942 | 142k | if (rs_metas.empty()) { |
943 | 61.1k | return Status::OK(); |
944 | 61.1k | } |
945 | | |
946 | 81.6k | if (!full_sync && config::enable_sync_tablet_delete_bitmap_by_cache && |
947 | 81.7k | sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) { |
948 | 53.5k | if (sync_stats) { |
949 | 23.7k | sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size(); |
950 | 23.7k | } |
951 | 53.5k | return Status::OK(); |
952 | 53.5k | } else { |
953 | 28.1k | DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id()); |
954 | 28.1k | *delete_bitmap = *new_delete_bitmap; |
955 | 28.1k | } |
956 | | |
957 | 28.1k | if (read_version == 2 && config::delete_bitmap_store_write_version == 1) { |
958 | 0 | return Status::InternalError( |
959 | 0 | "please set delete_bitmap_store_read_version to 1 or 3 because " |
960 | 0 | "delete_bitmap_store_write_version is 1"); |
961 | 28.3k | } else if (read_version == 1 && config::delete_bitmap_store_write_version == 2) { |
962 | 0 | return Status::InternalError( |
963 | 0 | "please set delete_bitmap_store_read_version to 2 or 3 because " |
964 | 0 | "delete_bitmap_store_write_version is 2"); |
965 | 0 | } |
966 | | |
967 | 28.1k | int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version()); |
968 | | // When there are many delete bitmaps that need to be synchronized, it |
969 | | // may take a longer time, especially when loading the tablet for the |
970 | | // first time, so set a relatively long timeout time. |
971 | 28.1k | GetDeleteBitmapRequest req; |
972 | 28.1k | GetDeleteBitmapResponse res; |
973 | 28.1k | req.set_cloud_unique_id(config::cloud_unique_id); |
974 | 28.1k | req.set_tablet_id(tablet->tablet_id()); |
975 | 28.1k | req.set_base_compaction_cnt(stats.base_compaction_cnt()); |
976 | 28.1k | req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt()); |
977 | 28.1k | req.set_cumulative_point(stats.cumulative_point()); |
978 | 28.1k | *(req.mutable_idx()) = idx; |
979 | 28.1k | req.set_store_version(read_version); |
980 | | // New rowset sync all versions of delete bitmap |
981 | 34.7k | for (const auto& rs_meta : rs_metas) { |
982 | 34.7k | req.add_rowset_ids(rs_meta.rowset_id_v2()); |
983 | 34.7k | req.add_begin_versions(0); |
984 | 34.7k | req.add_end_versions(new_max_version); |
985 | 34.7k | } |
986 | | |
987 | 28.4k | if (!full_sync_v2) { |
988 | | // old rowset sync incremental versions of delete bitmap |
989 | 28.4k | if (old_max_version > 0 && old_max_version < new_max_version) { |
990 | 43 | RowsetIdUnorderedSet all_rs_ids; |
991 | 43 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
992 | 98 | for (const auto& rs_id : all_rs_ids) { |
993 | 98 | req.add_rowset_ids(rs_id.to_string()); |
994 | 98 | req.add_begin_versions(old_max_version + 1); |
995 | 98 | req.add_end_versions(new_max_version); |
996 | 98 | } |
997 | 43 | } |
998 | 18.4E | } else { |
999 | 18.4E | if (old_max_version > 0) { |
1000 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1001 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1002 | 0 | for (const auto& rs_id : all_rs_ids) { |
1003 | 0 | req.add_rowset_ids(rs_id.to_string()); |
1004 | 0 | req.add_begin_versions(0); |
1005 | 0 | req.add_end_versions(new_max_version); |
1006 | 0 | } |
1007 | 0 | } |
1008 | 18.4E | } |
1009 | 28.1k | if (sync_stats) { |
1010 | 10.5k | sync_stats->get_remote_delete_bitmap_rowsets_num += req.rowset_ids_size(); |
1011 | 10.5k | } |
1012 | | |
1013 | 28.1k | auto start = std::chrono::steady_clock::now(); |
1014 | 28.4k | if (config::enable_batch_get_delete_bitmap) { |
1015 | 28.4k | RETURN_IF_ERROR(_get_delete_bitmap_from_ms_by_batch( |
1016 | 28.4k | req, res, config::get_delete_bitmap_bytes_threshold)); |
1017 | 18.4E | } else { |
1018 | 18.4E | RETURN_IF_ERROR(_get_delete_bitmap_from_ms(req, res)); |
1019 | 18.4E | } |
1020 | 28.1k | auto end = std::chrono::steady_clock::now(); |
1021 | | |
1022 | | // v1 delete bitmap |
1023 | 28.1k | const auto& rowset_ids = res.rowset_ids(); |
1024 | 28.1k | const auto& segment_ids = res.segment_ids(); |
1025 | 28.1k | const auto& vers = res.versions(); |
1026 | 28.1k | const auto& delete_bitmaps = res.segment_delete_bitmaps(); |
1027 | 28.4k | if (rowset_ids.size() != segment_ids.size() || rowset_ids.size() != vers.size() || |
1028 | 28.4k | rowset_ids.size() != delete_bitmaps.size()) { |
1029 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1030 | 0 | "get delete bitmap data wrong," |
1031 | 0 | "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}", |
1032 | 0 | rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size()); |
1033 | 0 | } |
1034 | 28.4k | for (int i = 0; i < rowset_ids.size(); i++) { |
1035 | 361 | RowsetId rst_id; |
1036 | 361 | rst_id.init(rowset_ids[i]); |
1037 | 361 | delete_bitmap->merge( |
1038 | 361 | {rst_id, segment_ids[i], vers[i]}, |
1039 | 361 | roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length())); |
1040 | 361 | } |
1041 | | // v2 delete bitmap |
1042 | 28.1k | const auto& delta_rowset_ids = res.delta_rowset_ids(); |
1043 | 28.1k | const auto& delete_bitmap_storages = res.delete_bitmap_storages(); |
1044 | 28.1k | if (delta_rowset_ids.size() != delete_bitmap_storages.size()) { |
1045 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1046 | 0 | "get delete bitmap data wrong, delta_rowset_ids.size={}, " |
1047 | 0 | "delete_bitmap_storages.size={}", |
1048 | 0 | delta_rowset_ids.size(), delete_bitmap_storages.size()); |
1049 | 0 | } |
1050 | 28.1k | int64_t remote_delete_bitmap_bytes = 0; |
1051 | 28.1k | RETURN_IF_ERROR(_read_tablet_delete_bitmap_v2(tablet, old_max_version, rs_metas, delete_bitmap, |
1052 | 28.1k | res, remote_delete_bitmap_bytes, full_sync_v2)); |
1053 | | |
1054 | 28.1k | if (sync_stats) { |
1055 | 10.5k | sync_stats->get_remote_delete_bitmap_rpc_ns += |
1056 | 10.5k | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
1057 | 10.5k | sync_stats->get_remote_delete_bitmap_key_count += |
1058 | 10.5k | delete_bitmaps.size() + delete_bitmap_storages.size(); |
1059 | 10.5k | for (const auto& dbm : delete_bitmaps) { |
1060 | 342 | sync_stats->get_remote_delete_bitmap_bytes += dbm.length(); |
1061 | 342 | } |
1062 | 10.5k | sync_stats->get_remote_delete_bitmap_bytes += remote_delete_bitmap_bytes; |
1063 | 10.5k | } |
1064 | 28.1k | int64_t latency = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); |
1065 | 28.1k | if (latency > 100 * 1000) { // 100ms |
1066 | 160 | LOG(INFO) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" << rowset_ids.size() |
1067 | 160 | << ", delete_bitmaps.size()=" << delete_bitmaps.size() |
1068 | 160 | << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size() |
1069 | 160 | << ", latency=" << latency << "us, read_version=" << read_version; |
1070 | 27.9k | } else { |
1071 | 27.9k | LOG_EVERY_N(INFO, 100) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" |
1072 | 283 | << rowset_ids.size() |
1073 | 283 | << ", delete_bitmaps.size()=" << delete_bitmaps.size() |
1074 | 283 | << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size() |
1075 | 283 | << ", latency=" << latency << "us, read_version=" << read_version; |
1076 | 27.9k | } |
1077 | 28.1k | return Status::OK(); |
1078 | 28.1k | } |
1079 | | |
1080 | | Status CloudMetaMgr::_check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req, |
1081 | | GetRowsetResponse& resp, |
1082 | 143k | int64_t old_max_version) { |
1083 | 143k | if (!config::enable_delete_bitmap_store_v2_check_correctness || |
1084 | 143k | config::delete_bitmap_store_write_version == 1 || resp.rowset_meta().empty()) { |
1085 | 143k | return Status::OK(); |
1086 | 143k | } |
1087 | 18.4E | int64_t tablet_id = tablet->tablet_id(); |
1088 | 18.4E | int64_t new_max_version = std::max(old_max_version, resp.rowset_meta().rbegin()->end_version()); |
1089 | | // rowset_id, num_segments |
1090 | 18.4E | std::vector<std::pair<RowsetId, int64_t>> all_rowsets; |
1091 | 18.4E | std::map<std::string, std::string> rowset_to_resource; |
1092 | 18.4E | for (const auto& rs_meta : resp.rowset_meta()) { |
1093 | 0 | RowsetId rowset_id; |
1094 | 0 | rowset_id.init(rs_meta.rowset_id_v2()); |
1095 | 0 | all_rowsets.emplace_back(std::make_pair(rowset_id, rs_meta.num_segments())); |
1096 | 0 | rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id(); |
1097 | 0 | } |
1098 | 18.4E | if (old_max_version > 0) { |
1099 | 0 | RowsetIdUnorderedSet all_rs_ids; |
1100 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1101 | 0 | for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) { |
1102 | 0 | all_rowsets.emplace_back(std::make_pair(rowset->rowset_id(), rowset->num_segments())); |
1103 | 0 | rowset_to_resource[rowset->rowset_id().to_string()] = |
1104 | 0 | rowset->rowset_meta()->resource_id(); |
1105 | 0 | } |
1106 | 0 | } |
1107 | | |
1108 | 18.4E | auto compare_delete_bitmap = [&](DeleteBitmap* delete_bitmap, int version) { |
1109 | 0 | bool success = true; |
1110 | 0 | for (auto& [rs_id, num_segments] : all_rowsets) { |
1111 | 0 | for (int seg_id = 0; seg_id < num_segments; ++seg_id) { |
1112 | 0 | DeleteBitmap::BitmapKey key = {rs_id, seg_id, new_max_version}; |
1113 | 0 | auto dm1 = tablet->tablet_meta()->delete_bitmap().get_agg(key); |
1114 | 0 | auto dm2 = delete_bitmap->get_agg_without_cache(key); |
1115 | 0 | if (*dm1 != *dm2) { |
1116 | 0 | success = false; |
1117 | 0 | LOG(WARNING) << "failed to check delete bitmap correctness by v" |
1118 | 0 | << std::to_string(version) << ", tablet_id=" << tablet->tablet_id() |
1119 | 0 | << ", rowset_id=" << rs_id.to_string() << ", segment_id=" << seg_id |
1120 | 0 | << ", max_version=" << new_max_version |
1121 | 0 | << ". size1=" << dm1->cardinality() |
1122 | 0 | << ", size2=" << dm2->cardinality(); |
1123 | 0 | } |
1124 | 0 | } |
1125 | 0 | } |
1126 | 0 | if (success) { |
1127 | 0 | LOG(INFO) << "succeed to check delete bitmap correctness by v" |
1128 | 0 | << std::to_string(version) << ", tablet_id=" << tablet->tablet_id() |
1129 | 0 | << ", max_version=" << new_max_version; |
1130 | 0 | } |
1131 | 0 | }; |
1132 | | |
1133 | 18.4E | DeleteBitmap full_delete_bitmap(tablet_id); |
1134 | 18.4E | auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), resp.stats(), |
1135 | 18.4E | req.idx(), &full_delete_bitmap, false, nullptr, 2, true); |
1136 | 18.4E | if (!st.ok()) { |
1137 | 0 | LOG_WARNING("failed to check delete bitmap correctness by v2") |
1138 | 0 | .tag("tablet", tablet->tablet_id()) |
1139 | 0 | .error(st); |
1140 | 18.4E | } else { |
1141 | 18.4E | compare_delete_bitmap(&full_delete_bitmap, 2); |
1142 | 18.4E | } |
1143 | 18.4E | return Status::OK(); |
1144 | 18.4E | } |
1145 | | |
1146 | | Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version, |
1147 | | std::ranges::range auto&& rs_metas, |
1148 | | DeleteBitmap* delete_bitmap, |
1149 | | GetDeleteBitmapResponse& res, |
1150 | | int64_t& remote_delete_bitmap_bytes, |
1151 | 28.4k | bool full_sync_v2) { |
1152 | 28.5k | if (res.delta_rowset_ids().empty()) { |
1153 | 28.5k | return Status::OK(); |
1154 | 28.5k | } |
1155 | 18.4E | const auto& rowset_ids = res.delta_rowset_ids(); |
1156 | 18.4E | const auto& delete_bitmap_storages = res.delete_bitmap_storages(); |
1157 | 18.4E | RowsetIdUnorderedSet all_rs_ids; |
1158 | 18.4E | std::map<std::string, std::string> rowset_to_resource; |
1159 | 18.4E | if (old_max_version > 0) { |
1160 | 0 | RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids)); |
1161 | 0 | if (full_sync_v2) { |
1162 | 0 | for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) { |
1163 | 0 | rowset_to_resource[rowset->rowset_id().to_string()] = |
1164 | 0 | rowset->rowset_meta()->resource_id(); |
1165 | 0 | } |
1166 | 0 | } |
1167 | 0 | } |
1168 | 18.4E | for (const auto& rs_meta : rs_metas) { |
1169 | 0 | RowsetId rs_id; |
1170 | 0 | rs_id.init(rs_meta.rowset_id_v2()); |
1171 | 0 | all_rs_ids.emplace(rs_id); |
1172 | 0 | rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id(); |
1173 | 0 | } |
1174 | 18.4E | if (config::enable_mow_verbose_log) { |
1175 | 0 | LOG(INFO) << "read delete bitmap for tablet_id=" << tablet->tablet_id() |
1176 | 0 | << ", old_max_version=" << old_max_version |
1177 | 0 | << ", new rowset num=" << rs_metas.size() |
1178 | 0 | << ", rowset has delete bitmap num=" << rowset_ids.size() |
1179 | 0 | << ". all rowset num=" << all_rs_ids.size(); |
1180 | 0 | } |
1181 | | |
1182 | 18.4E | std::mutex result_mtx; |
1183 | 18.4E | Status result; |
1184 | 18.4E | auto merge_delete_bitmap = [&](const std::string& rowset_id, DeleteBitmapPB& dbm) { |
1185 | 0 | if (dbm.rowset_ids_size() != dbm.segment_ids_size() || |
1186 | 0 | dbm.rowset_ids_size() != dbm.versions_size() || |
1187 | 0 | dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) { |
1188 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
1189 | 0 | "get delete bitmap data wrong, rowset_id={}" |
1190 | 0 | "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}", |
1191 | 0 | rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), dbm.versions_size(), |
1192 | 0 | dbm.segment_delete_bitmaps_size()); |
1193 | 0 | } |
1194 | 0 | if (config::enable_mow_verbose_log) { |
1195 | 0 | LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id() |
1196 | 0 | << ", rowset_id=" << rowset_id |
1197 | 0 | << ", delete_bitmap num=" << dbm.segment_delete_bitmaps_size(); |
1198 | 0 | } |
1199 | 0 | std::lock_guard lock(result_mtx); |
1200 | 0 | for (int j = 0; j < dbm.rowset_ids_size(); j++) { |
1201 | 0 | RowsetId rst_id; |
1202 | 0 | rst_id.init(dbm.rowset_ids(j)); |
1203 | 0 | if (!all_rs_ids.contains(rst_id)) { |
1204 | 0 | LOG(INFO) << "skip merge delete bitmap for tablet_id=" << tablet->tablet_id() |
1205 | 0 | << ", rowset_id=" << rowset_id << ", unused rowset_id=" << rst_id; |
1206 | 0 | continue; |
1207 | 0 | } |
1208 | 0 | delete_bitmap->merge( |
1209 | 0 | {rst_id, dbm.segment_ids(j), dbm.versions(j)}, |
1210 | 0 | roaring::Roaring::readSafe(dbm.segment_delete_bitmaps(j).data(), |
1211 | 0 | dbm.segment_delete_bitmaps(j).length())); |
1212 | 0 | remote_delete_bitmap_bytes += dbm.segment_delete_bitmaps(j).length(); |
1213 | 0 | } |
1214 | 0 | return Status::OK(); |
1215 | 0 | }; |
1216 | 18.4E | auto get_delete_bitmap_from_file = [&](const std::string& rowset_id, |
1217 | 18.4E | const DeleteBitmapStoragePB& storage) { |
1218 | 0 | if (config::enable_mow_verbose_log) { |
1219 | 0 | LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id() |
1220 | 0 | << ", rowset_id=" << rowset_id << " from file" |
1221 | 0 | << ", is_packed=" << storage.has_packed_slice_location(); |
1222 | 0 | } |
1223 | 0 | if (rowset_to_resource.find(rowset_id) == rowset_to_resource.end()) { |
1224 | 0 | return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}", |
1225 | 0 | tablet->tablet_id(), rowset_id); |
1226 | 0 | } |
1227 | 0 | auto resource_id = rowset_to_resource[rowset_id]; |
1228 | 0 | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1229 | 0 | auto storage_resource = engine.get_storage_resource(resource_id); |
1230 | 0 | if (!storage_resource) { |
1231 | 0 | return Status::InternalError("vault id not found, maybe not sync, vault id {}", |
1232 | 0 | resource_id); |
1233 | 0 | } |
1234 | | |
1235 | | // Use packed file reader if packed_slice_location is present |
1236 | 0 | std::unique_ptr<DeleteBitmapFileReader> reader; |
1237 | 0 | if (storage.has_packed_slice_location() && |
1238 | 0 | !storage.packed_slice_location().packed_file_path().empty()) { |
1239 | 0 | reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id, |
1240 | 0 | storage_resource, |
1241 | 0 | storage.packed_slice_location()); |
1242 | 0 | } else { |
1243 | 0 | reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id, |
1244 | 0 | storage_resource); |
1245 | 0 | } |
1246 | |
|
1247 | 0 | RETURN_IF_ERROR(reader->init()); |
1248 | 0 | DeleteBitmapPB dbm; |
1249 | 0 | RETURN_IF_ERROR(reader->read(dbm)); |
1250 | 0 | RETURN_IF_ERROR(reader->close()); |
1251 | 0 | return merge_delete_bitmap(rowset_id, dbm); |
1252 | 0 | }; |
1253 | 18.4E | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1254 | 18.4E | std::unique_ptr<ThreadPoolToken> token = engine.sync_delete_bitmap_thread_pool().new_token( |
1255 | 18.4E | ThreadPool::ExecutionMode::CONCURRENT); |
1256 | 18.4E | bthread::CountdownEvent wait {rowset_ids.size()}; |
1257 | 18.4E | for (int i = 0; i < rowset_ids.size(); i++) { |
1258 | 0 | auto& rowset_id = rowset_ids[i]; |
1259 | 0 | if (delete_bitmap_storages[i].store_in_fdb()) { |
1260 | 0 | wait.signal(); |
1261 | 0 | DeleteBitmapPB dbm = delete_bitmap_storages[i].delete_bitmap(); |
1262 | 0 | RETURN_IF_ERROR(merge_delete_bitmap(rowset_id, dbm)); |
1263 | 0 | } else { |
1264 | 0 | const auto& storage = delete_bitmap_storages[i]; |
1265 | 0 | auto submit_st = token->submit_func([&, rowset_id, storage]() { |
1266 | 0 | auto status = get_delete_bitmap_from_file(rowset_id, storage); |
1267 | 0 | if (!status.ok()) { |
1268 | 0 | LOG(WARNING) << "failed to get delete bitmap for tablet_id=" |
1269 | 0 | << tablet->tablet_id() << ", rowset_id=" << rowset_id |
1270 | 0 | << " from file, st=" << status.to_string(); |
1271 | 0 | std::lock_guard lock(result_mtx); |
1272 | 0 | if (result.ok()) { |
1273 | 0 | result = status; |
1274 | 0 | } |
1275 | 0 | } |
1276 | 0 | wait.signal(); |
1277 | 0 | }); |
1278 | 0 | RETURN_IF_ERROR(submit_st); |
1279 | 0 | } |
1280 | 0 | } |
1281 | | // wait for all finished |
1282 | 18.4E | wait.wait(); |
1283 | 18.4E | token->wait(); |
1284 | 18.4E | return result; |
1285 | 18.4E | } |
1286 | | |
1287 | | Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, |
1288 | 202k | RowsetMetaSharedPtr* existed_rs_meta) { |
1289 | 18.4E | VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() |
1290 | 18.4E | << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); |
1291 | 202k | { |
1292 | 202k | Status ret_st; |
1293 | 202k | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st); |
1294 | 202k | } |
1295 | 202k | CreateRowsetRequest req; |
1296 | 202k | CreateRowsetResponse resp; |
1297 | 202k | req.set_cloud_unique_id(config::cloud_unique_id); |
1298 | 202k | req.set_txn_id(rs_meta.txn_id()); |
1299 | 202k | req.set_tablet_job_id(job_id); |
1300 | | |
1301 | 202k | RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true); |
1302 | 202k | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta)); |
1303 | | |
1304 | 202k | Status st = retry_rpc("prepare rowset", req, &resp, &MetaService_Stub::prepare_rowset); |
1305 | 202k | if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { |
1306 | 0 | if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { |
1307 | 0 | RowsetMetaPB doris_rs_meta_tmp = |
1308 | 0 | cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); |
1309 | 0 | *existed_rs_meta = std::make_shared<RowsetMeta>(); |
1310 | 0 | (*existed_rs_meta)->init_from_pb(doris_rs_meta_tmp); |
1311 | 0 | } |
1312 | 0 | return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg()); |
1313 | 0 | } |
1314 | 202k | return st; |
1315 | 202k | } |
1316 | | |
1317 | | Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, |
1318 | 202k | RowsetMetaSharedPtr* existed_rs_meta) { |
1319 | 202k | VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() |
1320 | 3 | << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); |
1321 | 202k | { |
1322 | 202k | Status ret_st; |
1323 | 202k | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st); |
1324 | 202k | } |
1325 | 202k | check_table_size_correctness(rs_meta); |
1326 | 202k | CreateRowsetRequest req; |
1327 | 202k | CreateRowsetResponse resp; |
1328 | 202k | req.set_cloud_unique_id(config::cloud_unique_id); |
1329 | 202k | req.set_txn_id(rs_meta.txn_id()); |
1330 | 202k | req.set_tablet_job_id(job_id); |
1331 | | |
1332 | 202k | RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); |
1333 | 202k | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); |
1334 | 202k | Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset); |
1335 | 202k | if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) { |
1336 | 0 | if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) { |
1337 | 0 | RowsetMetaPB doris_rs_meta = |
1338 | 0 | cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta())); |
1339 | 0 | *existed_rs_meta = std::make_shared<RowsetMeta>(); |
1340 | 0 | (*existed_rs_meta)->init_from_pb(doris_rs_meta); |
1341 | 0 | } |
1342 | 0 | return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg()); |
1343 | 0 | } |
1344 | 202k | int64_t timeout_ms = -1; |
1345 | | // if the `job_id` is not empty, it means this rowset was produced by a compaction job. |
1346 | 202k | if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) { |
1347 | | // 1. assume the download speed is 100MB/s |
1348 | | // 2. we double the download time as timeout for safety |
1349 | | // 3. for small rowsets, the timeout we calculate maybe quite small, so we need a min_time_out |
1350 | 0 | const double speed_mbps = 100.0; // 100MB/s |
1351 | 0 | const double safety_factor = 2.0; |
1352 | 0 | timeout_ms = std::min( |
1353 | 0 | std::max(static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) / |
1354 | 0 | (speed_mbps * 1024 * 1024) * safety_factor * 1000), |
1355 | 0 | config::warm_up_rowset_sync_wait_min_timeout_ms), |
1356 | 0 | config::warm_up_rowset_sync_wait_max_timeout_ms); |
1357 | 0 | LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id |
1358 | 0 | << ", with timeout: " << timeout_ms << " ms"; |
1359 | 0 | } |
1360 | 202k | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
1361 | 202k | manager.warm_up_rowset(rs_meta, timeout_ms); |
1362 | 202k | return st; |
1363 | 202k | } |
1364 | | |
1365 | 19 | Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) { |
1366 | 19 | VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id() |
1367 | 0 | << ", rowset_id: " << rs_meta.rowset_id(); |
1368 | 19 | CreateRowsetRequest req; |
1369 | 19 | CreateRowsetResponse resp; |
1370 | 19 | req.set_cloud_unique_id(config::cloud_unique_id); |
1371 | | |
1372 | | // Variant schema maybe updated, so we need to update the schema as well. |
1373 | | // The updated rowset meta after `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap` |
1374 | | // will be lost in `update_tmp_rowset` if skip_schema.So in order to keep the latest schema we should keep schema in update_tmp_rowset |
1375 | | // for variant type |
1376 | 19 | bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0; |
1377 | 19 | RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema); |
1378 | 19 | doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); |
1379 | 19 | Status st = |
1380 | 19 | retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset); |
1381 | 19 | if (!st.ok() && resp.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND) { |
1382 | 0 | return Status::InternalError("failed to update committed rowset: {}", resp.status().msg()); |
1383 | 0 | } |
1384 | 19 | return st; |
1385 | 19 | } |
1386 | | |
1387 | | // async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP |
1388 | | static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, |
1389 | 0 | const std::string& label, CommitTxnResponse& res) { |
1390 | 0 | std::string protobufBytes; |
1391 | 0 | res.SerializeToString(&protobufBytes); |
1392 | 0 | auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func( |
1393 | 0 | [db_id, txn_id, label, protobufBytes]() -> Status { |
1394 | 0 | TReportCommitTxnResultRequest request; |
1395 | 0 | TStatus result; |
1396 | |
|
1397 | 0 | if (protobufBytes.length() <= 0) { |
1398 | 0 | LOG(WARNING) << "protobufBytes: " << protobufBytes.length(); |
1399 | 0 | return Status::OK(); // nobody cares the return status |
1400 | 0 | } |
1401 | | |
1402 | 0 | request.__set_dbId(db_id); |
1403 | 0 | request.__set_txnId(txn_id); |
1404 | 0 | request.__set_label(label); |
1405 | 0 | request.__set_payload(protobufBytes); |
1406 | |
|
1407 | 0 | Status status; |
1408 | 0 | int64_t duration_ns = 0; |
1409 | 0 | TNetworkAddress master_addr = |
1410 | 0 | ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
1411 | 0 | if (master_addr.hostname.empty() || master_addr.port == 0) { |
1412 | 0 | status = Status::Error<SERVICE_UNAVAILABLE>( |
1413 | 0 | "Have not get FE Master heartbeat yet"); |
1414 | 0 | } else { |
1415 | 0 | SCOPED_RAW_TIMER(&duration_ns); |
1416 | |
|
1417 | 0 | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
1418 | 0 | master_addr.hostname, master_addr.port, |
1419 | 0 | [&request, &result](FrontendServiceConnection& client) { |
1420 | 0 | client->reportCommitTxnResult(result, request); |
1421 | 0 | })); |
1422 | | |
1423 | 0 | status = Status::create<false>(result); |
1424 | 0 | } |
1425 | 0 | g_cloud_commit_txn_resp_redirect_latency << duration_ns / 1000; |
1426 | |
|
1427 | 0 | if (!status.ok()) { |
1428 | 0 | LOG(WARNING) << "TableStats report RPC to FE failed, errmsg=" << status |
1429 | 0 | << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label; |
1430 | 0 | return Status::OK(); // nobody cares the return status |
1431 | 0 | } else { |
1432 | 0 | LOG(INFO) << "TableStats report RPC to FE success, msg=" << status |
1433 | 0 | << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label; |
1434 | 0 | return Status::OK(); |
1435 | 0 | } |
1436 | 0 | }); |
1437 | 0 | if (!st.ok()) { |
1438 | 0 | LOG(WARNING) << "TableStats report to FE task submission failed: " << st.to_string(); |
1439 | 0 | } |
1440 | 0 | } |
1441 | | |
1442 | 0 | Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { |
1443 | 0 | VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1444 | 0 | << ", label: " << ctx.label << ", is_2pc: " << is_2pc; |
1445 | 0 | { |
1446 | 0 | Status ret_st; |
1447 | 0 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st); |
1448 | 0 | } |
1449 | 0 | CommitTxnRequest req; |
1450 | 0 | CommitTxnResponse res; |
1451 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1452 | 0 | req.set_db_id(ctx.db_id); |
1453 | 0 | req.set_txn_id(ctx.txn_id); |
1454 | 0 | req.set_is_2pc(is_2pc); |
1455 | 0 | req.set_enable_txn_lazy_commit(config::enable_cloud_txn_lazy_commit); |
1456 | 0 | auto st = retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn); |
1457 | |
|
1458 | 0 | if (st.ok()) { |
1459 | 0 | send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res); |
1460 | 0 | } |
1461 | |
|
1462 | 0 | return st; |
1463 | 0 | } |
1464 | | |
1465 | 487 | Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { |
1466 | 487 | VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1467 | 0 | << ", label: " << ctx.label; |
1468 | 487 | { |
1469 | 487 | Status ret_st; |
1470 | 487 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st); |
1471 | 487 | } |
1472 | 487 | AbortTxnRequest req; |
1473 | 487 | AbortTxnResponse res; |
1474 | 487 | req.set_cloud_unique_id(config::cloud_unique_id); |
1475 | 487 | req.set_reason(std::string(ctx.status.msg().substr(0, 1024))); |
1476 | 487 | if (ctx.db_id > 0 && !ctx.label.empty()) { |
1477 | 479 | req.set_db_id(ctx.db_id); |
1478 | 479 | req.set_label(ctx.label); |
1479 | 479 | } else if (ctx.txn_id > 0) { |
1480 | 8 | req.set_txn_id(ctx.txn_id); |
1481 | 8 | } else { |
1482 | 0 | LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id |
1483 | 0 | << " txn_id=" << ctx.txn_id << " label=" << ctx.label; |
1484 | 0 | return Status::InternalError<false>("failed to abort txn"); |
1485 | 0 | } |
1486 | 487 | return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn); |
1487 | 487 | } |
1488 | | |
1489 | 33 | Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { |
1490 | 33 | VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id |
1491 | 0 | << ", label: " << ctx.label; |
1492 | 33 | { |
1493 | 33 | Status ret_st; |
1494 | 33 | TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st); |
1495 | 33 | } |
1496 | 33 | PrecommitTxnRequest req; |
1497 | 33 | PrecommitTxnResponse res; |
1498 | 33 | req.set_cloud_unique_id(config::cloud_unique_id); |
1499 | 33 | req.set_db_id(ctx.db_id); |
1500 | 33 | req.set_txn_id(ctx.txn_id); |
1501 | 33 | return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn); |
1502 | 33 | } |
1503 | | |
1504 | 0 | Status CloudMetaMgr::prepare_restore_job(const TabletMetaPB& tablet_meta) { |
1505 | 0 | VLOG_DEBUG << "prepare restore job, tablet_id: " << tablet_meta.tablet_id(); |
1506 | 0 | RestoreJobRequest req; |
1507 | 0 | RestoreJobResponse resp; |
1508 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1509 | 0 | req.set_tablet_id(tablet_meta.tablet_id()); |
1510 | 0 | req.set_expiration(config::snapshot_expire_time_sec); |
1511 | 0 | req.set_action(RestoreJobRequest::PREPARE); |
1512 | |
|
1513 | 0 | doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), std::move(tablet_meta)); |
1514 | 0 | return retry_rpc("prepare restore job", req, &resp, &MetaService_Stub::prepare_restore_job); |
1515 | 0 | } |
1516 | | |
1517 | 0 | Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) { |
1518 | 0 | VLOG_DEBUG << "commit restore job, tablet_id: " << tablet_id; |
1519 | 0 | RestoreJobRequest req; |
1520 | 0 | RestoreJobResponse resp; |
1521 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1522 | 0 | req.set_tablet_id(tablet_id); |
1523 | 0 | req.set_action(RestoreJobRequest::COMMIT); |
1524 | 0 | req.set_store_version(config::delete_bitmap_store_write_version); |
1525 | |
|
1526 | 0 | return retry_rpc("commit restore job", req, &resp, &MetaService_Stub::commit_restore_job); |
1527 | 0 | } |
1528 | | |
1529 | 0 | Status CloudMetaMgr::finish_restore_job(const int64_t tablet_id, bool is_completed) { |
1530 | 0 | VLOG_DEBUG << "finish restore job, tablet_id: " << tablet_id |
1531 | 0 | << ", is_completed: " << is_completed; |
1532 | 0 | RestoreJobRequest req; |
1533 | 0 | RestoreJobResponse resp; |
1534 | 0 | req.set_cloud_unique_id(config::cloud_unique_id); |
1535 | 0 | req.set_tablet_id(tablet_id); |
1536 | 0 | req.set_action(is_completed ? RestoreJobRequest::COMPLETE : RestoreJobRequest::ABORT); |
1537 | |
|
1538 | 0 | return retry_rpc("finish restore job", req, &resp, &MetaService_Stub::finish_restore_job); |
1539 | 0 | } |
1540 | | |
1541 | 126 | Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) { |
1542 | 126 | GetObjStoreInfoRequest req; |
1543 | 126 | GetObjStoreInfoResponse resp; |
1544 | 126 | req.set_cloud_unique_id(config::cloud_unique_id); |
1545 | 126 | Status s = |
1546 | 126 | retry_rpc("get storage vault info", req, &resp, &MetaService_Stub::get_obj_store_info); |
1547 | 126 | if (!s.ok()) { |
1548 | 0 | return s; |
1549 | 0 | } |
1550 | | |
1551 | 126 | *is_vault_mode = resp.enable_storage_vault(); |
1552 | | |
1553 | 126 | auto add_obj_store = [&vault_infos](const auto& obj_store) { |
1554 | 126 | vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store), |
1555 | 126 | StorageVaultPB_PathFormat {}); |
1556 | 126 | }; |
1557 | | |
1558 | 126 | std::ranges::for_each(resp.obj_info(), add_obj_store); |
1559 | 126 | std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) { |
1560 | 0 | if (vault.has_hdfs_info()) { |
1561 | 0 | vault_infos->emplace_back(vault.id(), vault.hdfs_info(), vault.path_format()); |
1562 | 0 | } |
1563 | 0 | if (vault.has_obj_info()) { |
1564 | 0 | add_obj_store(vault.obj_info()); |
1565 | 0 | } |
1566 | 0 | }); |
1567 | | |
1568 | | // desensitization, hide secret |
1569 | 252 | for (int i = 0; i < resp.obj_info_size(); ++i) { |
1570 | 126 | resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx"); |
1571 | 126 | } |
1572 | 126 | for (int i = 0; i < resp.storage_vault_size(); ++i) { |
1573 | 0 | auto* j = resp.mutable_storage_vault(i); |
1574 | 0 | if (!j->has_obj_info()) continue; |
1575 | 0 | j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx"); |
1576 | 0 | } |
1577 | | |
1578 | 252 | for (int i = 0; i < resp.obj_info_size(); ++i) { |
1579 | 126 | resp.mutable_obj_info(i)->set_ak(hide_access_key(resp.obj_info(i).sk())); |
1580 | 126 | } |
1581 | 126 | for (int i = 0; i < resp.storage_vault_size(); ++i) { |
1582 | 0 | auto* j = resp.mutable_storage_vault(i); |
1583 | 0 | if (!j->has_obj_info()) continue; |
1584 | 0 | j->mutable_obj_info()->set_sk(hide_access_key(j->obj_info().sk())); |
1585 | 0 | } |
1586 | | |
1587 | 126 | LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode |
1588 | 126 | << " response=" << resp.ShortDebugString(); |
1589 | 126 | return Status::OK(); |
1590 | 126 | } |
1591 | | |
1592 | 23.2k | Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) { |
1593 | 23.2k | VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString(); |
1594 | 23.2k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res); |
1595 | | |
1596 | 23.2k | StartTabletJobRequest req; |
1597 | 23.2k | req.mutable_job()->CopyFrom(job); |
1598 | 23.2k | req.set_cloud_unique_id(config::cloud_unique_id); |
1599 | 23.2k | return retry_rpc("start tablet job", req, res, &MetaService_Stub::start_tablet_job); |
1600 | 23.2k | } |
1601 | | |
1602 | 21.5k | Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) { |
1603 | 18.4E | VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString(); |
1604 | 21.5k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res); |
1605 | 21.5k | DBUG_EXECUTE_IF("CloudMetaMgr::commit_tablet_job.fail", { |
1606 | 21.5k | return Status::InternalError<false>("inject CloudMetaMgr::commit_tablet_job.fail"); |
1607 | 21.5k | }); |
1608 | | |
1609 | 21.5k | FinishTabletJobRequest req; |
1610 | 21.5k | req.mutable_job()->CopyFrom(job); |
1611 | 21.5k | req.set_action(FinishTabletJobRequest::COMMIT); |
1612 | 21.5k | req.set_cloud_unique_id(config::cloud_unique_id); |
1613 | 21.5k | auto st = retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job); |
1614 | 21.5k | if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { |
1615 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1616 | 0 | "txn conflict when commit tablet job {}", job.ShortDebugString()); |
1617 | 0 | } |
1618 | 21.5k | return st; |
1619 | 21.5k | } |
1620 | | |
1621 | 106 | Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) { |
1622 | 106 | VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString(); |
1623 | 106 | FinishTabletJobRequest req; |
1624 | 106 | FinishTabletJobResponse res; |
1625 | 106 | req.mutable_job()->CopyFrom(job); |
1626 | 106 | req.set_action(FinishTabletJobRequest::ABORT); |
1627 | 106 | req.set_cloud_unique_id(config::cloud_unique_id); |
1628 | 106 | return retry_rpc("abort tablet job", req, &res, &MetaService_Stub::finish_tablet_job); |
1629 | 106 | } |
1630 | | |
1631 | 208 | Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) { |
1632 | 208 | VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString(); |
1633 | 208 | FinishTabletJobRequest req; |
1634 | 208 | FinishTabletJobResponse res; |
1635 | 208 | req.mutable_job()->CopyFrom(job); |
1636 | 208 | req.set_action(FinishTabletJobRequest::LEASE); |
1637 | 208 | req.set_cloud_unique_id(config::cloud_unique_id); |
1638 | 208 | return retry_rpc("lease tablet job", req, &res, &MetaService_Stub::finish_tablet_job); |
1639 | 208 | } |
1640 | | |
1641 | | static void add_delete_bitmap(DeleteBitmapPB& delete_bitmap_pb, const DeleteBitmap::BitmapKey& key, |
1642 | 0 | roaring::Roaring& bitmap) { |
1643 | 0 | delete_bitmap_pb.add_rowset_ids(std::get<0>(key).to_string()); |
1644 | 0 | delete_bitmap_pb.add_segment_ids(std::get<1>(key)); |
1645 | 0 | delete_bitmap_pb.add_versions(std::get<2>(key)); |
1646 | | // To save space, convert array and bitmap containers to run containers |
1647 | 0 | bitmap.runOptimize(); |
1648 | 0 | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1649 | 0 | bitmap.write(bitmap_data.data()); |
1650 | 0 | *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1651 | 0 | } |
1652 | | |
1653 | | static Status store_delete_bitmap(std::string& rowset_id, DeleteBitmapPB& delete_bitmap_pb, |
1654 | | int64_t tablet_id, |
1655 | | std::optional<StorageResource> storage_resource, |
1656 | 0 | UpdateDeleteBitmapRequest& req, int64_t txn_id) { |
1657 | 0 | if (config::enable_mow_verbose_log) { |
1658 | 0 | std::stringstream ss; |
1659 | 0 | for (int i = 0; i < delete_bitmap_pb.rowset_ids_size(); i++) { |
1660 | 0 | ss << "{rid=" << delete_bitmap_pb.rowset_ids(i) |
1661 | 0 | << ", sid=" << delete_bitmap_pb.segment_ids(i) |
1662 | 0 | << ", ver=" << delete_bitmap_pb.versions(i) << "}, "; |
1663 | 0 | } |
1664 | 0 | LOG(INFO) << "handle one rowset delete bitmap for tablet_id: " << tablet_id |
1665 | 0 | << ", rowset_id: " << rowset_id |
1666 | 0 | << ", delete_bitmap num: " << delete_bitmap_pb.rowset_ids_size() |
1667 | 0 | << ", size: " << delete_bitmap_pb.ByteSizeLong() << ", keys=[" << ss.str() |
1668 | 0 | << "]"; |
1669 | 0 | } |
1670 | 0 | if (delete_bitmap_pb.rowset_ids_size() == 0) { |
1671 | 0 | return Status::OK(); |
1672 | 0 | } |
1673 | 0 | DeleteBitmapStoragePB delete_bitmap_storage; |
1674 | 0 | if (config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 && |
1675 | 0 | delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb) { |
1676 | | // Enable packed file only for load (txn_id > 0) |
1677 | 0 | bool enable_packed = config::enable_packed_file && txn_id > 0; |
1678 | 0 | DeleteBitmapFileWriter file_writer(tablet_id, rowset_id, storage_resource, enable_packed, |
1679 | 0 | txn_id); |
1680 | 0 | RETURN_IF_ERROR(file_writer.init()); |
1681 | 0 | RETURN_IF_ERROR(file_writer.write(delete_bitmap_pb)); |
1682 | 0 | RETURN_IF_ERROR(file_writer.close()); |
1683 | 0 | delete_bitmap_pb.Clear(); |
1684 | 0 | delete_bitmap_storage.set_store_in_fdb(false); |
1685 | | |
1686 | | // Store packed slice location if file was written to packed file |
1687 | 0 | if (file_writer.is_packed()) { |
1688 | 0 | io::PackedSliceLocation loc; |
1689 | 0 | RETURN_IF_ERROR(file_writer.get_packed_slice_location(&loc)); |
1690 | 0 | auto* packed_loc = delete_bitmap_storage.mutable_packed_slice_location(); |
1691 | 0 | packed_loc->set_packed_file_path(loc.packed_file_path); |
1692 | 0 | packed_loc->set_offset(loc.offset); |
1693 | 0 | packed_loc->set_size(loc.size); |
1694 | 0 | packed_loc->set_packed_file_size(loc.packed_file_size); |
1695 | 0 | } |
1696 | 0 | } else { |
1697 | 0 | delete_bitmap_storage.set_store_in_fdb(true); |
1698 | 0 | *(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb); |
1699 | 0 | } |
1700 | 0 | req.add_delta_rowset_ids(rowset_id); |
1701 | 0 | *(req.add_delete_bitmap_storages()) = std::move(delete_bitmap_storage); |
1702 | 0 | return Status::OK(); |
1703 | 0 | } |
1704 | | |
1705 | | Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, |
1706 | | int64_t initiator, DeleteBitmap* delete_bitmap, |
1707 | | DeleteBitmap* delete_bitmap_v2, std::string rowset_id, |
1708 | | std::optional<StorageResource> storage_resource, |
1709 | | int64_t store_version, int64_t txn_id, |
1710 | 59.8k | bool is_explicit_txn, int64_t next_visible_version) { |
1711 | 59.8k | VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id(); |
1712 | 59.8k | if (config::enable_mow_verbose_log) { |
1713 | 0 | std::stringstream ss; |
1714 | 0 | ss << "start update delete bitmap for tablet_id: " << tablet.tablet_id() |
1715 | 0 | << ", rowset_id: " << rowset_id |
1716 | 0 | << ", delete_bitmap num: " << delete_bitmap->delete_bitmap.size() |
1717 | 0 | << ", store_version: " << store_version << ", lock_id=" << lock_id |
1718 | 0 | << ", initiator=" << initiator; |
1719 | 0 | if (store_version == 2 || store_version == 3) { |
1720 | 0 | ss << ", delete_bitmap v2 num: " << delete_bitmap_v2->delete_bitmap.size(); |
1721 | 0 | } |
1722 | 0 | LOG(INFO) << ss.str(); |
1723 | 0 | } |
1724 | 59.8k | UpdateDeleteBitmapRequest req; |
1725 | 59.8k | UpdateDeleteBitmapResponse res; |
1726 | 59.8k | req.set_cloud_unique_id(config::cloud_unique_id); |
1727 | 59.8k | req.set_table_id(tablet.table_id()); |
1728 | 59.8k | req.set_partition_id(tablet.partition_id()); |
1729 | 59.8k | req.set_tablet_id(tablet.tablet_id()); |
1730 | 59.8k | req.set_lock_id(lock_id); |
1731 | 59.8k | req.set_initiator(initiator); |
1732 | 59.8k | req.set_is_explicit_txn(is_explicit_txn); |
1733 | 59.8k | if (txn_id > 0) { |
1734 | 54.0k | req.set_txn_id(txn_id); |
1735 | 54.0k | } |
1736 | 59.8k | if (next_visible_version > 0) { |
1737 | 54.1k | req.set_next_visible_version(next_visible_version); |
1738 | 54.1k | } |
1739 | 59.8k | req.set_store_version(store_version); |
1740 | | |
1741 | 59.8k | bool write_v1 = store_version == 1 || store_version == 3; |
1742 | 59.8k | bool write_v2 = store_version == 2 || store_version == 3; |
1743 | | // write v1 kvs |
1744 | 59.8k | if (write_v1) { |
1745 | 59.5k | for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { |
1746 | 10.9k | req.add_rowset_ids(std::get<0>(key).to_string()); |
1747 | 10.9k | req.add_segment_ids(std::get<1>(key)); |
1748 | 10.9k | req.add_versions(std::get<2>(key)); |
1749 | | // To save space, convert array and bitmap containers to run containers |
1750 | 10.9k | bitmap.runOptimize(); |
1751 | 10.9k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1752 | 10.9k | bitmap.write(bitmap_data.data()); |
1753 | 10.9k | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1754 | 10.9k | } |
1755 | 59.5k | } |
1756 | | |
1757 | | // write v2 kvs |
1758 | 59.8k | if (write_v2) { |
1759 | 0 | if (config::enable_mow_verbose_log) { |
1760 | 0 | LOG(INFO) << "update delete bitmap for tablet_id: " << tablet.tablet_id() |
1761 | 0 | << ", rowset_id: " << rowset_id |
1762 | 0 | << ", delete_bitmap num: " << delete_bitmap_v2->delete_bitmap.size() |
1763 | 0 | << ", lock_id=" << lock_id << ", initiator=" << initiator; |
1764 | 0 | } |
1765 | 0 | if (rowset_id.empty()) { |
1766 | 0 | std::string pre_rowset_id = ""; |
1767 | 0 | std::string cur_rowset_id = ""; |
1768 | 0 | DeleteBitmapPB delete_bitmap_pb; |
1769 | 0 | for (auto it = delete_bitmap_v2->delete_bitmap.begin(); |
1770 | 0 | it != delete_bitmap_v2->delete_bitmap.end(); ++it) { |
1771 | 0 | auto& key = it->first; |
1772 | 0 | auto& bitmap = it->second; |
1773 | 0 | cur_rowset_id = std::get<0>(key).to_string(); |
1774 | 0 | if (cur_rowset_id != pre_rowset_id) { |
1775 | 0 | if (!pre_rowset_id.empty() && delete_bitmap_pb.rowset_ids_size() > 0) { |
1776 | 0 | RETURN_IF_ERROR(store_delete_bitmap(pre_rowset_id, delete_bitmap_pb, |
1777 | 0 | tablet.tablet_id(), storage_resource, |
1778 | 0 | req, txn_id)); |
1779 | 0 | } |
1780 | 0 | pre_rowset_id = cur_rowset_id; |
1781 | 0 | DCHECK_EQ(delete_bitmap_pb.rowset_ids_size(), 0); |
1782 | 0 | DCHECK_EQ(delete_bitmap_pb.segment_ids_size(), 0); |
1783 | 0 | DCHECK_EQ(delete_bitmap_pb.versions_size(), 0); |
1784 | 0 | DCHECK_EQ(delete_bitmap_pb.segment_delete_bitmaps_size(), 0); |
1785 | 0 | } |
1786 | 0 | add_delete_bitmap(delete_bitmap_pb, key, bitmap); |
1787 | 0 | } |
1788 | 0 | if (delete_bitmap_pb.rowset_ids_size() > 0) { |
1789 | 0 | DCHECK(!cur_rowset_id.empty()); |
1790 | 0 | RETURN_IF_ERROR(store_delete_bitmap(cur_rowset_id, delete_bitmap_pb, |
1791 | 0 | tablet.tablet_id(), storage_resource, req, |
1792 | 0 | txn_id)); |
1793 | 0 | } |
1794 | 0 | } else { |
1795 | 0 | DeleteBitmapPB delete_bitmap_pb; |
1796 | 0 | for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) { |
1797 | 0 | add_delete_bitmap(delete_bitmap_pb, key, bitmap); |
1798 | 0 | } |
1799 | 0 | RETURN_IF_ERROR(store_delete_bitmap(rowset_id, delete_bitmap_pb, tablet.tablet_id(), |
1800 | 0 | storage_resource, req, txn_id)); |
1801 | 0 | } |
1802 | 0 | DCHECK_EQ(req.delta_rowset_ids_size(), req.delete_bitmap_storages_size()); |
1803 | 0 | } |
1804 | 59.8k | DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", { |
1805 | 59.8k | LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id(); |
1806 | 59.8k | auto count = dp->param<int>("count", 30000); |
1807 | 59.8k | if (!delete_bitmap->delete_bitmap.empty()) { |
1808 | 59.8k | auto& key = delete_bitmap->delete_bitmap.begin()->first; |
1809 | 59.8k | auto& bitmap = delete_bitmap->delete_bitmap.begin()->second; |
1810 | 59.8k | for (int i = 1000; i < (1000 + count); i++) { |
1811 | 59.8k | req.add_rowset_ids(std::get<0>(key).to_string()); |
1812 | 59.8k | req.add_segment_ids(std::get<1>(key)); |
1813 | 59.8k | req.add_versions(i); |
1814 | | // To save space, convert array and bitmap containers to run containers |
1815 | 59.8k | bitmap.runOptimize(); |
1816 | 59.8k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1817 | 59.8k | bitmap.write(bitmap_data.data()); |
1818 | 59.8k | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1819 | 59.8k | } |
1820 | 59.8k | } |
1821 | 59.8k | }); |
1822 | 59.8k | DBUG_EXECUTE_IF("CloudMetaMgr::test_update_delete_bitmap_fail", { |
1823 | 59.8k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
1824 | 59.8k | "test update delete bitmap failed, tablet_id: {}, lock_id: {}", tablet.tablet_id(), |
1825 | 59.8k | lock_id); |
1826 | 59.8k | }); |
1827 | 59.8k | auto st = retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap); |
1828 | 59.8k | if (config::enable_update_delete_bitmap_kv_check_core && |
1829 | 59.8k | res.status().code() == MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV) { |
1830 | 0 | auto& msg = res.status().msg(); |
1831 | 0 | LOG_WARNING(msg); |
1832 | 0 | CHECK(false) << msg; |
1833 | 0 | } |
1834 | 59.8k | if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) { |
1835 | 25 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1836 | 25 | "lock expired when update delete bitmap, tablet_id: {}, lock_id: {}, initiator: " |
1837 | 25 | "{}, error_msg: {}", |
1838 | 25 | tablet.tablet_id(), lock_id, initiator, res.status().msg()); |
1839 | 25 | } |
1840 | 59.8k | return st; |
1841 | 59.8k | } |
1842 | | |
1843 | | Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock( |
1844 | | const CloudTablet& tablet, DeleteBitmap* delete_bitmap, |
1845 | | std::map<std::string, int64_t>& rowset_to_versions, int64_t pre_rowset_agg_start_version, |
1846 | 3.91k | int64_t pre_rowset_agg_end_version) { |
1847 | 3.91k | if (config::delete_bitmap_store_write_version == 2) { |
1848 | 0 | VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2"; |
1849 | 0 | return Status::OK(); |
1850 | 0 | } |
1851 | 3.91k | LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id() |
1852 | 3.91k | << ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size(); |
1853 | 3.91k | UpdateDeleteBitmapRequest req; |
1854 | 3.91k | UpdateDeleteBitmapResponse res; |
1855 | 3.91k | req.set_cloud_unique_id(config::cloud_unique_id); |
1856 | 3.91k | req.set_table_id(tablet.table_id()); |
1857 | 3.91k | req.set_partition_id(tablet.partition_id()); |
1858 | 3.91k | req.set_tablet_id(tablet.tablet_id()); |
1859 | | // use a fake lock id to resolve compatibility issues |
1860 | 3.91k | req.set_lock_id(-3); |
1861 | 3.91k | req.set_without_lock(true); |
1862 | 3.91k | for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) { |
1863 | 1.22k | req.add_rowset_ids(std::get<0>(key).to_string()); |
1864 | 1.22k | req.add_segment_ids(std::get<1>(key)); |
1865 | 1.22k | req.add_versions(std::get<2>(key)); |
1866 | 1.22k | if (pre_rowset_agg_end_version > 0) { |
1867 | 1.22k | DCHECK(rowset_to_versions.find(std::get<0>(key).to_string()) != |
1868 | 0 | rowset_to_versions.end()) |
1869 | 0 | << "rowset_to_versions not found for key=" << std::get<0>(key).to_string(); |
1870 | 1.22k | req.add_pre_rowset_versions(rowset_to_versions[std::get<0>(key).to_string()]); |
1871 | 1.22k | } |
1872 | 1.22k | DCHECK(pre_rowset_agg_end_version <= 0 || pre_rowset_agg_end_version == std::get<2>(key)) |
1873 | 0 | << "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version |
1874 | 0 | << " not equal to version=" << std::get<2>(key); |
1875 | | // To save space, convert array and bitmap containers to run containers |
1876 | 1.22k | bitmap.runOptimize(); |
1877 | 1.22k | std::string bitmap_data(bitmap.getSizeInBytes(), '\0'); |
1878 | 1.22k | bitmap.write(bitmap_data.data()); |
1879 | 1.22k | *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data); |
1880 | 1.22k | } |
1881 | 3.91k | if (pre_rowset_agg_start_version > 0 && pre_rowset_agg_end_version > 0) { |
1882 | 3.91k | req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version); |
1883 | 3.91k | req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version); |
1884 | 3.91k | } |
1885 | 3.91k | return retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap); |
1886 | 3.91k | } |
1887 | | |
1888 | | Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id, |
1889 | 5.25k | int64_t initiator) { |
1890 | 5.25k | DBUG_EXECUTE_IF("get_delete_bitmap_update_lock.inject_fail", { |
1891 | 5.25k | auto p = dp->param("percent", 0.01); |
1892 | 5.25k | std::mt19937 gen {std::random_device {}()}; |
1893 | 5.25k | std::bernoulli_distribution inject_fault {p}; |
1894 | 5.25k | if (inject_fault(gen)) { |
1895 | 5.25k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
1896 | 5.25k | "injection error when get get_delete_bitmap_update_lock, " |
1897 | 5.25k | "tablet_id={}, lock_id={}, initiator={}", |
1898 | 5.25k | tablet.tablet_id(), lock_id, initiator); |
1899 | 5.25k | } |
1900 | 5.25k | }); |
1901 | 18.4E | VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id() |
1902 | 18.4E | << ",lock_id:" << lock_id; |
1903 | 5.25k | GetDeleteBitmapUpdateLockRequest req; |
1904 | 5.25k | GetDeleteBitmapUpdateLockResponse res; |
1905 | 5.25k | req.set_cloud_unique_id(config::cloud_unique_id); |
1906 | 5.25k | req.set_table_id(tablet.table_id()); |
1907 | 5.25k | req.set_lock_id(lock_id); |
1908 | 5.25k | req.set_initiator(initiator); |
1909 | | // set expiration time for compaction and schema_change |
1910 | 5.25k | req.set_expiration(config::delete_bitmap_lock_expiration_seconds); |
1911 | 5.25k | int retry_times = 0; |
1912 | 5.25k | Status st; |
1913 | 5.25k | std::default_random_engine rng = make_random_engine(); |
1914 | 5.25k | std::uniform_int_distribution<uint32_t> u(500, 2000); |
1915 | 5.25k | uint64_t backoff_sleep_time_ms {0}; |
1916 | 5.91k | do { |
1917 | 5.91k | bool test_conflict = false; |
1918 | 5.91k | st = retry_rpc("get delete bitmap update lock", req, &res, |
1919 | 5.91k | &MetaService_Stub::get_delete_bitmap_update_lock); |
1920 | 5.91k | DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict", |
1921 | 5.91k | { test_conflict = true; }); |
1922 | 5.91k | if (!test_conflict && res.status().code() != MetaServiceCode::LOCK_CONFLICT) { |
1923 | 5.25k | break; |
1924 | 5.25k | } |
1925 | | |
1926 | 665 | uint32_t duration_ms = u(rng); |
1927 | 665 | LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req) |
1928 | 665 | << " retry_times=" << retry_times << " sleep=" << duration_ms |
1929 | 665 | << "ms : " << res.status().msg(); |
1930 | 665 | auto start = std::chrono::steady_clock::now(); |
1931 | 665 | bthread_usleep(duration_ms * 1000); |
1932 | 665 | auto end = std::chrono::steady_clock::now(); |
1933 | 665 | backoff_sleep_time_ms += duration_cast<std::chrono::milliseconds>(end - start).count(); |
1934 | 665 | } while (++retry_times <= config::get_delete_bitmap_lock_max_retry_times); |
1935 | 0 | g_cloud_be_mow_get_dbm_lock_backoff_sleep_time << backoff_sleep_time_ms; |
1936 | 5.25k | DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", { |
1937 | 5.25k | auto p = dp->param("percent", 0.01); |
1938 | | // 100s > Config.calculate_delete_bitmap_task_timeout_seconds = 60s |
1939 | 5.25k | auto sleep_time = dp->param("sleep", 15); |
1940 | 5.25k | std::mt19937 gen {std::random_device {}()}; |
1941 | 5.25k | std::bernoulli_distribution inject_fault {p}; |
1942 | 5.25k | if (inject_fault(gen)) { |
1943 | 5.25k | LOG_INFO("injection sleep for {} seconds, tablet_id={}", sleep_time, |
1944 | 5.25k | tablet.tablet_id()); |
1945 | 5.25k | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
1946 | 5.25k | } |
1947 | 5.25k | }); |
1948 | 5.25k | if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { |
1949 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1950 | 0 | "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, " |
1951 | 0 | "initiator {}", |
1952 | 0 | tablet.table_id(), lock_id, initiator); |
1953 | 5.25k | } else if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) { |
1954 | 0 | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>( |
1955 | 0 | "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, " |
1956 | 0 | "initiator {}", |
1957 | 0 | tablet.table_id(), lock_id, initiator); |
1958 | 0 | } |
1959 | 5.25k | return st; |
1960 | 5.25k | } |
1961 | | |
1962 | | void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, |
1963 | 23 | int64_t initiator, int64_t tablet_id) { |
1964 | 23 | LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id |
1965 | 23 | << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id; |
1966 | 23 | RemoveDeleteBitmapUpdateLockRequest req; |
1967 | 23 | RemoveDeleteBitmapUpdateLockResponse res; |
1968 | 23 | req.set_cloud_unique_id(config::cloud_unique_id); |
1969 | 23 | req.set_table_id(table_id); |
1970 | 23 | req.set_tablet_id(tablet_id); |
1971 | 23 | req.set_lock_id(lock_id); |
1972 | 23 | req.set_initiator(initiator); |
1973 | 23 | auto st = retry_rpc("remove delete bitmap update lock", req, &res, |
1974 | 23 | &MetaService_Stub::remove_delete_bitmap_update_lock); |
1975 | 23 | if (!st.ok()) { |
1976 | 21 | LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id |
1977 | 21 | << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id |
1978 | 21 | << ",st=" << st.to_string(); |
1979 | 21 | } |
1980 | 23 | } |
1981 | | |
1982 | 201k | void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) { |
1983 | 202k | if (!config::enable_table_size_correctness_check) { |
1984 | 202k | return; |
1985 | 202k | } |
1986 | 18.4E | int64_t total_segment_size = get_segment_file_size(rs_meta); |
1987 | 18.4E | int64_t total_inverted_index_size = get_inverted_index_file_size(rs_meta); |
1988 | 18.4E | if (rs_meta.data_disk_size() != total_segment_size || |
1989 | 18.4E | rs_meta.index_disk_size() != total_inverted_index_size || |
1990 | 18.4E | rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) { |
1991 | 0 | LOG(WARNING) << "[Cloud table table size check failed]:" |
1992 | 0 | << " tablet id: " << rs_meta.tablet_id() |
1993 | 0 | << ", rowset id:" << rs_meta.rowset_id() |
1994 | 0 | << ", rowset data disk size:" << rs_meta.data_disk_size() |
1995 | 0 | << ", rowset real data disk size:" << total_segment_size |
1996 | 0 | << ", rowset index disk size:" << rs_meta.index_disk_size() |
1997 | 0 | << ", rowset real index disk size:" << total_inverted_index_size |
1998 | 0 | << ", rowset total disk size:" << rs_meta.total_disk_size() |
1999 | 0 | << ", rowset segment path:" |
2000 | 0 | << StorageResource().remote_segment_path(rs_meta.tablet_id(), |
2001 | 0 | rs_meta.rowset_id().to_string(), 0); |
2002 | 0 | DCHECK(false); |
2003 | 0 | } |
2004 | 18.4E | } |
2005 | | |
2006 | 0 | int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) { |
2007 | 0 | int64_t total_segment_size = 0; |
2008 | 0 | const auto fs = rs_meta.fs(); |
2009 | 0 | if (!fs) { |
2010 | 0 | LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id(); |
2011 | 0 | } |
2012 | 0 | for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) { |
2013 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2014 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2015 | 0 | int64_t segment_file_size = 0; |
2016 | 0 | auto st = fs->file_size(segment_path, &segment_file_size); |
2017 | 0 | if (!st.ok()) { |
2018 | 0 | segment_file_size = 0; |
2019 | 0 | if (st.is<NOT_FOUND>()) { |
2020 | 0 | LOG(INFO) << "cloud table size correctness check get segment size 0 because " |
2021 | 0 | "file not exist! msg:" |
2022 | 0 | << st.msg() << ", segment path:" << segment_path; |
2023 | 0 | } else { |
2024 | 0 | LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:" |
2025 | 0 | << st.msg() << ", segment path:" << segment_path; |
2026 | 0 | } |
2027 | 0 | } |
2028 | 0 | total_segment_size += segment_file_size; |
2029 | 0 | } |
2030 | 0 | return total_segment_size; |
2031 | 0 | } |
2032 | | |
2033 | 0 | int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) { |
2034 | 0 | int64_t total_inverted_index_size = 0; |
2035 | 0 | const auto fs = rs_meta.fs(); |
2036 | 0 | if (!fs) { |
2037 | 0 | LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id(); |
2038 | 0 | } |
2039 | 0 | if (rs_meta.tablet_schema()->get_inverted_index_storage_format() == |
2040 | 0 | InvertedIndexStorageFormatPB::V1) { |
2041 | 0 | const auto& indices = rs_meta.tablet_schema()->inverted_indexes(); |
2042 | 0 | for (auto& index : indices) { |
2043 | 0 | for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) { |
2044 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2045 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2046 | 0 | int64_t file_size = 0; |
2047 | |
|
2048 | 0 | std::string inverted_index_file_path = |
2049 | 0 | InvertedIndexDescriptor::get_index_file_path_v1( |
2050 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path), |
2051 | 0 | index->index_id(), index->get_index_suffix()); |
2052 | 0 | auto st = fs->file_size(inverted_index_file_path, &file_size); |
2053 | 0 | if (!st.ok()) { |
2054 | 0 | file_size = 0; |
2055 | 0 | if (st.is<NOT_FOUND>()) { |
2056 | 0 | LOG(INFO) << "cloud table size correctness check get inverted index v1 " |
2057 | 0 | "0 because file not exist! msg:" |
2058 | 0 | << st.msg() |
2059 | 0 | << ", inverted index path:" << inverted_index_file_path; |
2060 | 0 | } else { |
2061 | 0 | LOG(WARNING) |
2062 | 0 | << "cloud table size correctness check get inverted index v1 " |
2063 | 0 | "size failed! msg:" |
2064 | 0 | << st.msg() << ", inverted index path:" << inverted_index_file_path; |
2065 | 0 | } |
2066 | 0 | } |
2067 | 0 | total_inverted_index_size += file_size; |
2068 | 0 | } |
2069 | 0 | } |
2070 | 0 | } else { |
2071 | 0 | for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) { |
2072 | 0 | int64_t file_size = 0; |
2073 | 0 | std::string segment_path = StorageResource().remote_segment_path( |
2074 | 0 | rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id); |
2075 | |
|
2076 | 0 | std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2( |
2077 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)); |
2078 | 0 | auto st = fs->file_size(inverted_index_file_path, &file_size); |
2079 | 0 | if (!st.ok()) { |
2080 | 0 | file_size = 0; |
2081 | 0 | if (st.is<NOT_FOUND>()) { |
2082 | 0 | LOG(INFO) << "cloud table size correctness check get inverted index v2 " |
2083 | 0 | "0 because file not exist! msg:" |
2084 | 0 | << st.msg() << ", inverted index path:" << inverted_index_file_path; |
2085 | 0 | } else { |
2086 | 0 | LOG(WARNING) << "cloud table size correctness check get inverted index v2 " |
2087 | 0 | "size failed! msg:" |
2088 | 0 | << st.msg() |
2089 | 0 | << ", inverted index path:" << inverted_index_file_path; |
2090 | 0 | } |
2091 | 0 | } |
2092 | 0 | total_inverted_index_size += file_size; |
2093 | 0 | } |
2094 | 0 | } |
2095 | 0 | return total_inverted_index_size; |
2096 | 0 | } |
2097 | | |
2098 | | Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version, |
2099 | 360k | std::unique_lock<std::shared_mutex>& wlock) { |
2100 | 360k | if (max_version <= 0) { |
2101 | 136k | return Status::OK(); |
2102 | 136k | } |
2103 | | |
2104 | 223k | Versions existing_versions; |
2105 | 1.29M | for (const auto& [_, rs] : tablet->tablet_meta()->all_rs_metas()) { |
2106 | 1.29M | existing_versions.emplace_back(rs->version()); |
2107 | 1.29M | } |
2108 | | |
2109 | | // If there are no existing versions, it may be a new tablet for restore, so skip filling holes. |
2110 | 223k | if (existing_versions.empty()) { |
2111 | 1 | return Status::OK(); |
2112 | 1 | } |
2113 | | |
2114 | 223k | std::vector<RowsetSharedPtr> hole_rowsets; |
2115 | | // sort the existing versions in ascending order |
2116 | 223k | std::sort(existing_versions.begin(), existing_versions.end(), |
2117 | 3.92M | [](const Version& a, const Version& b) { |
2118 | | // simple because 2 versions are certainly not overlapping |
2119 | 3.92M | return a.first < b.first; |
2120 | 3.92M | }); |
2121 | | |
2122 | | // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which calls |
2123 | | // fill_version_holes. For schema change tablets (TABLET_NOTREADY state), we selectively skip hole |
2124 | | // filling for versions <= alter_version to prevent: |
2125 | | // 1. Abnormal compaction score calculations for schema change tablets |
2126 | | // 2. Unexpected -235 errors during load operations |
2127 | | // This allows schema change to proceed normally while still permitting hole filling for versions |
2128 | | // beyond the alter_version threshold. |
2129 | 223k | bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY; |
2130 | 223k | if (is_schema_change_tablet && tablet->alter_version() <= 1) { |
2131 | 5.50k | LOG(INFO) << "Skip version hole filling for new schema change tablet " |
2132 | 5.50k | << tablet->tablet_id() << " with alter_version " << tablet->alter_version(); |
2133 | 5.50k | return Status::OK(); |
2134 | 5.50k | } |
2135 | | |
2136 | 218k | int64_t last_version = -1; |
2137 | 1.28M | for (const Version& version : existing_versions) { |
2138 | 18.4E | VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": [" |
2139 | 18.4E | << version.first << ", " << version.second << "]"; |
2140 | | // missing versions are those that are not in the existing_versions |
2141 | 1.28M | if (version.first > last_version + 1) { |
2142 | | // there is a hole between versions |
2143 | 19 | auto prev_non_hole_rowset = tablet->get_rowset_by_version(version); |
2144 | 157 | for (int64_t ver = last_version + 1; ver < version.first; ++ver) { |
2145 | | // Skip hole filling for versions <= alter_version during schema change |
2146 | 138 | if (is_schema_change_tablet && ver <= tablet->alter_version()) { |
2147 | 128 | continue; |
2148 | 128 | } |
2149 | 10 | RowsetSharedPtr hole_rowset; |
2150 | 10 | RETURN_IF_ERROR(create_empty_rowset_for_hole( |
2151 | 10 | tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); |
2152 | 10 | hole_rowsets.push_back(hole_rowset); |
2153 | 10 | } |
2154 | 19 | LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 |
2155 | 19 | << " to " << version.first - 1 << " for tablet " << tablet->tablet_id() |
2156 | 19 | << (is_schema_change_tablet |
2157 | 19 | ? (", schema change tablet skipped filling versions <= " + |
2158 | 13 | std::to_string(tablet->alter_version())) |
2159 | 19 | : ""); |
2160 | 19 | } |
2161 | 1.28M | last_version = version.second; |
2162 | 1.28M | } |
2163 | | |
2164 | 218k | if (last_version + 1 <= max_version) { |
2165 | 711 | LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to " |
2166 | 711 | << max_version << " for tablet " << tablet->tablet_id() |
2167 | 711 | << (is_schema_change_tablet |
2168 | 711 | ? (", schema change tablet skipped filling versions <= " + |
2169 | 9 | std::to_string(tablet->alter_version())) |
2170 | 711 | : ""); |
2171 | | // there is a hole after the last existing version |
2172 | 2.98k | for (; last_version + 1 <= max_version; ++last_version) { |
2173 | | // Skip hole filling for versions <= alter_version during schema change |
2174 | 2.26k | if (is_schema_change_tablet && last_version + 1 <= tablet->alter_version()) { |
2175 | 149 | continue; |
2176 | 149 | } |
2177 | 2.12k | RowsetSharedPtr hole_rowset; |
2178 | 2.12k | auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back()); |
2179 | 2.12k | RETURN_IF_ERROR(create_empty_rowset_for_hole( |
2180 | 2.12k | tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset)); |
2181 | 2.12k | hole_rowsets.push_back(hole_rowset); |
2182 | 2.12k | } |
2183 | 711 | } |
2184 | | |
2185 | 218k | if (!hole_rowsets.empty()) { |
2186 | 705 | size_t hole_count = hole_rowsets.size(); |
2187 | 705 | tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false); |
2188 | 705 | g_cloud_version_hole_filled_count << hole_count; |
2189 | 705 | } |
2190 | 218k | return Status::OK(); |
2191 | 218k | } |
2192 | | |
2193 | | Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version, |
2194 | | RowsetMetaSharedPtr prev_rowset_meta, |
2195 | 2.13k | RowsetSharedPtr* rowset) { |
2196 | | // Create a RowsetMeta for the empty rowset |
2197 | 2.13k | auto rs_meta = std::make_shared<RowsetMeta>(); |
2198 | | |
2199 | | // Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id) |
2200 | 2.13k | RowsetId hole_rowset_id; |
2201 | 2.13k | hole_rowset_id.init(2, 0, tablet->tablet_id(), version); |
2202 | 2.13k | rs_meta->set_rowset_id(hole_rowset_id); |
2203 | | |
2204 | | // Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id) |
2205 | 2.13k | PUniqueId load_id; |
2206 | 2.13k | load_id.set_hi(tablet->tablet_id()); |
2207 | 2.13k | load_id.set_lo(version); |
2208 | 2.13k | rs_meta->set_load_id(load_id); |
2209 | | |
2210 | | // Copy schema and other metadata from template |
2211 | 2.13k | rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema()); |
2212 | 2.13k | rs_meta->set_rowset_type(prev_rowset_meta->rowset_type()); |
2213 | 2.13k | rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash()); |
2214 | 2.13k | rs_meta->set_resource_id(prev_rowset_meta->resource_id()); |
2215 | | |
2216 | | // Basic tablet information |
2217 | 2.13k | rs_meta->set_tablet_id(tablet->tablet_id()); |
2218 | 2.13k | rs_meta->set_index_id(tablet->index_id()); |
2219 | 2.13k | rs_meta->set_partition_id(tablet->partition_id()); |
2220 | 2.13k | rs_meta->set_tablet_uid(tablet->tablet_uid()); |
2221 | 2.13k | rs_meta->set_version(Version(version, version)); |
2222 | 2.13k | rs_meta->set_txn_id(version); |
2223 | | |
2224 | 2.13k | rs_meta->set_num_rows(0); |
2225 | 2.13k | rs_meta->set_total_disk_size(0); |
2226 | 2.13k | rs_meta->set_data_disk_size(0); |
2227 | 2.13k | rs_meta->set_index_disk_size(0); |
2228 | 2.13k | rs_meta->set_empty(true); |
2229 | 2.13k | rs_meta->set_num_segments(0); |
2230 | 2.13k | rs_meta->set_segments_overlap(NONOVERLAPPING); |
2231 | 2.13k | rs_meta->set_rowset_state(VISIBLE); |
2232 | 2.13k | rs_meta->set_creation_time(UnixSeconds()); |
2233 | 2.13k | rs_meta->set_newest_write_timestamp(UnixSeconds()); |
2234 | | |
2235 | 2.13k | Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset); |
2236 | 2.13k | if (!s.ok()) { |
2237 | 0 | LOG_WARNING("Failed to create empty rowset for hole") |
2238 | 0 | .tag("tablet_id", tablet->tablet_id()) |
2239 | 0 | .tag("version", version) |
2240 | 0 | .error(s); |
2241 | 0 | return s; |
2242 | 0 | } |
2243 | 2.13k | (*rowset)->set_hole_rowset(true); |
2244 | | |
2245 | 2.13k | return Status::OK(); |
2246 | 2.13k | } |
2247 | | |
2248 | 2 | Status CloudMetaMgr::list_snapshot(std::vector<SnapshotInfoPB>& snapshots) { |
2249 | 2 | ListSnapshotRequest req; |
2250 | 2 | ListSnapshotResponse res; |
2251 | 2 | req.set_cloud_unique_id(config::cloud_unique_id); |
2252 | 2 | req.set_include_aborted(true); |
2253 | 2 | RETURN_IF_ERROR(retry_rpc("list snapshot", req, &res, &MetaService_Stub::list_snapshot)); |
2254 | 0 | for (auto& snapshot : res.snapshots()) { |
2255 | 0 | snapshots.emplace_back(snapshot); |
2256 | 0 | } |
2257 | 0 | return Status::OK(); |
2258 | 2 | } |
2259 | | |
2260 | | Status CloudMetaMgr::get_snapshot_properties(SnapshotSwitchStatus& switch_status, |
2261 | | int64_t& max_reserved_snapshots, |
2262 | 1 | int64_t& snapshot_interval_seconds) { |
2263 | 1 | GetInstanceRequest req; |
2264 | 1 | GetInstanceResponse res; |
2265 | 1 | req.set_cloud_unique_id(config::cloud_unique_id); |
2266 | 1 | RETURN_IF_ERROR( |
2267 | 1 | retry_rpc("get snapshot properties", req, &res, &MetaService_Stub::get_instance)); |
2268 | 1 | switch_status = res.instance().has_snapshot_switch_status() |
2269 | 1 | ? res.instance().snapshot_switch_status() |
2270 | 1 | : SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED; |
2271 | 1 | max_reserved_snapshots = |
2272 | 1 | res.instance().has_max_reserved_snapshot() ? res.instance().max_reserved_snapshot() : 0; |
2273 | 1 | snapshot_interval_seconds = res.instance().has_snapshot_interval_seconds() |
2274 | 1 | ? res.instance().snapshot_interval_seconds() |
2275 | 1 | : 3600; |
2276 | 1 | return Status::OK(); |
2277 | 1 | } |
2278 | | |
2279 | | Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path, |
2280 | 14.6k | const cloud::PackedFileInfoPB& packed_file_info) { |
2281 | 14.6k | VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with " |
2282 | 0 | << packed_file_info.total_slice_num() << " small files" |
2283 | 0 | << ", total bytes: " << packed_file_info.total_slice_bytes(); |
2284 | | |
2285 | | // Create request |
2286 | 14.6k | cloud::UpdatePackedFileInfoRequest req; |
2287 | 14.6k | cloud::UpdatePackedFileInfoResponse resp; |
2288 | | |
2289 | | // Set required fields |
2290 | 14.6k | req.set_cloud_unique_id(config::cloud_unique_id); |
2291 | 14.6k | req.set_packed_file_path(packed_file_path); |
2292 | 14.6k | *req.mutable_packed_file_info() = packed_file_info; |
2293 | | |
2294 | | // Make RPC call using retry pattern |
2295 | 14.6k | return retry_rpc("update packed file info", req, &resp, |
2296 | 14.6k | &cloud::MetaService_Stub::update_packed_file_info); |
2297 | 14.6k | } |
2298 | | |
2299 | | Status CloudMetaMgr::get_cluster_status( |
2300 | | std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result, |
2301 | 76 | std::string* my_cluster_id) { |
2302 | 76 | GetClusterStatusRequest req; |
2303 | 76 | GetClusterStatusResponse resp; |
2304 | 76 | req.add_cloud_unique_ids(config::cloud_unique_id); |
2305 | | |
2306 | 76 | Status s = retry_rpc("get cluster status", req, &resp, &MetaService_Stub::get_cluster_status); |
2307 | 76 | if (!s.ok()) { |
2308 | 0 | return s; |
2309 | 0 | } |
2310 | | |
2311 | 76 | result->clear(); |
2312 | 76 | for (const auto& detail : resp.details()) { |
2313 | 76 | for (const auto& cluster : detail.clusters()) { |
2314 | | // Store cluster status and mtime (mtime is in seconds from MS, convert to ms). |
2315 | | // If mtime is not set, use current time as a conservative default |
2316 | | // to avoid immediate takeover due to elapsed being huge. |
2317 | 76 | int64_t mtime_ms = cluster.has_mtime() ? cluster.mtime() * 1000 : UnixMillis(); |
2318 | 76 | (*result)[cluster.cluster_id()] = {static_cast<int32_t>(cluster.cluster_status()), |
2319 | 76 | mtime_ms}; |
2320 | 76 | } |
2321 | 76 | } |
2322 | | |
2323 | 76 | if (my_cluster_id && resp.has_requester_cluster_id()) { |
2324 | 1 | *my_cluster_id = resp.requester_cluster_id(); |
2325 | 1 | } |
2326 | | |
2327 | 76 | return Status::OK(); |
2328 | 76 | } |
2329 | | |
2330 | | #include "common/compile_check_end.h" |
2331 | | } // namespace doris::cloud |