Coverage Report

Created: 2026-06-01 13:30

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_meta_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "cloud/cloud_meta_mgr.h"
18
19
#include <brpc/channel.h>
20
#include <brpc/controller.h>
21
#include <brpc/errno.pb.h>
22
#include <bthread/bthread.h>
23
#include <bthread/condition_variable.h>
24
#include <bthread/mutex.h>
25
#include <gen_cpp/FrontendService.h>
26
#include <gen_cpp/HeartbeatService_types.h>
27
#include <gen_cpp/PlanNodes_types.h>
28
#include <gen_cpp/Types_types.h>
29
#include <gen_cpp/cloud.pb.h>
30
#include <gen_cpp/olap_file.pb.h>
31
#include <glog/logging.h>
32
33
#include <algorithm>
34
#include <atomic>
35
#include <chrono>
36
#include <cstdint>
37
#include <memory>
38
#include <mutex>
39
#include <random>
40
#include <shared_mutex>
41
#include <string>
42
#include <type_traits>
43
#include <vector>
44
45
#include "cloud/cloud_ms_backpressure_handler.h"
46
#include "cloud/cloud_ms_rpc_rate_limiters.h"
47
#include "cloud/cloud_storage_engine.h"
48
#include "cloud/cloud_tablet.h"
49
#include "cloud/cloud_warm_up_manager.h"
50
#include "cloud/config.h"
51
#include "cloud/delete_bitmap_file_reader.h"
52
#include "cloud/delete_bitmap_file_writer.h"
53
#include "cloud/pb_convert.h"
54
#include "common/config.h"
55
#include "common/logging.h"
56
#include "common/status.h"
57
#include "cpp/sync_point.h"
58
#include "io/fs/obj_storage_client.h"
59
#include "load/stream_load/stream_load_context.h"
60
#include "runtime/exec_env.h"
61
#include "storage/olap_common.h"
62
#include "storage/rowset/rowset.h"
63
#include "storage/rowset/rowset_factory.h"
64
#include "storage/rowset/rowset_fwd.h"
65
#include "storage/storage_engine.h"
66
#include "storage/tablet/tablet_meta.h"
67
#include "util/client_cache.h"
68
#include "util/network_util.h"
69
#include "util/s3_util.h"
70
#include "util/thrift_rpc_helper.h"
71
72
namespace doris::cloud {
73
using namespace ErrorCode;
74
75
1.58M
// bthread entry point: runs and then frees a heap-allocated `std::function<void()>`.
//
// @param arg  Pointer to a `std::function<void()>` allocated with `new`. Ownership is
//             transferred to this function; the caller must not touch it afterwards.
// @return     Always nullptr.
void* run_bthread_work(void* arg) {
    // Take ownership immediately so the closure is released even if the callable throws
    // (this function is also invoked inline when a bthread cannot be started).
    std::unique_ptr<std::function<void()>> work(static_cast<std::function<void()>*>(arg));
    (*work)();
    return nullptr;
}
81
82
283k
// Runs every task in `tasks` on background bthreads, keeping at most `concurrency`
// of them in flight, and blocks until all dispatched tasks have finished.
//
// @param tasks        Tasks to run. Must stay alive for the duration of the call (they are
//                     captured by reference).
// @param concurrency  Maximum number of tasks running at once. Values below 1 are treated
//                     as 1; otherwise the slot wait below could never be satisfied and the
//                     call would block forever.
// @return             OK if every dispatched task returned OK; otherwise the status of a
//                     failed task. After the first observed failure no further tasks are
//                     dispatched, but tasks already running are still awaited.
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) {
    if (tasks.empty()) {
        return Status::OK();
    }

    const int max_running = std::max(concurrency, 1);

    bthread::Mutex lock;
    bthread::ConditionVariable cond;
    Status status; // Guard by lock
    int count = 0; // Guard by lock

    for (const auto& task : tasks) {
        {
            std::unique_lock lk(lock);
            // Wait until there are available slots
            while (status.ok() && count >= max_running) {
                cond.wait(lk);
            }
            if (!status.ok()) {
                break;
            }

            // Increase running task count
            ++count;
        }

        // dispatch task into bthreads
        auto* fn = new std::function<void()>([&, &task = task] {
            auto st = task();
            {
                std::lock_guard lk(lock);
                --count;
                if (!st.ok()) {
                    std::swap(st, status);
                }
                // Only the dispatching thread ever waits on `cond`.
                cond.notify_one();
            }
        });

        bthread_t bthread_id;
        if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) {
            // Could not start a bthread: run the task inline (this also frees `fn`).
            run_bthread_work(fn);
        }
    }

    // Wait until all running tasks have done
    {
        std::unique_lock lk(lock);
        while (count > 0) {
            cond.wait(lk);
        }
    }

    return status;
}
136
137
// Asynchronous variant: starts one background bthread that runs the blocking
// bthread_fork_join(tasks, concurrency) and publishes its result through `*fut`.
//
// @param tasks        Tasks to run; moved into the background closure.
// @param concurrency  Forwarded unchanged to the blocking overload.
// @param fut          Out-parameter; always overwritten with the future of the result.
// @return             OK once the bthread has been started, InternalError otherwise.
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
                         std::future<Status>* fut) {
    // std::promise is move-only while std::function requires a copyable callable,
    // so the promise lives on the heap behind a shared_ptr.
    auto promise = std::make_shared<std::promise<Status>>();
    *fut = promise->get_future();

    auto job = std::make_unique<std::function<void()>>(
            [owned_tasks = std::move(tasks), concurrency,
             result = std::move(promise)]() mutable {
                result->set_value(bthread_fork_join(owned_tasks, concurrency));
            });

    bthread_t tid;
    const int rc = bthread_start_background(&tid, nullptr, run_bthread_work, job.get());
    if (rc != 0) {
        // `job` is destroyed here, which also drops the promise.
        return Status::InternalError<false>("failed to create bthread");
    }

    // The started bthread owns the closure now (run_bthread_work frees it).
    (void)job.release();
    return Status::OK();
}
154
155
namespace {
156
// Value passed to brpc::Controller::set_max_retry() for meta-service RPCs in this file.
constexpr int kBrpcRetryTimes = 3;

// File-local bvar metrics. The string arguments are the exposed metric names.
bvar::LatencyRecorder _get_rowset_latency("doris_cloud_meta_mgr_get_rowset");
// NOTE(review): the variable name and the exposed name do not match — confirm this is intended.
bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency");
bvar::Adder<uint64_t> g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count");
// Windowed view over the timeout counter above; it takes the counter's address, so it must be
// declared after it. The last argument is presumably the window size in seconds — TODO confirm.
bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
        "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30);
// Counters sharing the "cloud_meta_mgr_ms_too_busy_reason" first argument, one per reason label.
bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_total_count(
        "cloud_meta_mgr_ms_too_busy_reason", "total");
bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_fdb_cluster_count(
        "cloud_meta_mgr_ms_too_busy_reason", "fdb_cluster");
bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_fdb_client_thread_count(
        "cloud_meta_mgr_ms_too_busy_reason", "fdb_client_thread");
bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_ms_resource_count(
        "cloud_meta_mgr_ms_too_busy_reason", "ms_resource");
bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_test_injection_count(
        "cloud_meta_mgr_ms_too_busy_reason", "test_injection");
bvar::Adder<uint64_t> g_cloud_meta_mgr_ms_too_busy_reason_no_stress_condition_matched_count(
        "cloud_meta_mgr_ms_too_busy_reason", "no_stress_condition_matched");
bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
        "cloud_be_mow_get_dbm_lock_backoff_sleep_time");
bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
178
179
class MetaServiceProxy {
180
public:
181
997k
    static Status get_proxy(MetaServiceProxy** proxy) {
182
        // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function.
183
997k
        std::shared_ptr<MetaService_Stub> stub;
184
997k
        return get_pooled_client(&stub, proxy);
185
997k
    }
186
187
0
    void set_unhealthy() {
188
0
        std::unique_lock lock(_mutex);
189
0
        maybe_unhealthy = true;
190
0
    }
191
192
2.00M
    bool need_reconn(long now) {
193
2.00M
        return maybe_unhealthy && ((now - last_reconn_time_ms.front()) >
194
0
                                   config::meta_service_rpc_reconnect_interval_ms);
195
2.00M
    }
196
197
1.99M
    Status get(std::shared_ptr<MetaService_Stub>* stub) {
198
1.99M
        using namespace std::chrono;
199
200
1.99M
        auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
201
1.99M
        {
202
1.99M
            std::shared_lock lock(_mutex);
203
2.01M
            if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) {
204
2.00M
                _last_access_at_ms.store(now, std::memory_order_relaxed);
205
2.00M
                *stub = _stub;
206
2.00M
                return Status::OK();
207
2.00M
            }
208
1.99M
        }
209
210
18.4E
        auto channel = std::make_unique<brpc::Channel>();
211
18.4E
        Status s = init_channel(channel.get());
212
18.4E
        if (!s.ok()) [[unlikely]] {
213
0
            return s;
214
0
        }
215
216
18.4E
        *stub = std::make_shared<MetaService_Stub>(channel.release(),
217
18.4E
                                                   google::protobuf::Service::STUB_OWNS_CHANNEL);
218
219
18.4E
        long deadline = now;
220
        // connection age only works without list endpoint.
221
18.4E
        if (config::meta_service_connection_age_base_seconds > 0) {
222
1.41k
            std::default_random_engine rng(static_cast<uint32_t>(now));
223
1.41k
            std::uniform_int_distribution<> uni(
224
1.41k
                    config::meta_service_connection_age_base_seconds,
225
1.41k
                    config::meta_service_connection_age_base_seconds * 2);
226
1.41k
            deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
227
1.41k
        }
228
229
        // Last one WIN
230
18.4E
        std::unique_lock lock(_mutex);
231
18.4E
        _last_access_at_ms.store(now, std::memory_order_relaxed);
232
18.4E
        _deadline_ms = deadline;
233
18.4E
        _stub = *stub;
234
235
18.4E
        last_reconn_time_ms.push(now);
236
18.4E
        last_reconn_time_ms.pop();
237
18.4E
        maybe_unhealthy = false;
238
239
18.4E
        return Status::OK();
240
18.4E
    }
241
242
private:
243
2.00M
    static bool is_meta_service_endpoint_list() {
244
2.00M
        return config::meta_service_endpoint.find(',') != std::string::npos;
245
2.00M
    }
246
247
    /**
248
    * This function initializes a pool of `MetaServiceProxy` objects and selects one using
249
    * round-robin. It returns a client stub via the selected proxy.
250
    *
251
    * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved.
252
    * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`.
253
    *
254
    * @return Status Returns `Status::OK()` on success or an error status on failure.
255
    */
256
    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
257
1.00M
                                    MetaServiceProxy** proxy) {
258
1.00M
        static std::once_flag proxies_flag;
259
1.00M
        static size_t num_proxies = 1;
260
1.00M
        static std::atomic<size_t> index(0);
261
1.00M
        static std::unique_ptr<MetaServiceProxy[]> proxies;
262
1.00M
        if (config::meta_service_endpoint.empty()) {
263
21
            return Status::InvalidArgument(
264
21
                    "Meta service endpoint is empty. Please configure manually or wait for "
265
21
                    "heartbeat to obtain.");
266
21
        }
267
1.00M
        std::call_once(
268
1.00M
                proxies_flag, +[]() {
269
1
                    if (config::meta_service_connection_pooled) {
270
1
                        num_proxies = config::meta_service_connection_pool_size;
271
1
                    }
272
1
                    proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
273
1
                });
274
275
1.00M
        for (size_t i = 0; i + 1 < num_proxies; ++i) {
276
1.00M
            size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
277
1.00M
            Status s = proxies[next_index].get(stub);
278
1.00M
            if (proxy != nullptr) {
279
1.00M
                *proxy = &(proxies[next_index]);
280
1.00M
            }
281
1.00M
            if (s.ok()) return Status::OK();
282
1.00M
        }
283
284
5.52k
        size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
285
5.52k
        if (proxy != nullptr) {
286
0
            *proxy = &(proxies[next_index]);
287
0
        }
288
5.52k
        return proxies[next_index].get(stub);
289
1.00M
    }
290
291
1.40k
    static Status init_channel(brpc::Channel* channel) {
292
1.40k
        static std::atomic<size_t> index = 1;
293
294
1.40k
        const char* load_balancer_name = nullptr;
295
1.40k
        std::string endpoint;
296
1.40k
        if (is_meta_service_endpoint_list()) {
297
0
            endpoint = fmt::format("list://{}", config::meta_service_endpoint);
298
0
            load_balancer_name = "random";
299
1.40k
        } else {
300
1.40k
            std::string ip;
301
1.40k
            uint16_t port;
302
1.40k
            Status s = get_meta_service_ip_and_port(&ip, &port);
303
1.40k
            if (!s.ok()) {
304
0
                LOG(WARNING) << "fail to get meta service ip and port: " << s;
305
0
                return s;
306
0
            }
307
308
1.40k
            endpoint = get_host_port(ip, port);
309
1.40k
        }
310
311
1.40k
        brpc::ChannelOptions options;
312
1.40k
        options.connection_group =
313
1.40k
                fmt::format("ms_{}", index.fetch_add(1, std::memory_order_relaxed));
314
1.40k
        if (channel->Init(endpoint.c_str(), load_balancer_name, &options) != 0) {
315
0
            return Status::InvalidArgument("failed to init brpc channel, endpoint: {}", endpoint);
316
0
        }
317
1.40k
        return Status::OK();
318
1.40k
    }
319
320
1.38k
    static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) {
321
1.38k
        std::string parsed_host;
322
1.38k
        if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) {
323
0
            return Status::InvalidArgument("invalid meta service endpoint: {}",
324
0
                                           config::meta_service_endpoint);
325
0
        }
326
1.39k
        if (is_valid_ip(parsed_host)) {
327
1.39k
            *ip = std::move(parsed_host);
328
1.39k
            return Status::OK();
329
1.39k
        }
330
18.4E
        return hostname_to_ip(parsed_host, *ip);
331
1.38k
    }
332
333
2.00M
    bool is_idle_timeout(long now) {
334
2.00M
        auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
335
        // idle timeout only works without list endpoint.
336
2.00M
        return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
337
2.00M
               _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
338
2.00M
    }
339
340
    std::shared_mutex _mutex;
341
    std::atomic<long> _last_access_at_ms {0};
342
    long _deadline_ms {0};
343
    std::shared_ptr<MetaService_Stub> _stub;
344
345
    std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
346
    bool maybe_unhealthy = false;
347
};
348
349
// Compile-time membership test: `value` is true when T is exactly one of Ts...,
// and false otherwise (including for an empty Ts list).
template <typename T, typename... Ts>
struct is_any : std::bool_constant<(std::is_same_v<T, Ts> || ...)> {};

// Convenience variable template for is_any<T, Ts...>::value.
template <typename T, typename... Ts>
constexpr bool is_any_v = is_any<T, Ts...>::value;
354
355
template <typename Request>
356
431
static std::string debug_info(const Request& req) {
357
431
    if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) {
358
0
        return fmt::format(" txn_id={}", req.txn_id());
359
0
    } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) {
360
0
        return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
361
0
    } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
362
0
        return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id());
363
431
    } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
364
431
        return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id());
365
431
    } else if constexpr (is_any_v<Request, GetTabletRequest>) {
366
0
        return fmt::format(" tablet_id={}", req.tablet_id());
367
    } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest,
368
0
                                  GetInstanceRequest, GetClusterStatusRequest>) {
369
0
        return "";
370
0
    } else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
371
0
        return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
372
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) {
373
        return fmt::format(" tablet_id={}", req.tablet_id());
374
0
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
375
0
        return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
376
0
                           req.tablet_id(), req.lock_id());
377
0
    } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
378
0
        return fmt::format(" tablet_id={}", req.tablet_id());
379
    } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
380
        return fmt::format(" index_id={}", req.index_id());
381
0
    } else if constexpr (is_any_v<Request, RestoreJobRequest>) {
382
0
        return fmt::format(" tablet_id={}", req.tablet_id());
383
0
    } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) {
384
0
        return fmt::format(" packed_file_path={}", req.packed_file_path());
385
    } else {
386
        static_assert(!sizeof(Request));
387
    }
388
431
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16GetTabletRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19CreateRowsetRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16CommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_15AbortTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19PrecommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_17RestoreJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetObjStoreInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_21StartTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22FinishTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_25UpdateDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_32GetDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Line
Count
Source
356
431
static std::string debug_info(const Request& req) {
357
    if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) {
358
        return fmt::format(" txn_id={}", req.txn_id());
359
    } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) {
360
        return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
361
    } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
362
        return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id());
363
431
    } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
364
431
        return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id());
365
    } else if constexpr (is_any_v<Request, GetTabletRequest>) {
366
        return fmt::format(" tablet_id={}", req.tablet_id());
367
    } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest,
368
                                  GetInstanceRequest, GetClusterStatusRequest>) {
369
        return "";
370
    } else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
371
        return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
372
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) {
373
        return fmt::format(" tablet_id={}", req.tablet_id());
374
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
375
        return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
376
                           req.tablet_id(), req.lock_id());
377
    } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
378
        return fmt::format(" tablet_id={}", req.tablet_id());
379
    } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
380
        return fmt::format(" index_id={}", req.index_id());
381
    } else if constexpr (is_any_v<Request, RestoreJobRequest>) {
382
        return fmt::format(" tablet_id={}", req.tablet_id());
383
    } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) {
384
        return fmt::format(" packed_file_path={}", req.packed_file_path());
385
    } else {
386
        static_assert(!sizeof(Request));
387
    }
388
431
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_35RemoveDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19ListSnapshotRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_18GetInstanceRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_27UpdatePackedFileInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_23GetClusterStatusRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
389
390
842k
// Returns a std::default_random_engine seeded with the current steady_clock tick count,
// truncated to 32 bits.
inline std::default_random_engine make_random_engine() {
    const auto ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    const auto seed = static_cast<uint32_t>(ticks);
    return std::default_random_engine(seed);
}
394
395
// Convert MetaServiceRPC to LoadRelatedRpc
396
// Returns LoadRelatedRpc::COUNT if the RPC is not a load-related RPC
397
883k
LoadRelatedRpc to_load_related_rpc(MetaServiceRPC rpc) {
398
883k
    switch (rpc) {
399
383k
    case MetaServiceRPC::PREPARE_ROWSET:
400
383k
        return LoadRelatedRpc::PREPARE_ROWSET;
401
382k
    case MetaServiceRPC::COMMIT_ROWSET:
402
382k
        return LoadRelatedRpc::COMMIT_ROWSET;
403
38
    case MetaServiceRPC::UPDATE_TMP_ROWSET:
404
38
        return LoadRelatedRpc::UPDATE_TMP_ROWSET;
405
0
    case MetaServiceRPC::UPDATE_PACKED_FILE_INFO:
406
0
        return LoadRelatedRpc::UPDATE_PACKED_FILE_INFO;
407
116k
    case MetaServiceRPC::UPDATE_DELETE_BITMAP:
408
116k
        return LoadRelatedRpc::UPDATE_DELETE_BITMAP;
409
0
    default:
410
0
        return LoadRelatedRpc::COUNT; // Not a load-related RPC
411
883k
    }
412
883k
}
413
414
// Pointer to a MetaService_Stub member function taking
// (RpcController*, const Request*, Response*, Closure*) and returning void.
// retry_rpc() receives the RPC to invoke as a value of this type.
template <typename Request, typename Response>
using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*,
                                                     const Request*, Response*,
                                                     ::google::protobuf::Closure*);
418
419
// Rate limiting context for retry_rpc
420
struct RpcRateLimitCtx {
421
    HostLevelMSRpcRateLimiters* host_limiters {nullptr};
422
    MSBackpressureHandler* backpressure_handler {nullptr};
423
    int64_t table_id {-1}; // For table-level backpressure, passed from caller
424
};
425
426
// Apply rate limiting before RPC (both host-level and table-level)
427
843k
void apply_rate_limit(MetaServiceRPC rpc, const RpcRateLimitCtx& ctx) {
428
    // Table-level rate limit (for load-related RPCs only)
429
843k
    if (ctx.backpressure_handler && ctx.table_id > 0) {
430
440k
        LoadRelatedRpc load_rpc = to_load_related_rpc(rpc);
431
440k
        if (load_rpc != LoadRelatedRpc::COUNT) {
432
438k
            auto wait_until = ctx.backpressure_handler->before_rpc(load_rpc, ctx.table_id);
433
438k
            auto now = std::chrono::steady_clock::now();
434
438k
            if (wait_until > now) {
435
0
                auto wait_us =
436
0
                        std::chrono::duration_cast<std::chrono::microseconds>(wait_until - now)
437
0
                                .count();
438
0
                if (wait_us > 0) {
439
0
                    if (auto* recorder = get_throttle_wait_recorder(load_rpc);
440
0
                        recorder != nullptr) {
441
0
                        *recorder << wait_us;
442
0
                    }
443
0
                    bthread_usleep(wait_us);
444
0
                }
445
0
            }
446
438k
        }
447
440k
    }
448
449
    // Host-level rate limit
450
844k
    if (ctx.host_limiters) {
451
844k
        ctx.host_limiters->limit(rpc);
452
844k
    }
453
843k
}
454
455
// Record RPC QPS statistics after RPC (for table-level tracking)
456
849k
void record_rpc_qps(MetaServiceRPC rpc, const RpcRateLimitCtx& ctx) {
457
849k
    if (ctx.backpressure_handler && ctx.table_id > 0) {
458
444k
        LoadRelatedRpc load_rpc = to_load_related_rpc(rpc);
459
444k
        if (load_rpc != LoadRelatedRpc::COUNT) {
460
444k
            ctx.backpressure_handler->after_rpc(load_rpc, ctx.table_id);
461
444k
        }
462
444k
    }
463
849k
}
464
465
template <typename Request, typename Response>
466
Status retry_rpc(MetaServiceRPC rpc, const Request& req, Response* res,
467
                 MetaServiceMethod<Request, Response> method,
468
843k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
843k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
843k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
843k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
843k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
843k
    int retry_times = 0;
478
843k
    uint32_t duration_ms = 0;
479
843k
    std::string error_msg;
480
843k
    std::default_random_engine rng = make_random_engine();
481
843k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
843k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
843k
    MetaServiceProxy* proxy;
484
843k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
843k
    while (true) {
487
843k
        std::shared_ptr<MetaService_Stub> stub;
488
843k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
843k
        apply_rate_limit(rpc, rate_limit_ctx);
492
843k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
843k
        brpc::Controller cntl;
495
843k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
843k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
83.3k
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
759k
        } else {
499
759k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
759k
        }
501
843k
        cntl.set_max_retry(kBrpcRetryTimes);
502
843k
        res->Clear();
503
843k
        int error_code = 0;
504
843k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
843k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
843k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
847k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
847k
            return Status::OK();
515
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
9
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
9
                                                                     res->status().msg());
518
18.4E
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
941
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
941
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
843k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16GetTabletRequestENS0_17GetTabletResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
326k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
326k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
326k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
326k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
326k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
326k
    int retry_times = 0;
478
326k
    uint32_t duration_ms = 0;
479
326k
    std::string error_msg;
480
326k
    std::default_random_engine rng = make_random_engine();
481
326k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
326k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
326k
    MetaServiceProxy* proxy;
484
326k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
326k
    while (true) {
487
325k
        std::shared_ptr<MetaService_Stub> stub;
488
325k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
325k
        apply_rate_limit(rpc, rate_limit_ctx);
492
325k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
325k
        brpc::Controller cntl;
495
325k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
326k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
325k
        } else {
499
325k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
325k
        }
501
325k
        cntl.set_max_retry(kBrpcRetryTimes);
502
325k
        res->Clear();
503
325k
        int error_code = 0;
504
325k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
325k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
325k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
325k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
325k
            return Status::OK();
515
325k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
163
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
163
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
100
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
100
                                                                   res->status().msg());
526
100
        } else {
527
63
            error_msg = res->status().msg();
528
63
        }
529
530
63
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
63
        ++retry_times;
535
63
        if (retry_times > config::meta_service_rpc_retry_times ||
536
63
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
63
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
63
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
63
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
63
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
63
        bthread_usleep(duration_ms * 1000);
547
63
    }
548
302
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
326k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetDeleteBitmapRequestENS0_23GetDeleteBitmapResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
25.9k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
25.9k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
25.9k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
25.9k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
25.9k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
25.9k
    int retry_times = 0;
478
25.9k
    uint32_t duration_ms = 0;
479
25.9k
    std::string error_msg;
480
25.9k
    std::default_random_engine rng = make_random_engine();
481
25.9k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
25.9k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
25.9k
    MetaServiceProxy* proxy;
484
25.9k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
25.9k
    while (true) {
487
25.9k
        std::shared_ptr<MetaService_Stub> stub;
488
25.9k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
25.9k
        apply_rate_limit(rpc, rate_limit_ctx);
492
25.9k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
25.9k
        brpc::Controller cntl;
495
25.9k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
25.9k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
25.9k
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
18.4E
        } else {
499
18.4E
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
18.4E
        }
501
25.9k
        cntl.set_max_retry(kBrpcRetryTimes);
502
25.9k
        res->Clear();
503
25.9k
        int error_code = 0;
504
25.9k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
25.9k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
25.9k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
26.0k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
26.0k
            return Status::OK();
515
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
18.4E
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
0
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
25.9k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19CreateRowsetRequestENS0_20CreateRowsetResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
380k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
380k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
380k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
380k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
380k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
380k
    int retry_times = 0;
478
380k
    uint32_t duration_ms = 0;
479
380k
    std::string error_msg;
480
380k
    std::default_random_engine rng = make_random_engine();
481
380k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
380k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
380k
    MetaServiceProxy* proxy;
484
380k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
380k
    while (true) {
487
380k
        std::shared_ptr<MetaService_Stub> stub;
488
380k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
380k
        apply_rate_limit(rpc, rate_limit_ctx);
492
380k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
380k
        brpc::Controller cntl;
495
380k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
380k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
380k
        } else {
499
380k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
380k
        }
501
380k
        cntl.set_max_retry(kBrpcRetryTimes);
502
380k
        res->Clear();
503
380k
        int error_code = 0;
504
380k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
380k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
380k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
384k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
384k
            return Status::OK();
515
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
18.4E
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
2
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
2
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
380k
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16CommitTxnRequestENS0_17CommitTxnResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_15AbortTxnRequestENS0_16AbortTxnResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
488
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
488
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
488
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
488
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
488
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
488
    int retry_times = 0;
478
488
    uint32_t duration_ms = 0;
479
488
    std::string error_msg;
480
488
    std::default_random_engine rng = make_random_engine();
481
488
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
488
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
488
    MetaServiceProxy* proxy;
484
488
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
488
    while (true) {
487
488
        std::shared_ptr<MetaService_Stub> stub;
488
488
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
488
        apply_rate_limit(rpc, rate_limit_ctx);
492
488
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
488
        brpc::Controller cntl;
495
488
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
488
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
488
        } else {
499
488
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
488
        }
501
488
        cntl.set_max_retry(kBrpcRetryTimes);
502
488
        res->Clear();
503
488
        int error_code = 0;
504
488
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
488
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
488
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
488
        } else if (res->status().code() == MetaServiceCode::OK) {
514
479
            return Status::OK();
515
479
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
9
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
9
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
9
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
9
                                                                   res->status().msg());
526
9
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
488
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19PrecommitTxnRequestENS0_20PrecommitTxnResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
33
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
33
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
33
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
33
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
33
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
33
    int retry_times = 0;
478
33
    uint32_t duration_ms = 0;
479
33
    std::string error_msg;
480
33
    std::default_random_engine rng = make_random_engine();
481
33
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
33
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
33
    MetaServiceProxy* proxy;
484
33
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
33
    while (true) {
487
33
        std::shared_ptr<MetaService_Stub> stub;
488
33
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
33
        apply_rate_limit(rpc, rate_limit_ctx);
492
33
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
33
        brpc::Controller cntl;
495
33
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
33
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
33
        } else {
499
33
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
33
        }
501
33
        cntl.set_max_retry(kBrpcRetryTimes);
502
33
        res->Clear();
503
33
        int error_code = 0;
504
33
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
33
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
33
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
33
        } else if (res->status().code() == MetaServiceCode::OK) {
514
33
            return Status::OK();
515
33
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
0
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
0
                                                                   res->status().msg());
526
0
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
33
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_17RestoreJobRequestENS0_18RestoreJobResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetObjStoreInfoRequestENS0_23GetObjStoreInfoResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
107
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
107
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
107
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
107
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
107
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
107
    int retry_times = 0;
478
107
    uint32_t duration_ms = 0;
479
107
    std::string error_msg;
480
107
    std::default_random_engine rng = make_random_engine();
481
107
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
107
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
107
    MetaServiceProxy* proxy;
484
107
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
107
    while (true) {
487
107
        std::shared_ptr<MetaService_Stub> stub;
488
107
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
107
        apply_rate_limit(rpc, rate_limit_ctx);
492
107
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
107
        brpc::Controller cntl;
495
107
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
107
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
107
        } else {
499
107
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
107
        }
501
107
        cntl.set_max_retry(kBrpcRetryTimes);
502
107
        res->Clear();
503
107
        int error_code = 0;
504
107
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
107
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
107
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
107
        } else if (res->status().code() == MetaServiceCode::OK) {
514
107
            return Status::OK();
515
107
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
0
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
0
                                                                   res->status().msg());
526
0
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
107
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_21StartTabletJobRequestENS0_22StartTabletJobResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
18.3k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
18.3k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
18.3k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
18.3k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
18.3k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
18.3k
    int retry_times = 0;
478
18.3k
    uint32_t duration_ms = 0;
479
18.3k
    std::string error_msg;
480
18.3k
    std::default_random_engine rng = make_random_engine();
481
18.3k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
18.3k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
18.3k
    MetaServiceProxy* proxy;
484
18.3k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
18.4k
    while (true) {
487
18.4k
        std::shared_ptr<MetaService_Stub> stub;
488
18.4k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
18.4k
        apply_rate_limit(rpc, rate_limit_ctx);
492
18.4k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
18.4k
        brpc::Controller cntl;
495
18.4k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
18.4k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
18.4k
        } else {
499
18.4k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
18.4k
        }
501
18.4k
        cntl.set_max_retry(kBrpcRetryTimes);
502
18.4k
        res->Clear();
503
18.4k
        int error_code = 0;
504
18.4k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
18.4k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
18.4k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
18.4k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
18.2k
            return Status::OK();
515
18.2k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
195
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
299
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
299
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
299
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
18.3k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22FinishTabletJobRequestENS0_23FinishTabletJobResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
16.8k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
16.8k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
16.8k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
16.8k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
16.8k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
16.8k
    int retry_times = 0;
478
16.8k
    uint32_t duration_ms = 0;
479
16.8k
    std::string error_msg;
480
16.8k
    std::default_random_engine rng = make_random_engine();
481
16.8k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
16.8k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
16.8k
    MetaServiceProxy* proxy;
484
16.8k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
17.1k
    while (true) {
487
17.1k
        std::shared_ptr<MetaService_Stub> stub;
488
17.1k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
17.1k
        apply_rate_limit(rpc, rate_limit_ctx);
492
17.1k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
17.1k
        brpc::Controller cntl;
495
17.1k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
17.1k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
17.1k
        } else {
499
17.1k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
17.1k
        }
501
17.1k
        cntl.set_max_retry(kBrpcRetryTimes);
502
17.1k
        res->Clear();
503
17.1k
        int error_code = 0;
504
17.1k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
17.1k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
17.1k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
17.1k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
17.1k
            return Status::OK();
515
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
9
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
9
                                                                     res->status().msg());
518
18.4E
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
52
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
52
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
16.8k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_25UpdateDeleteBitmapRequestENS0_26UpdateDeleteBitmapResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
57.0k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
57.0k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
57.0k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
57.0k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
57.0k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
57.0k
    int retry_times = 0;
478
57.0k
    uint32_t duration_ms = 0;
479
57.0k
    std::string error_msg;
480
57.0k
    std::default_random_engine rng = make_random_engine();
481
57.0k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
57.0k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
57.0k
    MetaServiceProxy* proxy;
484
57.0k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
57.7k
    while (true) {
487
57.7k
        std::shared_ptr<MetaService_Stub> stub;
488
57.7k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
57.7k
        apply_rate_limit(rpc, rate_limit_ctx);
492
57.7k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
57.7k
        brpc::Controller cntl;
495
57.7k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
57.7k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
57.3k
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
57.3k
        } else {
499
312
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
312
        }
501
57.7k
        cntl.set_max_retry(kBrpcRetryTimes);
502
57.7k
        res->Clear();
503
57.7k
        int error_code = 0;
504
57.7k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
57.7k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
57.7k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
59.0k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
59.0k
            return Status::OK();
515
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
18.4E
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
7
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
7
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
57.0k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_32GetDeleteBitmapUpdateLockRequestENS0_33GetDeleteBitmapUpdateLockResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
4.76k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
4.76k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
4.76k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
4.76k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
4.76k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
4.76k
    int retry_times = 0;
478
4.76k
    uint32_t duration_ms = 0;
479
4.76k
    std::string error_msg;
480
4.76k
    std::default_random_engine rng = make_random_engine();
481
4.76k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
4.76k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
4.76k
    MetaServiceProxy* proxy;
484
4.76k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
4.76k
    while (true) {
487
4.76k
        std::shared_ptr<MetaService_Stub> stub;
488
4.76k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
4.76k
        apply_rate_limit(rpc, rate_limit_ctx);
492
4.76k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
4.76k
        brpc::Controller cntl;
495
4.76k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
4.76k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
4.76k
        } else {
499
4.76k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
4.76k
        }
501
4.76k
        cntl.set_max_retry(kBrpcRetryTimes);
502
4.76k
        res->Clear();
503
4.76k
        int error_code = 0;
504
4.76k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
4.76k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
4.76k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
4.76k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
4.33k
            return Status::OK();
515
4.33k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
430
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
431
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
431
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
431
                                                                   res->status().msg());
526
18.4E
        } else {
527
18.4E
            error_msg = res->status().msg();
528
18.4E
        }
529
530
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
18.4E
        ++retry_times;
535
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
536
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
18.4E
        bthread_usleep(duration_ms * 1000);
547
18.4E
    }
548
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
4.76k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_35RemoveDeleteBitmapUpdateLockRequestENS0_36RemoveDeleteBitmapUpdateLockResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
40
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
40
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
40
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
40
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
40
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
40
    int retry_times = 0;
478
40
    uint32_t duration_ms = 0;
479
40
    std::string error_msg;
480
40
    std::default_random_engine rng = make_random_engine();
481
40
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
40
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
40
    MetaServiceProxy* proxy;
484
40
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
40
    while (true) {
487
40
        std::shared_ptr<MetaService_Stub> stub;
488
40
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
40
        apply_rate_limit(rpc, rate_limit_ctx);
492
40
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
40
        brpc::Controller cntl;
495
40
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
40
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
40
        } else {
499
40
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
40
        }
501
40
        cntl.set_max_retry(kBrpcRetryTimes);
502
40
        res->Clear();
503
40
        int error_code = 0;
504
40
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
40
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
40
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
40
        } else if (res->status().code() == MetaServiceCode::OK) {
514
1
            return Status::OK();
515
39
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
39
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
39
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
39
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
39
                                                                   res->status().msg());
526
39
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
40
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19ListSnapshotRequestENS0_20ListSnapshotResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
2
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
2
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
2
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
2
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
2
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
2
    int retry_times = 0;
478
2
    uint32_t duration_ms = 0;
479
2
    std::string error_msg;
480
2
    std::default_random_engine rng = make_random_engine();
481
2
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
2
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
2
    MetaServiceProxy* proxy;
484
2
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
2
    while (true) {
487
2
        std::shared_ptr<MetaService_Stub> stub;
488
2
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
2
        apply_rate_limit(rpc, rate_limit_ctx);
492
2
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
2
        brpc::Controller cntl;
495
2
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
2
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
2
        } else {
499
2
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
2
        }
501
2
        cntl.set_max_retry(kBrpcRetryTimes);
502
2
        res->Clear();
503
2
        int error_code = 0;
504
2
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
2
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
2
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
2
        } else if (res->status().code() == MetaServiceCode::OK) {
514
0
            return Status::OK();
515
2
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
2
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
2
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
2
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
2
                                                                   res->status().msg());
526
2
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
2
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_18GetInstanceRequestENS0_19GetInstanceResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
1
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
1
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
1
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
1
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
1
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
1
    int retry_times = 0;
478
1
    uint32_t duration_ms = 0;
479
1
    std::string error_msg;
480
1
    std::default_random_engine rng = make_random_engine();
481
1
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
1
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
1
    MetaServiceProxy* proxy;
484
1
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
1
    while (true) {
487
1
        std::shared_ptr<MetaService_Stub> stub;
488
1
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
1
        apply_rate_limit(rpc, rate_limit_ctx);
492
1
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
1
        brpc::Controller cntl;
495
1
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
1
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
1
        } else {
499
1
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
1
        }
501
1
        cntl.set_max_retry(kBrpcRetryTimes);
502
1
        res->Clear();
503
1
        int error_code = 0;
504
1
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
1
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
1
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
1
        } else if (res->status().code() == MetaServiceCode::OK) {
514
1
            return Status::OK();
515
1
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
0
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
0
                                                                   res->status().msg());
526
0
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
1
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_27UpdatePackedFileInfoRequestENS0_28UpdatePackedFileInfoResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
12.2k
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
12.2k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
12.2k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
12.2k
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
12.2k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
12.2k
    int retry_times = 0;
478
12.2k
    uint32_t duration_ms = 0;
479
12.2k
    std::string error_msg;
480
12.2k
    std::default_random_engine rng = make_random_engine();
481
12.2k
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
12.2k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
12.2k
    MetaServiceProxy* proxy;
484
12.2k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
12.2k
    while (true) {
487
12.2k
        std::shared_ptr<MetaService_Stub> stub;
488
12.2k
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
12.2k
        apply_rate_limit(rpc, rate_limit_ctx);
492
12.2k
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
12.2k
        brpc::Controller cntl;
495
12.2k
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
12.2k
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
12.2k
        } else {
499
12.2k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
12.2k
        }
501
12.2k
        cntl.set_max_retry(kBrpcRetryTimes);
502
12.2k
        res->Clear();
503
12.2k
        int error_code = 0;
504
12.2k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
12.2k
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
12.2k
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
12.2k
        } else if (res->status().code() == MetaServiceCode::OK) {
514
12.2k
            return Status::OK();
515
12.2k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
0
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
0
                                                                   res->status().msg());
526
0
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
12.2k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_23GetClusterStatusRequestENS0_24GetClusterStatusResponseEEENS_6StatusENS0_14MetaServiceRPCERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPS8_SB_PNSE_7ClosureEERKNS1_15RpcRateLimitCtxE
Line
Count
Source
468
57
                 const RpcRateLimitCtx& rate_limit_ctx = {}) {
469
57
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
470
57
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
471
472
57
    std::string_view op_name = meta_service_rpc_display_name(rpc);
473
474
    // Applies only to the current file, and all req are non-const, but passed as const types.
475
57
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
476
477
57
    int retry_times = 0;
478
57
    uint32_t duration_ms = 0;
479
57
    std::string error_msg;
480
57
    std::default_random_engine rng = make_random_engine();
481
57
    std::uniform_int_distribution<uint32_t> u(20, 200);
482
57
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
483
57
    MetaServiceProxy* proxy;
484
57
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
485
486
57
    while (true) {
487
57
        std::shared_ptr<MetaService_Stub> stub;
488
57
        RETURN_IF_ERROR(proxy->get(&stub));
489
490
        // Apply rate limiting (both host-level and table-level)
491
57
        apply_rate_limit(rpc, rate_limit_ctx);
492
57
        TEST_SYNC_POINT_CALLBACK("retry_rpc::after_rate_limit", &rpc);
493
494
57
        brpc::Controller cntl;
495
57
        if (rpc == MetaServiceRPC::GET_DELETE_BITMAP ||
496
57
            rpc == MetaServiceRPC::UPDATE_DELETE_BITMAP) {
497
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
498
57
        } else {
499
57
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
500
57
        }
501
57
        cntl.set_max_retry(kBrpcRetryTimes);
502
57
        res->Clear();
503
57
        int error_code = 0;
504
57
        (stub.get()->*method)(&cntl, &req, res, nullptr);
505
506
        // Record QPS statistics for all RPCs sent to MS (success or failure)
507
57
        record_rpc_qps(rpc, rate_limit_ctx);
508
509
57
        if (cntl.Failed()) [[unlikely]] {
510
0
            error_msg = cntl.ErrorText();
511
0
            error_code = cntl.ErrorCode();
512
0
            proxy->set_unhealthy();
513
57
        } else if (res->status().code() == MetaServiceCode::OK) {
514
57
            return Status::OK();
515
57
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
516
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
517
0
                                                                     res->status().msg());
518
0
        } else if (res->status().code() == MetaServiceCode::MS_TOO_BUSY) {
519
            // MS_BUSY should also be retried
520
0
            if (rate_limit_ctx.backpressure_handler) {
521
0
                rate_limit_ctx.backpressure_handler->on_ms_busy();
522
0
            }
523
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
524
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
525
0
                                                                   res->status().msg());
526
0
        } else {
527
0
            error_msg = res->status().msg();
528
0
        }
529
530
0
        if (error_code == brpc::ERPCTIMEDOUT) {
531
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
532
0
        }
533
534
0
        ++retry_times;
535
0
        if (retry_times > config::meta_service_rpc_retry_times ||
536
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
537
0
             error_code == brpc::ERPCTIMEDOUT) ||
538
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
539
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
540
0
            break;
541
0
        }
542
543
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
544
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
545
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
546
0
        bthread_usleep(duration_ms * 1000);
547
0
    }
548
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
549
57
}
550
551
} // namespace
552
553
326k
Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
554
326k
    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
555
326k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id,
556
326k
                                      tablet_meta);
557
326k
    GetTabletRequest req;
558
326k
    GetTabletResponse resp;
559
326k
    req.set_cloud_unique_id(config::cloud_unique_id);
560
326k
    req.set_tablet_id(tablet_id);
561
326k
    Status st =
562
326k
            retry_rpc(MetaServiceRPC::GET_TABLET_META, req, &resp, &MetaService_Stub::get_tablet,
563
326k
                      {
564
326k
                              .host_limiters = host_level_ms_rpc_rate_limiters_,
565
326k
                              .backpressure_handler = ms_backpressure_handler_,
566
326k
                      });
567
326k
    if (!st.ok()) {
568
100
        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
569
100
            return Status::NotFound("failed to get tablet meta: {}", resp.status().msg());
570
100
        }
571
0
        return st;
572
100
    }
573
574
326k
    *tablet_meta = std::make_shared<TabletMeta>();
575
326k
    (*tablet_meta)
576
326k
            ->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta())));
577
326k
    VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id();
578
326k
    return Status::OK();
579
326k
}
580
581
Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options,
582
125k
                                         SyncRowsetStats* sync_stats) {
583
125k
    std::unique_lock lock {tablet->get_sync_meta_lock()};
584
125k
    return sync_tablet_rowsets_unlocked(tablet, lock, options, sync_stats);
585
125k
}
586
587
Status CloudMetaMgr::_log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp,
588
                                            DeleteBitmap& delete_bitmap, int64_t old_max_version,
589
37.3k
                                            bool full_sync, int32_t read_version) {
590
37.3k
    if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() &&
591
37.3k
        delete_bitmap.cardinality() > 0) {
592
0
        int64_t tablet_id = tablet->tablet_id();
593
0
        std::vector<std::string> new_rowset_msgs;
594
0
        std::vector<std::string> old_rowset_msgs;
595
0
        std::unordered_set<RowsetId> new_rowset_ids;
596
0
        int64_t new_max_version = resp.rowset_meta().rbegin()->end_version();
597
0
        for (const auto& rs : resp.rowset_meta()) {
598
0
            RowsetId rowset_id;
599
0
            rowset_id.init(rs.rowset_id_v2());
600
0
            new_rowset_ids.insert(rowset_id);
601
0
            DeleteBitmap rowset_dbm(tablet_id);
602
0
            delete_bitmap.subset({rowset_id, 0, 0},
603
0
                                 {rowset_id, std::numeric_limits<DeleteBitmap::SegmentId>::max(),
604
0
                                  std::numeric_limits<DeleteBitmap::Version>::max()},
605
0
                                 &rowset_dbm);
606
0
            size_t cardinality = rowset_dbm.cardinality();
607
0
            size_t count = rowset_dbm.get_delete_bitmap_count();
608
0
            if (cardinality > 0) {
609
0
                new_rowset_msgs.push_back(fmt::format("({}[{}-{}],{},{})", rs.rowset_id_v2(),
610
0
                                                      rs.start_version(), rs.end_version(), count,
611
0
                                                      cardinality));
612
0
            }
613
0
        }
614
615
0
        if (old_max_version > 0) {
616
0
            std::vector<RowsetSharedPtr> old_rowsets;
617
0
            RowsetIdUnorderedSet old_rowset_ids;
618
0
            {
619
0
                std::lock_guard<std::shared_mutex> rlock(tablet->get_header_lock());
620
0
                RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(old_max_version, &old_rowset_ids));
621
0
                old_rowsets = tablet->get_rowset_by_ids(&old_rowset_ids);
622
0
            }
623
0
            for (const auto& rs : old_rowsets) {
624
0
                if (!new_rowset_ids.contains(rs->rowset_id())) {
625
0
                    DeleteBitmap rowset_dbm(tablet_id);
626
0
                    delete_bitmap.subset(
627
0
                            {rs->rowset_id(), 0, 0},
628
0
                            {rs->rowset_id(), std::numeric_limits<DeleteBitmap::SegmentId>::max(),
629
0
                             std::numeric_limits<DeleteBitmap::Version>::max()},
630
0
                            &rowset_dbm);
631
0
                    size_t cardinality = rowset_dbm.cardinality();
632
0
                    size_t count = rowset_dbm.get_delete_bitmap_count();
633
0
                    if (cardinality > 0) {
634
0
                        old_rowset_msgs.push_back(
635
0
                                fmt::format("({}{},{},{})", rs->rowset_id().to_string(),
636
0
                                            rs->version().to_string(), count, cardinality));
637
0
                    }
638
0
                }
639
0
            }
640
0
        }
641
642
0
        std::string tablet_info = fmt::format(
643
0
                "tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(),
644
0
                tablet->table_id(), tablet->index_id(), tablet->partition_id());
645
0
        LOG_INFO("[verbose] sync tablet delete bitmap " + tablet_info)
646
0
                .tag("full_sync", full_sync)
647
0
                .tag("read_version", read_version)
648
0
                .tag("old_max_version", old_max_version)
649
0
                .tag("new_max_version", new_max_version)
650
0
                .tag("cumu_compaction_cnt", resp.stats().cumulative_compaction_cnt())
651
0
                .tag("base_compaction_cnt", resp.stats().base_compaction_cnt())
652
0
                .tag("cumu_point", resp.stats().cumulative_point())
653
0
                .tag("rowset_num", resp.rowset_meta().size())
654
0
                .tag("delete_bitmap_cardinality", delete_bitmap.cardinality())
655
0
                .tag("old_rowsets(rowset,count,cardinality)",
656
0
                     fmt::format("[{}]", fmt::join(old_rowset_msgs, ", ")))
657
0
                .tag("new_rowsets(rowset,count,cardinality)",
658
0
                     fmt::format("[{}]", fmt::join(new_rowset_msgs, ", ")));
659
0
    }
660
37.3k
    return Status::OK();
661
37.3k
}
662
663
Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
664
                                                  std::unique_lock<bthread::Mutex>& lock,
665
                                                  const SyncOptions& options,
666
160k
                                                  SyncRowsetStats* sync_stats) {
667
160k
    using namespace std::chrono;
668
669
160k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);
670
160k
    DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.inject_error", {
671
160k
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
672
160k
        auto target_table_id = dp->param<int64_t>("table_id", -1);
673
160k
        if (target_tablet_id == tablet->tablet_id() || target_table_id == tablet->table_id()) {
674
160k
            return Status::InternalError(
675
160k
                    "[sync_tablet_rowsets_unlocked] injected error for testing");
676
160k
        }
677
160k
    });
678
679
160k
    MetaServiceProxy* proxy;
680
160k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
681
160k
    std::string tablet_info =
682
160k
            fmt::format("tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(),
683
160k
                        tablet->table_id(), tablet->index_id(), tablet->partition_id());
684
160k
    int tried = 0;
685
160k
    while (true) {
686
159k
        std::shared_ptr<MetaService_Stub> stub;
687
159k
        RETURN_IF_ERROR(proxy->get(&stub));
688
159k
        brpc::Controller cntl;
689
159k
        cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
690
159k
        GetRowsetRequest req;
691
159k
        GetRowsetResponse resp;
692
693
159k
        int64_t tablet_id = tablet->tablet_id();
694
159k
        int64_t table_id = tablet->table_id();
695
159k
        int64_t index_id = tablet->index_id();
696
159k
        req.set_cloud_unique_id(config::cloud_unique_id);
697
159k
        auto* idx = req.mutable_idx();
698
159k
        idx->set_tablet_id(tablet_id);
699
159k
        idx->set_table_id(table_id);
700
159k
        idx->set_index_id(index_id);
701
159k
        idx->set_partition_id(tablet->partition_id());
702
159k
        {
703
159k
            auto lock_start = std::chrono::steady_clock::now();
704
159k
            std::shared_lock rlock(tablet->get_header_lock());
705
159k
            if (sync_stats) {
706
14.9k
                sync_stats->meta_lock_wait_ns +=
707
14.9k
                        std::chrono::duration_cast<std::chrono::nanoseconds>(
708
14.9k
                                std::chrono::steady_clock::now() - lock_start)
709
14.9k
                                .count();
710
14.9k
            }
711
159k
            if (options.full_sync) {
712
2
                req.set_start_version(0);
713
159k
            } else {
714
159k
                req.set_start_version(tablet->max_version_unlocked() + 1);
715
159k
            }
716
159k
            req.set_base_compaction_cnt(tablet->base_compaction_cnt());
717
159k
            req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
718
159k
            req.set_full_compaction_cnt(tablet->full_compaction_cnt());
719
159k
            req.set_cumulative_point(tablet->cumulative_layer_point());
720
159k
        }
721
159k
        req.set_end_version(-1);
722
159k
        VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
723
724
        // Host-level rate limiting for get_rowset
725
159k
        if (host_level_ms_rpc_rate_limiters_) {
726
158k
            host_level_ms_rpc_rate_limiters_->limit(MetaServiceRPC::GET_ROWSET);
727
158k
        }
728
729
159k
        auto start = std::chrono::steady_clock::now();
730
159k
        stub->get_rowset(&cntl, &req, &resp, nullptr);
731
159k
        auto end = std::chrono::steady_clock::now();
732
159k
        int64_t latency = cntl.latency_us();
733
159k
        _get_rowset_latency << latency;
734
159k
        int retry_times = config::meta_service_rpc_retry_times;
735
159k
        if (cntl.Failed()) {
736
0
            proxy->set_unhealthy();
737
0
            if (tried++ < retry_times) {
738
0
                auto rng = make_random_engine();
739
0
                std::uniform_int_distribution<uint32_t> u(20, 200);
740
0
                std::uniform_int_distribution<uint32_t> u1(500, 1000);
741
0
                uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
742
0
                bthread_usleep(duration_ms * 1000);
743
0
                LOG_INFO("failed to get rowset meta, " + tablet_info)
744
0
                        .tag("reason", cntl.ErrorText())
745
0
                        .tag("tried", tried)
746
0
                        .tag("sleep", duration_ms);
747
0
                continue;
748
0
            }
749
0
            return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText());
750
0
        }
751
159k
        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
752
0
            LOG(WARNING) << "failed to get rowset meta, err=" << resp.status().msg() << " "
753
0
                         << tablet_info;
754
0
            return Status::NotFound("failed to get rowset meta: {}, {}", resp.status().msg(),
755
0
                                    tablet_info);
756
0
        }
757
159k
        if (resp.status().code() == MetaServiceCode::MS_TOO_BUSY) {
758
            // MS_BUSY should also be retried
759
0
            if (ms_backpressure_handler_) {
760
0
                ms_backpressure_handler_->on_ms_busy();
761
0
            }
762
0
            if (tried++ < retry_times) {
763
0
                auto rng = make_random_engine();
764
0
                std::uniform_int_distribution<uint32_t> u(20, 200);
765
0
                std::uniform_int_distribution<uint32_t> u1(500, 1000);
766
0
                uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
767
0
                bthread_usleep(duration_ms * 1000);
768
0
                LOG_INFO("meta service is too busy when getting rowset meta, " + tablet_info)
769
0
                        .tag("reason", resp.status().msg())
770
0
                        .tag("tried", tried)
771
0
                        .tag("sleep", duration_ms);
772
0
                continue;
773
0
            }
774
0
            return Status::RpcError("failed to get rowset meta: {}", resp.status().msg());
775
0
        }
776
159k
        if (resp.status().code() != MetaServiceCode::OK) {
777
0
            LOG(WARNING) << " failed to get rowset meta, err=" << resp.status().msg() << " "
778
0
                         << tablet_info;
779
0
            return Status::InternalError("failed to get rowset meta: {}, {}", resp.status().msg(),
780
0
                                         tablet_info);
781
0
        }
782
159k
        if (latency > 100 * 1000) { // 100ms
783
62
            LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
784
62
                      << ", latency=" << latency << "us"
785
62
                      << " " << tablet_info;
786
159k
        } else {
787
159k
            LOG_EVERY_N(INFO, 100)
788
1.60k
                    << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
789
1.60k
                    << ", latency=" << latency << "us"
790
1.60k
                    << " " << tablet_info;
791
159k
        }
792
793
159k
        int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
794
159k
        tablet->last_sync_time_s = now;
795
796
159k
        if (sync_stats) {
797
14.9k
            sync_stats->get_remote_rowsets_rpc_ns +=
798
14.9k
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
799
14.9k
            sync_stats->get_remote_rowsets_num += resp.rowset_meta().size();
800
14.9k
        }
801
802
        // If is mow, the tablet has no delete bitmap in base rowsets.
803
        // So dont need to sync it.
804
160k
        if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() &&
805
159k
            tablet->tablet_state() == TABLET_RUNNING) {
806
37.3k
            DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block",
807
37.3k
                            DBUG_BLOCK);
808
37.3k
            DeleteBitmap delete_bitmap(tablet_id);
809
37.3k
            int64_t old_max_version = req.start_version() - 1;
810
37.3k
            auto read_version = config::delete_bitmap_store_read_version;
811
37.3k
            auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
812
37.3k
                                                resp.stats(), req.idx(), &delete_bitmap,
813
37.3k
                                                options.full_sync, sync_stats, read_version, false);
814
37.3k
            if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
815
0
                LOG_INFO("rowset meta is expired, need to retry, " + tablet_info)
816
0
                        .tag("tried", tried)
817
0
                        .error(st);
818
0
                continue;
819
0
            }
820
37.3k
            if (!st.ok()) {
821
0
                LOG_WARNING("failed to get delete bitmap, " + tablet_info).error(st);
822
0
                return st;
823
0
            }
824
37.3k
            tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
825
37.3k
            RETURN_IF_ERROR(_log_mow_delete_bitmap(tablet, resp, delete_bitmap, old_max_version,
826
37.3k
                                                   options.full_sync, read_version));
827
37.3k
            RETURN_IF_ERROR(
828
37.3k
                    _check_delete_bitmap_v2_correctness(tablet, req, resp, old_max_version));
829
37.3k
        }
830
159k
        DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", {
831
159k
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
832
159k
            if (target_tablet_id == tablet->tablet_id()) {
833
159k
                DBUG_BLOCK
834
159k
            }
835
159k
        });
836
159k
        {
837
159k
            const auto& stats = resp.stats();
838
159k
            auto lock_start = std::chrono::steady_clock::now();
839
159k
            std::unique_lock wlock(tablet->get_header_lock());
840
159k
            if (sync_stats) {
841
14.9k
                sync_stats->meta_lock_wait_ns +=
842
14.9k
                        std::chrono::duration_cast<std::chrono::nanoseconds>(
843
14.9k
                                std::chrono::steady_clock::now() - lock_start)
844
14.9k
                                .count();
845
14.9k
            }
846
847
            // ATTN: we are facing following data race
848
            //
849
            // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8
850
            //
851
            //   BE-compaction-thread                 meta-service                                     BE-query-thread
852
            //            |                                |                                                |
853
            //    local   |    commit cumu-compaction      |                                                |
854
            //   cc_cnt=0 |  --------------------------->  |     sync rowset (long rpc, local cc_cnt=0 )    |   local
855
            //            |                                |  <-----------------------------------------    |  cc_cnt=0
856
            //            |                                |  -.                                            |
857
            //    local   |       done cc_cnt=1            |    \                                           |
858
            //   cc_cnt=1 |  <---------------------------  |     \                                          |
859
            //            |                                |      \  returned with resp cc_cnt=0 (snapshot) |
860
            //            |                                |       '------------------------------------>   |   local
861
            //            |                                |                                                |  cc_cnt=1
862
            //            |                                |                                                |
863
            //            |                                |                                                |  CHECK FAIL
864
            //            |                                |                                                |  need retry
865
            // To get rid of just retry syncing tablet
866
159k
            if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() ||
867
160k
                stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt())
868
0
                    [[unlikely]] {
869
                // stale request, ignore
870
0
                LOG_WARNING("stale get rowset meta request " + tablet_info)
871
0
                        .tag("resp_base_compaction_cnt", stats.base_compaction_cnt())
872
0
                        .tag("base_compaction_cnt", tablet->base_compaction_cnt())
873
0
                        .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt())
874
0
                        .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt())
875
0
                        .tag("tried", tried);
876
0
                if (tried++ < 10) continue;
877
0
                return Status::OK();
878
0
            }
879
159k
            std::vector<RowsetSharedPtr> rowsets;
880
159k
            rowsets.reserve(resp.rowset_meta().size());
881
159k
            for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
882
129k
                VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id()
883
167
                           << ", version=[" << cloud_rs_meta_pb.start_version() << '-'
884
167
                           << cloud_rs_meta_pb.end_version() << ']';
885
129k
                auto existed_rowset = tablet->get_rowset_by_version(
886
129k
                        {cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()});
887
129k
                if (existed_rowset &&
888
129k
                    existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
889
0
                    continue; // Same rowset, skip it
890
0
                }
891
129k
                RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
892
129k
                auto rs_meta = std::make_shared<RowsetMeta>();
893
129k
                rs_meta->init_from_pb(meta_pb);
894
129k
                RowsetSharedPtr rowset;
895
                // schema is nullptr implies using RowsetMeta.tablet_schema
896
129k
                Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
897
129k
                if (!s.ok()) {
898
0
                    LOG_WARNING("create rowset").tag("status", s);
899
0
                    return s;
900
0
                }
901
129k
                rowsets.push_back(std::move(rowset));
902
129k
            }
903
159k
            if (!rowsets.empty()) {
904
                // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.:
905
                //   BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2;
906
                //   after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12].
907
119k
                bool version_overlap =
908
119k
                        tablet->max_version_unlocked() >= rowsets.front()->start_version();
909
119k
                tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
910
119k
                                    options.warmup_delta_data ||
911
119k
                                            config::enable_warmup_immediately_on_new_rowset);
912
119k
            }
913
914
            // Fill version holes
915
159k
            int64_t partition_max_version =
916
159k
                    resp.has_partition_max_version() ? resp.partition_max_version() : -1;
917
159k
            RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock));
918
919
159k
            tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
920
159k
            tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
921
159k
            tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
922
159k
            tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
923
159k
            tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
924
159k
            tablet->set_cumulative_layer_point(stats.cumulative_point());
925
159k
            tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
926
159k
                                            stats.num_rows(), stats.data_size());
927
928
            // Sync last active cluster info for compaction read-write separation
929
159k
            if (config::enable_compaction_rw_separation && stats.has_last_active_cluster_id()) {
930
12.3k
                tablet->set_last_active_cluster_info(stats.last_active_cluster_id(),
931
12.3k
                                                     stats.last_active_time_ms());
932
12.3k
            }
933
159k
        }
934
0
        return Status::OK();
935
159k
    }
936
160k
}
937
938
bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet,
939
                                                      std::ranges::range auto&& rs_metas,
940
26.4k
                                                      DeleteBitmap* delete_bitmap) {
941
26.4k
    std::set<int64_t> txn_processed;
942
26.5k
    for (auto& rs_meta : rs_metas) {
943
26.5k
        auto txn_id = rs_meta.txn_id();
944
26.5k
        if (txn_processed.find(txn_id) != txn_processed.end()) {
945
0
            continue;
946
0
        }
947
26.5k
        txn_processed.insert(txn_id);
948
26.5k
        DeleteBitmapPtr tmp_delete_bitmap;
949
26.5k
        std::shared_ptr<PublishStatus> publish_status =
950
26.5k
                std::make_shared<PublishStatus>(PublishStatus::INIT);
951
26.5k
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
952
26.5k
        Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
953
26.5k
                txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status);
954
        // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services.
955
        // If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can
956
        // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other
957
        // stats(version or compaction stats)
958
26.5k
        if (status.ok() && *publish_status == PublishStatus::SUCCEED) {
959
            // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap.
960
            // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON,
961
            // we should replace it with the rowset's real version
962
894
            DCHECK(rs_meta.start_version() == rs_meta.end_version());
963
894
            int64_t rowset_version = rs_meta.start_version();
964
3.43k
            for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) {
965
                // skip sentinel mark, which is used for delete bitmap correctness check
966
3.43k
                if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
967
259
                    delete_bitmap->merge({std::get<0>(delete_bitmap_key),
968
259
                                          std::get<1>(delete_bitmap_key), rowset_version},
969
259
                                         bitmap_value);
970
259
                }
971
3.43k
            }
972
894
            engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
973
894
                                                                           tablet->tablet_id());
974
25.6k
        } else {
975
25.6k
            LOG_EVERY_N(INFO, 20)
976
1.28k
                    << "delete bitmap not found in cache, will sync rowset to get. tablet_id= "
977
1.28k
                    << tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status;
978
25.6k
            return false;
979
25.6k
        }
980
26.5k
    }
981
839
    return true;
982
26.4k
}
983
984
// Sends one GetDeleteBitmap RPC to the meta service and translates the outcome into a Status.
//
// A THRIFT_RPC_ERROR from retry_rpc() is returned unchanged. For any other retry_rpc() result the
// outcome is derived from the meta-service code carried in `res`:
//   OK               -> Status::OK()
//   TABLET_NOT_FOUND -> Status::NotFound
//   ROWSETS_EXPIRED  -> ErrorCode::ROWSETS_EXPIRED
//   anything else    -> ErrorCode::INTERNAL_ERROR
Status CloudMetaMgr::_get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req,
                                                GetDeleteBitmapResponse& res) {
    VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
    TEST_SYNC_POINT_CALLBACK("CloudMetaMgr::_get_delete_bitmap_from_ms", &req, &res);

    auto rpc_status = retry_rpc(MetaServiceRPC::GET_DELETE_BITMAP, req, &res,
                                &MetaService_Stub::get_delete_bitmap,
                                {
                                        .host_limiters = host_level_ms_rpc_rate_limiters_,
                                        .backpressure_handler = ms_backpressure_handler_,
                                });
    if (rpc_status.code() == ErrorCode::THRIFT_RPC_ERROR) {
        return rpc_status;
    }

    switch (res.status().code()) {
    case MetaServiceCode::OK:
        return Status::OK();
    case MetaServiceCode::TABLET_NOT_FOUND:
        return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
    case MetaServiceCode::ROWSETS_EXPIRED:
        // The delete bitmap of stale rowsets is removed when a compaction job is committed, so
        // it can no longer be obtained afterwards. The rowsets returned by sync_tablet_rowsets
        // may still include such stale rowsets. When that happens the meta service answers with
        // ROWSETS_EXPIRED and the rowsets need to be synced again.
        //
        // Be query thread             meta-service          Be compaction thread
        //      |                            |                         |
        //      |        get rowset          |                         |
        //      |--------------------------->|                         |
        //      |    return get rowset       |                         |
        //      |<---------------------------|                         |
        //      |                            |        commit job       |
        //      |                            |<------------------------|
        //      |                            |    return commit job    |
        //      |                            |------------------------>|
        //      |      get delete bitmap     |                         |
        //      |--------------------------->|                         |
        //      |  return get delete bitmap  |                         |
        //      |<---------------------------|                         |
        //      |                            |                         |
        return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}",
                                                                res.status().msg());
    default:
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}",
                                                               res.status().msg());
    }
}
1032
1033
// Fetches the delete bitmaps described by `req` in one or more RPC rounds and accumulates every
// round's payload into `res`.
//
// Each round re-sends the rowsets that no previous response listed in returned_rowset_ids().
// When `bytes_threadhold` is positive it is forwarded as the request's dbm_bytes_threshold.
// The loop stops when a response does not report has_more, or when every requested rowset id has
// been reported as returned. The first failing RPC aborts the whole call with its status.
Status CloudMetaMgr::_get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req,
                                                         GetDeleteBitmapResponse& res,
                                                         int64_t bytes_threadhold) {
    std::unordered_set<std::string> finished_rowset_ids {};
    int count = 0;
    for (;;) {
        // Build this round's request: same header fields as `req`, pending rowsets only.
        GetDeleteBitmapRequest round_req;
        round_req.set_cloud_unique_id(config::cloud_unique_id);
        round_req.set_tablet_id(req.tablet_id());
        round_req.set_base_compaction_cnt(req.base_compaction_cnt());
        round_req.set_cumulative_compaction_cnt(req.cumulative_compaction_cnt());
        round_req.set_cumulative_point(req.cumulative_point());
        *(round_req.mutable_idx()) = req.idx();
        round_req.set_store_version(req.store_version());
        if (bytes_threadhold > 0) {
            round_req.set_dbm_bytes_threshold(bytes_threadhold);
        }
        for (int i = 0; i < req.rowset_ids_size(); i++) {
            if (finished_rowset_ids.contains(req.rowset_ids(i))) {
                continue;
            }
            round_req.add_rowset_ids(req.rowset_ids(i));
            round_req.add_begin_versions(req.begin_versions(i));
            round_req.add_end_versions(req.end_versions(i));
        }

        GetDeleteBitmapResponse round_res;
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms(round_req, round_res));
        ++count;

        // v1 delete bitmap
        res.mutable_rowset_ids()->MergeFrom(round_res.rowset_ids());
        res.mutable_segment_ids()->MergeFrom(round_res.segment_ids());
        res.mutable_versions()->MergeFrom(round_res.versions());
        res.mutable_segment_delete_bitmaps()->MergeFrom(round_res.segment_delete_bitmaps());

        // v2 delete bitmap
        res.mutable_delta_rowset_ids()->MergeFrom(round_res.delta_rowset_ids());
        res.mutable_delete_bitmap_storages()->MergeFrom(round_res.delete_bitmap_storages());

        for (const auto& returned_id : round_res.returned_rowset_ids()) {
            finished_rowset_ids.insert(returned_id);
        }

        const bool has_more = round_res.has_has_more() && round_res.has_more();
        if (!has_more) {
            break;
        }
        LOG_INFO("batch get delete bitmap, progress={}/{}", finished_rowset_ids.size(),
                 req.rowset_ids_size())
                .tag("tablet_id", req.tablet_id())
                .tag("cur_returned_rowsets", round_res.returned_rowset_ids_size())
                .tag("rpc_count", count);

        if (!(finished_rowset_ids.size() < req.rowset_ids_size())) {
            break;
        }
    }
    return Status::OK();
}
1089
1090
// Obtains the delete bitmaps that belong to the freshly synced `rs_metas` (plus, depending on the
// mode, the tablet's already known rowsets) and merges them into `delete_bitmap`.
//
// Flow:
//   1. Nothing to do when `rs_metas` is empty.
//   2. Unless `full_sync` is set, try the local txn delete bitmap cache first (when enabled by
//      config). On success only the local counter in `sync_stats` is updated.
//   3. Otherwise `delete_bitmap` is reset to an empty bitmap and a GetDeleteBitmap request is
//      built and sent, either batched or as a single RPC depending on config.
//   4. The v1 payload is merged here; the v2 payload is handed to
//      _read_tablet_delete_bitmap_v2().
//
// `sync_stats` may be null; when present it receives rowset counts, RPC time, key count and
// byte totals. `read_version` is forwarded as the request's store_version and is rejected when it
// conflicts with config::delete_bitmap_store_write_version.
Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
                                               std::ranges::range auto&& rs_metas,
                                               const TabletStatsPB& stats, const TabletIndexPB& idx,
                                               DeleteBitmap* delete_bitmap, bool full_sync,
                                               SyncRowsetStats* sync_stats, int32_t read_version,
                                               bool full_sync_v2) {
    if (rs_metas.empty()) {
        return Status::OK();
    }

    // Short-circuit order matters: the cache is only consulted for non-full syncs with the
    // feature switched on.
    const bool served_by_cache =
            !full_sync && config::enable_sync_tablet_delete_bitmap_by_cache &&
            sync_tablet_delete_bitmap_by_cache(tablet, rs_metas, delete_bitmap);
    if (served_by_cache) {
        if (sync_stats) {
            sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size();
        }
        return Status::OK();
    }

    // Start from an empty bitmap before filling it from the meta service.
    DeleteBitmap empty_bitmap(tablet->tablet_id());
    *delete_bitmap = empty_bitmap;

    if (read_version == 2 && config::delete_bitmap_store_write_version == 1) {
        return Status::InternalError(
                "please set delete_bitmap_store_read_version to 1 or 3 because "
                "delete_bitmap_store_write_version is 1");
    }
    if (read_version == 1 && config::delete_bitmap_store_write_version == 2) {
        return Status::InternalError(
                "please set delete_bitmap_store_read_version to 2 or 3 because "
                "delete_bitmap_store_write_version is 2");
    }

    int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());

    GetDeleteBitmapRequest req;
    GetDeleteBitmapResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_tablet_id(tablet->tablet_id());
    req.set_base_compaction_cnt(stats.base_compaction_cnt());
    req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
    req.set_cumulative_point(stats.cumulative_point());
    *(req.mutable_idx()) = idx;
    req.set_store_version(read_version);

    // New rowsets: request every version of their delete bitmap.
    for (const auto& rs_meta : rs_metas) {
        req.add_rowset_ids(rs_meta.rowset_id_v2());
        req.add_begin_versions(0);
        req.add_end_versions(new_max_version);
    }

    // Old rowsets: an incremental sync only asks for versions above old_max_version (and only
    // when the max version actually advanced); a v2 full sync asks for all versions again.
    const bool include_old_rowsets =
            full_sync_v2 ? (old_max_version > 0)
                         : (old_max_version > 0 && old_max_version < new_max_version);
    if (include_old_rowsets) {
        const int64_t old_begin_version = full_sync_v2 ? 0 : old_max_version + 1;
        RowsetIdUnorderedSet old_rs_ids;
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &old_rs_ids));
        for (const auto& rs_id : old_rs_ids) {
            req.add_rowset_ids(rs_id.to_string());
            req.add_begin_versions(old_begin_version);
            req.add_end_versions(new_max_version);
        }
    }
    if (sync_stats) {
        sync_stats->get_remote_delete_bitmap_rowsets_num += req.rowset_ids_size();
    }

    const auto rpc_begin = std::chrono::steady_clock::now();
    if (config::enable_batch_get_delete_bitmap) {
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms_by_batch(
                req, res, config::get_delete_bitmap_bytes_threshold));
    } else {
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms(req, res));
    }
    const auto rpc_end = std::chrono::steady_clock::now();

    // v1 delete bitmap: four parallel arrays that must have the same length.
    const auto& rowset_ids = res.rowset_ids();
    const auto& segment_ids = res.segment_ids();
    const auto& vers = res.versions();
    const auto& delete_bitmaps = res.segment_delete_bitmaps();
    const bool v1_sizes_match = rowset_ids.size() == segment_ids.size() &&
                                rowset_ids.size() == vers.size() &&
                                rowset_ids.size() == delete_bitmaps.size();
    if (!v1_sizes_match) {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "get delete bitmap data wrong,"
                "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
                rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size());
    }
    for (int i = 0; i < rowset_ids.size(); i++) {
        RowsetId rst_id;
        rst_id.init(rowset_ids[i]);
        const auto& serialized = delete_bitmaps[i];
        delete_bitmap->merge({rst_id, segment_ids[i], vers[i]},
                             roaring::Roaring::readSafe(serialized.data(), serialized.length()));
    }

    // v2 delete bitmap: one storage descriptor per delta rowset id.
    const auto& delta_rowset_ids = res.delta_rowset_ids();
    const auto& delete_bitmap_storages = res.delete_bitmap_storages();
    if (delta_rowset_ids.size() != delete_bitmap_storages.size()) {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "get delete bitmap data wrong, delta_rowset_ids.size={}, "
                "delete_bitmap_storages.size={}",
                delta_rowset_ids.size(), delete_bitmap_storages.size());
    }
    int64_t remote_delete_bitmap_bytes = 0;
    RETURN_IF_ERROR(_read_tablet_delete_bitmap_v2(tablet, old_max_version, rs_metas, delete_bitmap,
                                                  res, remote_delete_bitmap_bytes, full_sync_v2));

    if (sync_stats) {
        sync_stats->get_remote_delete_bitmap_rpc_ns +=
                std::chrono::duration_cast<std::chrono::nanoseconds>(rpc_end - rpc_begin).count();
        sync_stats->get_remote_delete_bitmap_key_count +=
                delete_bitmaps.size() + delete_bitmap_storages.size();
        for (const auto& dbm : delete_bitmaps) {
            sync_stats->get_remote_delete_bitmap_bytes += dbm.length();
        }
        sync_stats->get_remote_delete_bitmap_bytes += remote_delete_bitmap_bytes;
    }

    // Slow RPCs are always logged, fast ones are sampled.
    int64_t latency =
            std::chrono::duration_cast<std::chrono::microseconds>(rpc_end - rpc_begin).count();
    const bool is_slow = latency > 100 * 1000; // 100ms
    if (is_slow) {
        LOG(INFO) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" << rowset_ids.size()
                  << ", delete_bitmaps.size()=" << delete_bitmaps.size()
                  << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size()
                  << ", latency=" << latency << "us, read_version=" << read_version;
    } else {
        LOG_EVERY_N(INFO, 100) << "finish get_delete_bitmap rpcs. rowset_ids.size()="
                               << rowset_ids.size()
                               << ", delete_bitmaps.size()=" << delete_bitmaps.size()
                               << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size()
                               << ", latency=" << latency << "us, read_version=" << read_version;
    }
    return Status::OK();
}
1233
1234
// Diagnostic check: re-reads the tablet's full delete bitmap through the v2 read path and
// compares it, segment by segment, with the bitmap currently held in the tablet meta.
//
// The check is skipped (returning OK) when it is disabled by config, when the configured write
// version is 1, or when `resp` carries no rowset metas. Mismatches and read failures are only
// logged; this function always returns Status::OK() except when collecting the tablet's existing
// rowset ids fails.
//
// @param tablet           tablet whose in-memory delete bitmap is checked
// @param req              original GetRowset request; only its idx() is used
// @param resp             GetRowset response providing the new rowset metas and tablet stats
// @param old_max_version  max version known before the sync; existing rowsets are included in
//                         the comparison only when it is positive
Status CloudMetaMgr::_check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req,
                                                         GetRowsetResponse& resp,
                                                         int64_t old_max_version) {
    if (!config::enable_delete_bitmap_store_v2_check_correctness ||
        config::delete_bitmap_store_write_version == 1 || resp.rowset_meta().empty()) {
        return Status::OK();
    }
    int64_t tablet_id = tablet->tablet_id();
    int64_t new_max_version = std::max(old_max_version, resp.rowset_meta().rbegin()->end_version());

    // (rowset_id, num_segments) of every rowset to compare: the newly synced ones first, then the
    // rowsets the tablet already had up to old_max_version.
    std::vector<std::pair<RowsetId, int64_t>> all_rowsets;
    for (const auto& rs_meta : resp.rowset_meta()) {
        RowsetId rowset_id;
        rowset_id.init(rs_meta.rowset_id_v2());
        all_rowsets.emplace_back(rowset_id, rs_meta.num_segments());
    }
    if (old_max_version > 0) {
        RowsetIdUnorderedSet all_rs_ids;
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
        for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) {
            all_rowsets.emplace_back(rowset->rowset_id(), rowset->num_segments());
        }
    }

    // Compares the aggregated bitmap at new_max_version of every segment between the tablet meta
    // and `delete_bitmap`, logging each difference and a single success line otherwise.
    auto compare_delete_bitmap = [&](DeleteBitmap* delete_bitmap, int version) {
        bool success = true;
        for (auto& [rs_id, num_segments] : all_rowsets) {
            for (int seg_id = 0; seg_id < num_segments; ++seg_id) {
                DeleteBitmap::BitmapKey key = {rs_id, seg_id, new_max_version};
                auto dm1 = tablet->tablet_meta()->delete_bitmap().get_agg(key);
                auto dm2 = delete_bitmap->get_agg_without_cache(key);
                if (*dm1 != *dm2) {
                    success = false;
                    LOG(WARNING) << "failed to check delete bitmap correctness by v" << version
                                 << ", tablet_id=" << tablet->tablet_id()
                                 << ", rowset_id=" << rs_id.to_string() << ", segment_id=" << seg_id
                                 << ", max_version=" << new_max_version
                                 << ". size1=" << dm1->cardinality()
                                 << ", size2=" << dm2->cardinality();
                }
            }
        }
        if (success) {
            LOG(INFO) << "succeed to check delete bitmap correctness by v" << version
                      << ", tablet_id=" << tablet->tablet_id()
                      << ", max_version=" << new_max_version;
        }
    };

    // full_sync=false, no stats, read_version=2, full_sync_v2=true: fetch everything via v2.
    DeleteBitmap full_delete_bitmap(tablet_id);
    auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), resp.stats(),
                                        req.idx(), &full_delete_bitmap, false, nullptr, 2, true);
    if (!st.ok()) {
        LOG_WARNING("failed to check delete bitmap correctness by v2")
                .tag("tablet", tablet->tablet_id())
                .error(st);
    } else {
        compare_delete_bitmap(&full_delete_bitmap, 2);
    }
    return Status::OK();
}
1299
1300
// Merges the v2 ("delta") part of a GetDeleteBitmap response into `delete_bitmap`.
//
// Every entry of res.delta_rowset_ids() has a matching storage descriptor in
// res.delete_bitmap_storages(). A descriptor either embeds the bitmap (store_in_fdb), which is
// merged on the calling thread, or points to a file, which is read and merged by a task on the
// engine's sync-delete-bitmap thread pool. Bitmap entries that refer to rowsets unknown to this
// tablet (neither in `rs_metas` nor among the tablet's rowsets up to `old_max_version`) are
// skipped.
//
// @param tablet                      tablet the bitmaps belong to
// @param old_max_version             max version known before the sync; when positive the
//                                    tablet's existing rowset ids are accepted as merge targets
// @param rs_metas                    newly synced rowset metas
// @param delete_bitmap               destination bitmap
// @param res                         response holding the delta rowset ids and storages
// @param remote_delete_bitmap_bytes  incremented by the serialized size of every merged bitmap
// @param full_sync_v2                when true, storage resources of existing rowsets are
//                                    resolved as well so their bitmap files can be read
// @return the first error observed (inline merge, task submission, or file task), OK otherwise
Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version,
                                                   std::ranges::range auto&& rs_metas,
                                                   DeleteBitmap* delete_bitmap,
                                                   GetDeleteBitmapResponse& res,
                                                   int64_t& remote_delete_bitmap_bytes,
                                                   bool full_sync_v2) {
    if (res.delta_rowset_ids().empty()) {
        return Status::OK();
    }
    const auto& rowset_ids = res.delta_rowset_ids();
    const auto& delete_bitmap_storages = res.delete_bitmap_storages();

    // all_rs_ids: rowsets whose bitmap entries may be merged.
    // rowset_to_resource: rowset id -> storage resource id, needed to open bitmap files.
    RowsetIdUnorderedSet all_rs_ids;
    std::map<std::string, std::string> rowset_to_resource;
    if (old_max_version > 0) {
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
        if (full_sync_v2) {
            for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) {
                rowset_to_resource[rowset->rowset_id().to_string()] =
                        rowset->rowset_meta()->resource_id();
            }
        }
    }
    for (const auto& rs_meta : rs_metas) {
        RowsetId rs_id;
        rs_id.init(rs_meta.rowset_id_v2());
        all_rs_ids.emplace(rs_id);
        rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id();
    }
    if (config::enable_mow_verbose_log) {
        LOG(INFO) << "read delete bitmap for tablet_id=" << tablet->tablet_id()
                  << ", old_max_version=" << old_max_version
                  << ", new rowset num=" << rs_metas.size()
                  << ", rowset has delete bitmap num=" << rowset_ids.size()
                  << ". all rowset num=" << all_rs_ids.size();
    }

    // result_mtx guards `result`, `delete_bitmap` and `remote_delete_bitmap_bytes`, which are
    // touched both by this thread and by pool tasks.
    std::mutex result_mtx;
    Status result;

    // Validates one DeleteBitmapPB and merges its entries into `delete_bitmap`.
    // Takes the PB by const reference: it is only read, so no copy of the payload is needed.
    auto merge_delete_bitmap = [&](const std::string& rowset_id, const DeleteBitmapPB& dbm) {
        if (dbm.rowset_ids_size() != dbm.segment_ids_size() ||
            dbm.rowset_ids_size() != dbm.versions_size() ||
            dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) {
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                    "get delete bitmap data wrong, rowset_id={}, "
                    "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
                    rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), dbm.versions_size(),
                    dbm.segment_delete_bitmaps_size());
        }
        if (config::enable_mow_verbose_log) {
            LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
                      << ", rowset_id=" << rowset_id
                      << ", delete_bitmap num=" << dbm.segment_delete_bitmaps_size();
        }
        std::lock_guard lock(result_mtx);
        for (int j = 0; j < dbm.rowset_ids_size(); j++) {
            RowsetId rst_id;
            rst_id.init(dbm.rowset_ids(j));
            if (!all_rs_ids.contains(rst_id)) {
                LOG(INFO) << "skip merge delete bitmap for tablet_id=" << tablet->tablet_id()
                          << ", rowset_id=" << rowset_id << ", unused rowset_id=" << rst_id;
                continue;
            }
            const auto& serialized = dbm.segment_delete_bitmaps(j);
            delete_bitmap->merge(
                    {rst_id, dbm.segment_ids(j), dbm.versions(j)},
                    roaring::Roaring::readSafe(serialized.data(), serialized.length()));
            remote_delete_bitmap_bytes += serialized.length();
        }
        return Status::OK();
    };

    // Reads the delete bitmap file of `rowset_id` (packed or standalone) and merges it.
    auto get_delete_bitmap_from_file = [&](const std::string& rowset_id,
                                           const DeleteBitmapStoragePB& storage) {
        if (config::enable_mow_verbose_log) {
            LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
                      << ", rowset_id=" << rowset_id << " from file"
                      << ", is_packed=" << storage.has_packed_slice_location();
        }
        // Single read-only lookup; this runs concurrently on pool threads, so the map must not
        // be modified here.
        const auto resource_it = rowset_to_resource.find(rowset_id);
        if (resource_it == rowset_to_resource.end()) {
            return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}",
                                         tablet->tablet_id(), rowset_id);
        }
        const auto& resource_id = resource_it->second;
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
        auto storage_resource = engine.get_storage_resource(resource_id);
        if (!storage_resource) {
            return Status::InternalError("vault id not found, maybe not sync, vault id {}",
                                         resource_id);
        }

        // Use packed file reader if packed_slice_location is present
        std::unique_ptr<DeleteBitmapFileReader> reader;
        if (storage.has_packed_slice_location() &&
            !storage.packed_slice_location().packed_file_path().empty()) {
            reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id,
                                                              storage_resource,
                                                              storage.packed_slice_location());
        } else {
            reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id,
                                                              storage_resource);
        }

        RETURN_IF_ERROR(reader->init());
        DeleteBitmapPB dbm;
        RETURN_IF_ERROR(reader->read(dbm));
        RETURN_IF_ERROR(reader->close());
        return merge_delete_bitmap(rowset_id, dbm);
    };

    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    std::unique_ptr<ThreadPoolToken> token = engine.sync_delete_bitmap_thread_pool().new_token(
            ThreadPool::ExecutionMode::CONCURRENT);
    bthread::CountdownEvent wait {rowset_ids.size()};

    // First failure raised on this thread while dispatching (inline merge or task submission).
    // Dispatching stops at that point, as before, but we must NOT return right away: tasks that
    // were already submitted capture the locals above by reference (including `wait`), so they
    // have to finish before this frame is torn down.
    Status dispatch_status;
    for (int i = 0; i < rowset_ids.size() && dispatch_status.ok(); i++) {
        const auto& rowset_id = rowset_ids[i];
        const auto& storage = delete_bitmap_storages[i];
        if (storage.store_in_fdb()) {
            // Nothing asynchronous for this rowset; account for it in the countdown up front.
            wait.signal();
            dispatch_status = merge_delete_bitmap(rowset_id, storage.delete_bitmap());
        } else {
            // rowset_id and storage are captured by value: the task may outlive this iteration.
            dispatch_status = token->submit_func([&, rowset_id, storage]() {
                auto status = get_delete_bitmap_from_file(rowset_id, storage);
                if (!status.ok()) {
                    LOG(WARNING) << "failed to get delete bitmap for tablet_id="
                                 << tablet->tablet_id() << ", rowset_id=" << rowset_id
                                 << " from file, st=" << status.to_string();
                    std::lock_guard lock(result_mtx);
                    if (result.ok()) {
                        result = status;
                    }
                }
                wait.signal();
            });
        }
    }
    if (!dispatch_status.ok()) {
        // Not every rowset was dispatched, so the countdown can never reach zero; drain the
        // in-flight tasks through the token instead, then report the dispatch failure.
        token->wait();
        return dispatch_status;
    }

    // wait for all finished
    wait.wait();
    token->wait();
    return result;
}
1440
1441
// Sends a PREPARE_ROWSET RPC for `rs_meta` to the meta service.
//
// The rowset meta is serialized without its schema. When the RPC fails and the meta service
// answers ALREADY_EXISTED, Status::AlreadyExist is returned and, if `existed_rs_meta` is non-null
// and the response carries the existing rowset meta, `*existed_rs_meta` is set to it. In every
// other case the status of retry_rpc() is returned as is.
Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
                                    int64_t table_id, RowsetMetaSharedPtr* existed_rs_meta) {
    VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
               << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st);
    }

    CreateRowsetRequest req;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_txn_id(rs_meta.txn_id());
    req.set_tablet_job_id(job_id);
    RowsetMetaPB rowset_pb = rs_meta.get_rowset_pb(/*skip_schema=*/true);
    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rowset_pb));

    CreateRowsetResponse resp;
    Status st =
            retry_rpc(MetaServiceRPC::PREPARE_ROWSET, req, &resp, &MetaService_Stub::prepare_rowset,
                      {
                              .host_limiters = host_level_ms_rpc_rate_limiters_,
                              .backpressure_handler = ms_backpressure_handler_,
                              .table_id = table_id,
                      });

    const bool already_existed =
            !st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED;
    if (!already_existed) {
        return st;
    }

    // Hand the rowset that is already registered back to the caller when it asked for it.
    if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
        RowsetMetaPB existed_pb =
                cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
        RowsetMetaSharedPtr& out = *existed_rs_meta;
        out = std::make_shared<RowsetMeta>();
        out->init_from_pb(existed_pb);
    }
    return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg());
}
1476
1477
// Sends a COMMIT_ROWSET RPC for `rs_meta` to the meta service and then hands the rowset to the
// cloud warm-up manager.
//
// When the RPC fails and the meta service answers ALREADY_EXISTED, Status::AlreadyExist is
// returned (filling `*existed_rs_meta` when requested and available) and no warm-up is triggered.
// Otherwise warm_up_rowset() is called and the status of retry_rpc() is returned.
//
// The warm-up timeout stays -1 unless config::enable_compaction_delay_commit_for_warm_up is set
// and `job_id` is non-empty, in which case it is derived from the rowset's disk size.
Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id, int64_t table_id,
                                   RowsetMetaSharedPtr* existed_rs_meta) {
    VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
               << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
    }
    check_table_size_correctness(rs_meta);

    CreateRowsetRequest req;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_txn_id(rs_meta.txn_id());
    req.set_tablet_job_id(job_id);
    RowsetMetaPB rowset_pb = rs_meta.get_rowset_pb();
    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rowset_pb));

    CreateRowsetResponse resp;
    Status st =
            retry_rpc(MetaServiceRPC::COMMIT_ROWSET, req, &resp, &MetaService_Stub::commit_rowset,
                      {
                              .host_limiters = host_level_ms_rpc_rate_limiters_,
                              .backpressure_handler = ms_backpressure_handler_,
                              .table_id = table_id,
                      });

    const bool already_existed =
            !st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED;
    if (already_existed) {
        if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
            RowsetMetaPB existed_pb =
                    cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
            RowsetMetaSharedPtr& out = *existed_rs_meta;
            out = std::make_shared<RowsetMeta>();
            out->init_from_pb(existed_pb);
        }
        return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg());
    }

    int64_t timeout_ms = -1;
    // A non-empty `job_id` means this rowset was produced by a compaction job.
    if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
        // 1. assume the download speed is 100MB/s
        // 2. double the download time as timeout for safety
        // 3. small rowsets would yield a tiny timeout, hence the configured lower bound;
        //    the configured upper bound caps the result.
        const double speed_mbps = 100.0; // 100MB/s
        const double safety_factor = 2.0;
        const auto estimated_ms =
                static_cast<int64_t>(static_cast<double>(rs_meta.total_disk_size()) /
                                     (speed_mbps * 1024 * 1024) * safety_factor * 1000);
        const auto lower_bounded_ms =
                std::max(estimated_ms, config::warm_up_rowset_sync_wait_min_timeout_ms);
        timeout_ms = std::min(lower_bounded_ms, config::warm_up_rowset_sync_wait_max_timeout_ms);
        LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
                  << ", with timeout: " << timeout_ms << " ms";
    }

    auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
    manager.warm_up_rowset(rs_meta, timeout_ms);
    return st;
}
1530
1531
124k
// Hands a committed rowset meta to the cloud storage engine's committed-rowset manager,
// keyed by the rowset's txn id and tablet id.
//
// @param rs_meta          rowset meta to cache; the shared pointer is moved into the manager call.
// @param expiration_time  forwarded unchanged to `add_committed_rowset`.
void CloudMetaMgr::cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t expiration_time) {
    // For load-generated rowsets (job_id is empty), add to pending rowset manager
    // so FE can notify BE to promote them later

    // Both ids have to be read before `rs_meta` is moved into the call below.
    const int64_t owning_txn_id = rs_meta->txn_id();
    const int64_t owning_tablet_id = rs_meta->tablet_id();

    // TODO(bobhan1): copy rs_meta?
    auto&& committed_rowsets =
            ExecEnv::GetInstance()->storage_engine().to_cloud().committed_rs_mgr();
    committed_rowsets.add_committed_rowset(owning_txn_id, owning_tablet_id, std::move(rs_meta),
                                           expiration_time);
}
1541
1542
19
// Sends an UPDATE_TMP_ROWSET request carrying `rs_meta` to the meta service.
//
// @param rs_meta   rowset meta to upload.
// @param table_id  passed to `retry_rpc` through its options.
// @return the RPC status, or InternalError when the RPC failed and the meta service answered
//         ROWSET_META_NOT_FOUND.
Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta, int64_t table_id) {
    VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
               << ", rowset_id: " << rs_meta.rowset_id();

    CreateRowsetRequest request;
    CreateRowsetResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);

    // A variant schema may have been updated, so it has to be uploaded as well: the rowset meta
    // changed by `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap` would be lost
    // here if the schema were skipped. The schema is therefore only skipped when the tablet
    // schema has no variant columns.
    const bool has_no_variant_columns = rs_meta.tablet_schema()->num_variant_columns() == 0;
    doris_rowset_meta_to_cloud(request.mutable_rowset_meta(),
                               rs_meta.get_rowset_pb(has_no_variant_columns));

    Status rpc_status = retry_rpc(MetaServiceRPC::UPDATE_TMP_ROWSET, request, &response,
                                  &MetaService_Stub::update_tmp_rowset,
                                  {
                                          .host_limiters = host_level_ms_rpc_rate_limiters_,
                                          .backpressure_handler = ms_backpressure_handler_,
                                          .table_id = table_id,
                                  });
    if (rpc_status.ok()) {
        return rpc_status;
    }
    if (response.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND) {
        return Status::InternalError("failed to update committed rowset: {}",
                                     response.status().msg());
    }
    return rpc_status;
}
1568
1569
// async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP
1570
static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
1571
                                   const std::string& label, CommitTxnResponse& res,
1572
6.89k
                                   const std::vector<int64_t>& tablet_ids) {
1573
6.89k
    std::string protobufBytes;
1574
6.89k
    if (txn_id != -1) {
1575
0
        res.SerializeToString(&protobufBytes);
1576
0
    }
1577
6.89k
    auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
1578
6.89k
            [db_id, txn_id, label, protobufBytes, tablet_ids]() -> Status {
1579
6.89k
                TReportCommitTxnResultRequest request;
1580
6.89k
                TStatus result;
1581
1582
6.89k
                if (txn_id != -1 && protobufBytes.length() <= 0) {
1583
0
                    LOG(WARNING) << "protobufBytes: " << protobufBytes.length();
1584
0
                    return Status::OK(); // nobody cares the return status
1585
0
                }
1586
1587
6.89k
                request.__set_dbId(db_id);
1588
6.89k
                request.__set_txnId(txn_id);
1589
6.89k
                request.__set_label(label);
1590
6.89k
                request.__set_payload(protobufBytes);
1591
6.89k
                request.__set_tabletIds(tablet_ids);
1592
1593
6.89k
                Status status;
1594
6.89k
                int64_t duration_ns = 0;
1595
6.89k
                TNetworkAddress master_addr =
1596
6.89k
                        ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
1597
6.89k
                if (master_addr.hostname.empty() || master_addr.port == 0) {
1598
0
                    status = Status::Error<SERVICE_UNAVAILABLE>(
1599
0
                            "Have not get FE Master heartbeat yet");
1600
6.89k
                } else {
1601
6.89k
                    SCOPED_RAW_TIMER(&duration_ns);
1602
1603
6.89k
                    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
1604
6.89k
                            master_addr.hostname, master_addr.port,
1605
6.89k
                            [&request, &result](FrontendServiceConnection& client) {
1606
6.89k
                                client->reportCommitTxnResult(result, request);
1607
6.89k
                            }));
1608
1609
6.89k
                    status = Status::create<false>(result);
1610
6.89k
                }
1611
6.89k
                g_cloud_commit_txn_resp_redirect_latency << duration_ns / 1000;
1612
1613
6.89k
                if (!status.ok()) {
1614
0
                    LOG(WARNING) << "TableStats report RPC to FE failed, errmsg=" << status
1615
0
                                 << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
1616
0
                    return Status::OK(); // nobody cares the return status
1617
6.89k
                } else {
1618
6.89k
                    LOG(INFO) << "TableStats report RPC to FE success, msg=" << status
1619
6.89k
                              << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
1620
6.89k
                    return Status::OK();
1621
6.89k
                }
1622
6.89k
            });
1623
6.89k
    if (!st.ok()) {
1624
0
        LOG(WARNING) << "TableStats report to FE task submission failed: " << st.to_string();
1625
0
    }
1626
6.89k
}
1627
1628
0
// Commits the stream load txn described by `ctx` through the meta service and, on success,
// reports the resulting stats to FE asynchronously.
//
// @param ctx     stream load context providing db id, txn id, label and commit infos.
// @param is_2pc  forwarded to the request's `is_2pc` field.
// @return the status of the COMMIT_TXN rpc.
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
    VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
               << ", label: " << ctx.label << ", is_2pc: " << is_2pc;
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st);
    }

    CommitTxnRequest request;
    CommitTxnResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_db_id(ctx.db_id);
    request.set_txn_id(ctx.txn_id);
    request.set_is_2pc(is_2pc);
    request.set_enable_txn_lazy_commit(config::enable_cloud_txn_lazy_commit);

    auto rpc_status = retry_rpc(MetaServiceRPC::COMMIT_TXN, request, &response,
                                &MetaService_Stub::commit_txn,
                                {
                                        .host_limiters = host_level_ms_rpc_rate_limiters_,
                                        .backpressure_handler = ms_backpressure_handler_,
                                });
    if (!rpc_status.ok()) {
        return rpc_status;
    }

    // Collect the tablets touched by this load for the stats report.
    std::vector<int64_t> committed_tablet_ids;
    for (const auto& info : ctx.commit_infos) {
        committed_tablet_ids.emplace_back(info.tabletId);
    }
    send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, response, committed_tablet_ids);

    return rpc_status;
}
1658
1659
488
// Aborts the stream load txn described by `ctx` through the meta service.
//
// The txn is addressed by (db_id, label) when both are usable, otherwise by txn_id. When
// neither is available the call fails without contacting the meta service.
//
// @param ctx  stream load context; the first 1024 characters of `ctx.status.msg()` are sent as
//             the abort reason.
// @return the status of the ABORT_TXN rpc, or InternalError on illegal input.
Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
    VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
               << ", label: " << ctx.label;
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st);
    }

    const bool addressable_by_label = ctx.db_id > 0 && !ctx.label.empty();
    const bool addressable_by_txn_id = ctx.txn_id > 0;
    if (!addressable_by_label && !addressable_by_txn_id) {
        LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id
                     << " txn_id=" << ctx.txn_id << " label=" << ctx.label;
        return Status::InternalError<false>("failed to abort txn");
    }

    AbortTxnRequest request;
    AbortTxnResponse response;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_reason(std::string(ctx.status.msg().substr(0, 1024)));
    if (addressable_by_label) {
        // (db_id, label) takes precedence over txn_id.
        request.set_db_id(ctx.db_id);
        request.set_label(ctx.label);
    } else {
        request.set_txn_id(ctx.txn_id);
    }

    return retry_rpc(MetaServiceRPC::ABORT_TXN, request, &response,
                     &MetaService_Stub::abort_txn,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1686
1687
33
// Pre-commits the stream load txn identified by `ctx.db_id` / `ctx.txn_id` through the meta
// service.
//
// @param ctx  stream load context providing db id, txn id and label.
// @return the status of the PRECOMMIT_TXN rpc.
Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
    VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
               << ", label: " << ctx.label;
    {
        Status ret_st;
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st);
    }

    PrecommitTxnRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_db_id(ctx.db_id);
    request.set_txn_id(ctx.txn_id);

    PrecommitTxnResponse response;
    return retry_rpc(MetaServiceRPC::PRECOMMIT_TXN, request, &response,
                     &MetaService_Stub::precommit_txn,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1705
1706
0
// Sends a PREPARE restore-job request for the given tablet to the meta service.
//
// @param tablet_meta  tablet meta to restore; it is converted to the cloud format and attached
//                     to the request. The argument is const and is not modified.
// @return the status of the PREPARE_RESTORE_JOB rpc.
Status CloudMetaMgr::prepare_restore_job(const TabletMetaPB& tablet_meta) {
    VLOG_DEBUG << "prepare restore job, tablet_id: " << tablet_meta.tablet_id();
    RestoreJobRequest req;
    RestoreJobResponse resp;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_tablet_id(tablet_meta.tablet_id());
    req.set_expiration(config::snapshot_expire_time_sec);
    req.set_action(RestoreJobRequest::PREPARE);

    // `tablet_meta` is a const reference, so it can only be copied. The previous
    // `std::move(tablet_meta)` produced a const rvalue that cannot be moved from; it is
    // dropped to make the copy explicit.
    doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), tablet_meta);
    return retry_rpc(MetaServiceRPC::PREPARE_RESTORE_JOB, req, &resp,
                     &MetaService_Stub::prepare_restore_job,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1723
1724
0
// Sends a COMMIT restore-job request for `tablet_id` to the meta service.
//
// @param tablet_id  tablet whose restore job is committed.
// @return the status of the COMMIT_RESTORE_JOB rpc.
Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) {
    VLOG_DEBUG << "commit restore job, tablet_id: " << tablet_id;

    RestoreJobRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_tablet_id(tablet_id);
    request.set_action(RestoreJobRequest::COMMIT);
    // The store version is taken from the delete bitmap write-version config.
    request.set_store_version(config::delete_bitmap_store_write_version);

    RestoreJobResponse response;
    return retry_rpc(MetaServiceRPC::COMMIT_RESTORE_JOB, request, &response,
                     &MetaService_Stub::commit_restore_job,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1740
1741
0
// Finishes the restore job of `tablet_id` on the meta service.
//
// @param tablet_id     tablet whose restore job is finished.
// @param is_completed  true sends the COMPLETE action, false sends ABORT.
// @return the status of the FINISH_RESTORE_JOB rpc.
Status CloudMetaMgr::finish_restore_job(const int64_t tablet_id, bool is_completed) {
    VLOG_DEBUG << "finish restore job, tablet_id: " << tablet_id
               << ", is_completed: " << is_completed;

    const auto final_action =
            is_completed ? RestoreJobRequest::COMPLETE : RestoreJobRequest::ABORT;

    RestoreJobRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_tablet_id(tablet_id);
    request.set_action(final_action);

    RestoreJobResponse response;
    return retry_rpc(MetaServiceRPC::FINISH_RESTORE_JOB, request, &response,
                     &MetaService_Stub::finish_restore_job,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1757
1758
107
// Fetches the object store / storage vault configuration from the meta service.
//
// @param vault_infos    output; one entry is appended per object store in `obj_info`, and per
//                       HDFS / object store found in `storage_vault`.
// @param is_vault_mode  output; set to the response's `enable_storage_vault` flag.
// @return the status of the GET_OBJ_STORE_INFO rpc; the outputs are untouched on failure.
Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) {
    GetObjStoreInfoRequest req;
    GetObjStoreInfoResponse resp;
    req.set_cloud_unique_id(config::cloud_unique_id);
    Status s = retry_rpc(MetaServiceRPC::GET_OBJ_STORE_INFO, req, &resp,
                         &MetaService_Stub::get_obj_store_info,
                         {
                                 .host_limiters = host_level_ms_rpc_rate_limiters_,
                                 .backpressure_handler = ms_backpressure_handler_,
                         });
    if (!s.ok()) {
        return s;
    }

    *is_vault_mode = resp.enable_storage_vault();

    auto add_obj_store = [&vault_infos](const auto& obj_store) {
        vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
                                  StorageVaultPB_PathFormat {});
    };

    std::ranges::for_each(resp.obj_info(), add_obj_store);
    std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) {
        if (vault.has_hdfs_info()) {
            vault_infos->emplace_back(vault.id(), vault.hdfs_info(), vault.path_format());
        }
        if (vault.has_obj_info()) {
            add_obj_store(vault.obj_info());
        }
    });

    // Desensitization: the response is only logged from here on, so hide the credentials.
    // The secret key keeps its first two characters; the access key is masked from its OWN
    // value. (Previously `ak` was overwritten with a value derived from the masked secret key,
    // and for storage vaults the access key was not masked at all.)
    auto mask_credentials = [](auto* obj_info) {
        obj_info->set_sk(obj_info->sk().substr(0, 2) + "xxx");
        obj_info->set_ak(hide_access_key(obj_info->ak()));
    };
    for (int i = 0; i < resp.obj_info_size(); ++i) {
        mask_credentials(resp.mutable_obj_info(i));
    }
    for (int i = 0; i < resp.storage_vault_size(); ++i) {
        auto* vault = resp.mutable_storage_vault(i);
        if (!vault->has_obj_info()) continue;
        mask_credentials(vault->mutable_obj_info());
    }

    LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode
              << " response=" << resp.ShortDebugString();
    return Status::OK();
}
1812
1813
18.4k
// Starts a tablet job on the meta service.
//
// @param job  job description copied into the request.
// @param res  output; receives the meta service response.
// @return the status of the START_TABLET_JOB rpc.
Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) {
    VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString();
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res);

    StartTabletJobRequest request;
    *request.mutable_job() = job;
    request.set_cloud_unique_id(config::cloud_unique_id);

    return retry_rpc(MetaServiceRPC::START_TABLET_JOB, request, res,
                     &MetaService_Stub::start_tablet_job,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1827
1828
16.8k
// Commits a tablet job on the meta service.
//
// @param job  job description copied into the request.
// @param res  output; receives the meta service response.
// @return DELETE_BITMAP_LOCK_ERROR when the response code is
//         KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES, otherwise the status of the
//         FINISH_TABLET_JOB rpc.
Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) {
    VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString();
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res);
    DBUG_EXECUTE_IF("CloudMetaMgr::commit_tablet_job.fail", {
        return Status::InternalError<false>("inject CloudMetaMgr::commit_tablet_job.fail");
    });

    FinishTabletJobRequest request;
    *request.mutable_job() = job;
    request.set_action(FinishTabletJobRequest::COMMIT);
    request.set_cloud_unique_id(config::cloud_unique_id);

    auto rpc_status = retry_rpc(MetaServiceRPC::FINISH_TABLET_JOB, request, res,
                                &MetaService_Stub::finish_tablet_job,
                                {
                                        .host_limiters = host_level_ms_rpc_rate_limiters_,
                                        .backpressure_handler = ms_backpressure_handler_,
                                });

    // The response code is inspected regardless of the rpc status.
    const bool txn_conflict =
            res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES;
    if (txn_conflict) {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                "txn conflict when commit tablet job {}", job.ShortDebugString());
    }

    // After a successful compaction commit, report the tablet to FE; -1 / "" mark the report
    // as not belonging to any txn, so no payload is serialized.
    const bool committed_compaction = rpc_status.ok() && !job.compaction().empty() && job.has_idx();
    if (committed_compaction) {
        CommitTxnResponse empty_commit_resp;
        std::vector<int64_t> reported_tablets = {job.idx().tablet_id()};
        send_stats_to_fe_async(-1, -1, "", empty_commit_resp, reported_tablets);
    }

    return rpc_status;
}
1858
1859
53
// Aborts a tablet job on the meta service.
//
// @param job  job description copied into the request.
// @return the status of the FINISH_TABLET_JOB rpc sent with the ABORT action.
Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
    VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString();
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_tablet_job", Status::OK(), job);

    FinishTabletJobRequest request;
    FinishTabletJobResponse response;
    *request.mutable_job() = job;
    request.set_action(FinishTabletJobRequest::ABORT);
    request.set_cloud_unique_id(config::cloud_unique_id);

    return retry_rpc(MetaServiceRPC::FINISH_TABLET_JOB, request, &response,
                     &MetaService_Stub::finish_tablet_job,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1874
1875
182
// Sends a LEASE action for a tablet job to the meta service.
//
// @param job  job description copied into the request.
// @return the status of the FINISH_TABLET_JOB rpc sent with the LEASE action.
Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
    VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString();

    FinishTabletJobRequest request;
    FinishTabletJobResponse response;
    *request.mutable_job() = job;
    request.set_action(FinishTabletJobRequest::LEASE);
    request.set_cloud_unique_id(config::cloud_unique_id);

    return retry_rpc(MetaServiceRPC::FINISH_TABLET_JOB, request, &response,
                     &MetaService_Stub::finish_tablet_job,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                     });
}
1889
1890
static void add_delete_bitmap(DeleteBitmapPB& delete_bitmap_pb, const DeleteBitmap::BitmapKey& key,
1891
0
                              roaring::Roaring& bitmap) {
1892
0
    delete_bitmap_pb.add_rowset_ids(std::get<0>(key).to_string());
1893
0
    delete_bitmap_pb.add_segment_ids(std::get<1>(key));
1894
0
    delete_bitmap_pb.add_versions(std::get<2>(key));
1895
    // To save space, convert array and bitmap containers to run containers
1896
0
    bitmap.runOptimize();
1897
0
    std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
1898
0
    bitmap.write(bitmap_data.data());
1899
0
    *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data);
1900
0
}
1901
1902
// Attaches the delete bitmap of one rowset to `req`.
//
// The bitmap is written to a file when `config::delete_bitmap_store_v2_max_bytes_in_fdb` is
// non-negative and the protobuf is larger than that limit; otherwise the protobuf itself is
// moved into the request. In both cases `delete_bitmap_pb` is cleared or moved from.
//
// @param rowset_id         rowset the bitmap belongs to; added to `delta_rowset_ids`.
// @param delete_bitmap_pb  bitmap entries of that rowset; nothing is added to `req` when empty.
// @param tablet_id         tablet id used for logging and by the file writer.
// @param storage_resource  forwarded to the file writer.
// @param req               request that receives the rowset id and the storage descriptor.
// @param txn_id            load txn id; packed files are only enabled when it is > 0.
// @return OK, or the first error returned by the file writer.
static Status store_delete_bitmap(std::string& rowset_id, DeleteBitmapPB& delete_bitmap_pb,
                                  int64_t tablet_id,
                                  std::optional<StorageResource> storage_resource,
                                  UpdateDeleteBitmapRequest& req, int64_t txn_id) {
    const int entry_count = delete_bitmap_pb.rowset_ids_size();

    if (config::enable_mow_verbose_log) {
        std::stringstream keys;
        for (int idx = 0; idx < entry_count; idx++) {
            keys << "{rid=" << delete_bitmap_pb.rowset_ids(idx)
                 << ", sid=" << delete_bitmap_pb.segment_ids(idx)
                 << ", ver=" << delete_bitmap_pb.versions(idx) << "}, ";
        }
        LOG(INFO) << "handle one rowset delete bitmap for tablet_id: " << tablet_id
                  << ", rowset_id: " << rowset_id << ", delete_bitmap num: " << entry_count
                  << ",  size: " << delete_bitmap_pb.ByteSizeLong() << ", keys=[" << keys.str()
                  << "]";
    }
    if (entry_count == 0) {
        return Status::OK();
    }

    DeleteBitmapStoragePB storage_pb;
    const bool exceeds_fdb_limit =
            config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 &&
            delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb;
    if (exceeds_fdb_limit) {
        // Enable packed file only for load (txn_id > 0)
        const bool use_packed_file = config::enable_packed_file && txn_id > 0;
        DeleteBitmapFileWriter writer(tablet_id, rowset_id, storage_resource, use_packed_file,
                                      txn_id);
        RETURN_IF_ERROR(writer.init());
        RETURN_IF_ERROR(writer.write(delete_bitmap_pb));
        RETURN_IF_ERROR(writer.close());
        delete_bitmap_pb.Clear();
        storage_pb.set_store_in_fdb(false);

        // Store packed slice location if file was written to packed file
        if (writer.is_packed()) {
            io::PackedSliceLocation slice;
            RETURN_IF_ERROR(writer.get_packed_slice_location(&slice));
            auto* slice_pb = storage_pb.mutable_packed_slice_location();
            slice_pb->set_packed_file_path(slice.packed_file_path);
            slice_pb->set_offset(slice.offset);
            slice_pb->set_size(slice.size);
            slice_pb->set_packed_file_size(slice.packed_file_size);
        }
    } else {
        storage_pb.set_store_in_fdb(true);
        *(storage_pb.mutable_delete_bitmap()) = std::move(delete_bitmap_pb);
    }

    // `delta_rowset_ids` and `delete_bitmap_storages` are appended in lockstep.
    req.add_delta_rowset_ids(rowset_id);
    *(req.add_delete_bitmap_storages()) = std::move(storage_pb);
    return Status::OK();
}
1953
1954
// Uploads the delete bitmap of a tablet to the meta service.
//
// `store_version` selects the format: 1 writes v1 entries only, 2 writes v2 only, 3 writes
// both. v1 entries are put directly into the request; v2 entries are grouped per rowset and
// attached through `store_delete_bitmap`.
//
// @param tablet                tablet providing table / partition / tablet ids.
// @param lock_id, initiator    forwarded to the request.
// @param delete_bitmap         v1 bitmap; its bitmaps are run-optimized in place.
// @param delete_bitmap_v2      v2 bitmap; only dereferenced when v2 is written.
// @param rowset_id             when non-empty all v2 entries are stored under this rowset id,
//                              otherwise they are grouped by the rowset id in each key.
// @param storage_resource      forwarded to `store_delete_bitmap`.
// @param store_version         see above.
// @param table_id              passed to `retry_rpc` through its options.
// @param txn_id                set on the request only when > 0.
// @param is_explicit_txn       forwarded to the request.
// @param next_visible_version  set on the request only when > 0.
// @return DELETE_BITMAP_LOCK_ERROR when the meta service reports LOCK_EXPIRED, an error from
//         `store_delete_bitmap`, or otherwise the status of the UPDATE_DELETE_BITMAP rpc.
Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
                                          int64_t initiator, DeleteBitmap* delete_bitmap,
                                          DeleteBitmap* delete_bitmap_v2, std::string rowset_id,
                                          std::optional<StorageResource> storage_resource,
                                          int64_t store_version, int64_t table_id, int64_t txn_id,
                                          bool is_explicit_txn, int64_t next_visible_version) {
    VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();

    const bool write_v1 = store_version == 1 || store_version == 3;
    const bool write_v2 = store_version == 2 || store_version == 3;

    if (config::enable_mow_verbose_log) {
        std::stringstream summary;
        summary << "start update delete bitmap for tablet_id: " << tablet.tablet_id()
                << ", rowset_id: " << rowset_id
                << ", delete_bitmap num: " << delete_bitmap->delete_bitmap.size()
                << ", store_version: " << store_version << ", lock_id=" << lock_id
                << ", initiator=" << initiator;
        if (write_v2) {
            summary << ", delete_bitmap v2 num: " << delete_bitmap_v2->delete_bitmap.size();
        }
        LOG(INFO) << summary.str();
    }

    UpdateDeleteBitmapRequest req;
    UpdateDeleteBitmapResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_table_id(tablet.table_id());
    req.set_partition_id(tablet.partition_id());
    req.set_tablet_id(tablet.tablet_id());
    req.set_lock_id(lock_id);
    req.set_initiator(initiator);
    req.set_is_explicit_txn(is_explicit_txn);
    // txn id and next visible version are optional: only positive values are sent.
    if (txn_id > 0) {
        req.set_txn_id(txn_id);
    }
    if (next_visible_version > 0) {
        req.set_next_visible_version(next_visible_version);
    }
    req.set_store_version(store_version);

    // write v1 kvs
    if (write_v1) {
        for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
            req.add_rowset_ids(std::get<0>(key).to_string());
            req.add_segment_ids(std::get<1>(key));
            req.add_versions(std::get<2>(key));
            // To save space, convert array and bitmap containers to run containers
            bitmap.runOptimize();
            std::string serialized(bitmap.getSizeInBytes(), '\0');
            bitmap.write(serialized.data());
            *(req.add_segment_delete_bitmaps()) = std::move(serialized);
        }
    }

    // write v2 kvs
    if (write_v2) {
        if (config::enable_mow_verbose_log) {
            LOG(INFO) << "update delete bitmap for tablet_id: " << tablet.tablet_id()
                      << ", rowset_id: " << rowset_id
                      << ", delete_bitmap num: " << delete_bitmap_v2->delete_bitmap.size()
                      << ", lock_id=" << lock_id << ", initiator=" << initiator;
        }
        if (!rowset_id.empty()) {
            // Every entry is stored under the rowset id supplied by the caller.
            DeleteBitmapPB single_rowset_pb;
            for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) {
                add_delete_bitmap(single_rowset_pb, key, bitmap);
            }
            RETURN_IF_ERROR(store_delete_bitmap(rowset_id, single_rowset_pb, tablet.tablet_id(),
                                                storage_resource, req, txn_id));
        } else {
            // Group consecutive entries by the rowset id in their key and flush a group each
            // time the rowset id changes.
            std::string group_rowset_id = "";
            std::string entry_rowset_id = "";
            DeleteBitmapPB group_pb;
            for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) {
                entry_rowset_id = std::get<0>(key).to_string();
                if (entry_rowset_id != group_rowset_id) {
                    if (!group_rowset_id.empty() && group_pb.rowset_ids_size() > 0) {
                        RETURN_IF_ERROR(store_delete_bitmap(group_rowset_id, group_pb,
                                                            tablet.tablet_id(), storage_resource,
                                                            req, txn_id));
                    }
                    group_rowset_id = entry_rowset_id;
                    // The flush above is expected to leave the protobuf empty.
                    DCHECK_EQ(group_pb.rowset_ids_size(), 0);
                    DCHECK_EQ(group_pb.segment_ids_size(), 0);
                    DCHECK_EQ(group_pb.versions_size(), 0);
                    DCHECK_EQ(group_pb.segment_delete_bitmaps_size(), 0);
                }
                add_delete_bitmap(group_pb, key, bitmap);
            }
            // Flush the trailing group.
            if (group_pb.rowset_ids_size() > 0) {
                DCHECK(!entry_rowset_id.empty());
                RETURN_IF_ERROR(store_delete_bitmap(entry_rowset_id, group_pb,
                                                    tablet.tablet_id(), storage_resource, req,
                                                    txn_id));
            }
        }
        DCHECK_EQ(req.delta_rowset_ids_size(), req.delete_bitmap_storages_size());
    }

    DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", {
        LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id();
        auto count = dp->param<int>("count", 30000);
        if (!delete_bitmap->delete_bitmap.empty()) {
            auto& key = delete_bitmap->delete_bitmap.begin()->first;
            auto& bitmap = delete_bitmap->delete_bitmap.begin()->second;
            for (int i = 1000; i < (1000 + count); i++) {
                req.add_rowset_ids(std::get<0>(key).to_string());
                req.add_segment_ids(std::get<1>(key));
                req.add_versions(i);
                // To save space, convert array and bitmap containers to run containers
                bitmap.runOptimize();
                std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
                bitmap.write(bitmap_data.data());
                *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
            }
        }
    });
    DBUG_EXECUTE_IF("CloudMetaMgr::test_update_delete_bitmap_fail", {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                "test update delete bitmap failed, tablet_id: {}, lock_id: {}", tablet.tablet_id(),
                lock_id);
    });

    auto rpc_status = retry_rpc(MetaServiceRPC::UPDATE_DELETE_BITMAP, req, &res,
                                &MetaService_Stub::update_delete_bitmap,
                                {
                                        .host_limiters = host_level_ms_rpc_rate_limiters_,
                                        .backpressure_handler = ms_backpressure_handler_,
                                        .table_id = table_id,
                                });

    // When the check is enabled, overriding an existing kv aborts the process.
    if (config::enable_update_delete_bitmap_kv_check_core &&
        res.status().code() == MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV) {
        auto& msg = res.status().msg();
        LOG_WARNING(msg);
        CHECK(false) << msg;
    }
    if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                "lock expired when update delete bitmap, tablet_id: {}, lock_id: {}, initiator: "
                "{}, error_msg: {}",
                tablet.tablet_id(), lock_id, initiator, res.status().msg());
    }
    return rpc_status;
}
2097
2098
// Sends `delete_bitmap` to the meta service with an UPDATE_DELETE_BITMAP request
// that is flagged `without_lock` and carries a placeholder lock id.
//
// Returns OK without issuing any RPC when
// config::delete_bitmap_store_write_version == 2. Otherwise one entry per
// (rowset id, segment id, version) key of `delete_bitmap` is appended to the
// request and the status produced by retry_rpc is returned.
//
// `rowset_to_versions` is only consulted when `pre_rowset_agg_end_version` > 0.
// It is indexed with operator[], so a rowset id that is missing from the map is
// default-inserted (debug builds DCHECK that the id is present).
// Every bitmap in `delete_bitmap` is run-optimized in place before it is serialized.
Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(
        const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
        std::map<std::string, int64_t>& rowset_to_versions, int64_t table_id,
        int64_t pre_rowset_agg_start_version, int64_t pre_rowset_agg_end_version) {
    if (config::delete_bitmap_store_write_version == 2) {
        VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2";
        return Status::OK();
    }

    LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id()
              << ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size();

    UpdateDeleteBitmapRequest req;
    UpdateDeleteBitmapResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_table_id(tablet.table_id());
    req.set_partition_id(tablet.partition_id());
    req.set_tablet_id(tablet.tablet_id());
    // use a fake lock id to resolve compatibility issues
    req.set_lock_id(-3);
    req.set_without_lock(true);

    const bool has_agg_end_version = pre_rowset_agg_end_version > 0;
    for (auto& entry : delete_bitmap->delete_bitmap) {
        const auto& bitmap_key = entry.first;
        auto& segment_bitmap = entry.second;
        const std::string rowset_id_str = std::get<0>(bitmap_key).to_string();

        req.add_rowset_ids(rowset_id_str);
        req.add_segment_ids(std::get<1>(bitmap_key));
        req.add_versions(std::get<2>(bitmap_key));

        if (has_agg_end_version) {
            DCHECK(rowset_to_versions.find(rowset_id_str) != rowset_to_versions.end())
                    << "rowset_to_versions not found for key=" << rowset_id_str;
            req.add_pre_rowset_versions(rowset_to_versions[rowset_id_str]);
        }
        DCHECK(!has_agg_end_version || pre_rowset_agg_end_version == std::get<2>(bitmap_key))
                << "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version
                << " not equal to version=" << std::get<2>(bitmap_key);

        // To save space, convert array and bitmap containers to run containers
        segment_bitmap.runOptimize();
        // Serialize straight into the freshly added repeated field.
        std::string* serialized = req.add_segment_delete_bitmaps();
        serialized->assign(segment_bitmap.getSizeInBytes(), '\0');
        segment_bitmap.write(serialized->data());
    }

    // The aggregation range is only attached when both of its ends are positive.
    if (pre_rowset_agg_start_version > 0 && has_agg_end_version) {
        req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version);
        req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version);
    }

    return retry_rpc(MetaServiceRPC::UPDATE_DELETE_BITMAP, req, &res,
                     &MetaService_Stub::update_delete_bitmap,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                             .table_id = table_id,
                     });
}
2148
2149
// Requests the delete bitmap update lock identified by (`lock_id`, `initiator`)
// for the table that `tablet` belongs to.
//
// The GET_DELETE_BITMAP_UPDATE_LOCK RPC is re-issued while the response code is
// LOCK_CONFLICT, sleeping a random 500-2000 ms between attempts, until more than
// config::get_delete_bitmap_lock_max_retry_times retries have been made. The
// measured sleep time is reported to g_cloud_be_mow_get_dbm_lock_backoff_sleep_time.
//
// Returns DELETE_BITMAP_LOCK_ERROR when the final response code is
// KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES or LOCK_CONFLICT; otherwise returns
// the status of the last retry_rpc call.
Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
                                                   int64_t initiator) {
    // Debug point: fail with a configurable probability before doing any work.
    DBUG_EXECUTE_IF("get_delete_bitmap_update_lock.inject_fail", {
        auto p = dp->param("percent", 0.01);
        std::mt19937 gen {std::random_device {}()};
        std::bernoulli_distribution inject_fault {p};
        if (inject_fault(gen)) {
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                    "injection error when get get_delete_bitmap_update_lock, "
                    "tablet_id={}, lock_id={}, initiator={}",
                    tablet.tablet_id(), lock_id, initiator);
        }
    });
    VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id()
               << ",lock_id:" << lock_id;

    GetDeleteBitmapUpdateLockRequest req;
    GetDeleteBitmapUpdateLockResponse res;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_table_id(tablet.table_id());
    req.set_lock_id(lock_id);
    req.set_initiator(initiator);
    // set expiration time for compaction and schema_change
    req.set_expiration(config::delete_bitmap_lock_expiration_seconds);

    Status st;
    int retry_times = 0;
    uint64_t backoff_sleep_time_ms {0};
    std::default_random_engine rng = make_random_engine();
    std::uniform_int_distribution<uint32_t> sleep_ms_dist(500, 2000);

    while (true) {
        bool test_conflict = false;
        st = retry_rpc(MetaServiceRPC::GET_DELETE_BITMAP_UPDATE_LOCK, req, &res,
                       &MetaService_Stub::get_delete_bitmap_update_lock,
                       {
                               .host_limiters = host_level_ms_rpc_rate_limiters_,
                               .backpressure_handler = ms_backpressure_handler_,
                       });
        DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict",
                        { test_conflict = true; });

        // Anything other than a (real or injected) conflict ends the loop.
        const bool lock_conflict = res.status().code() == MetaServiceCode::LOCK_CONFLICT;
        if (!(test_conflict || lock_conflict)) {
            break;
        }

        uint32_t duration_ms = sleep_ms_dist(rng);
        LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req)
                     << " retry_times=" << retry_times << " sleep=" << duration_ms
                     << "ms : " << res.status().msg();

        // Account for the time actually slept rather than the requested duration.
        const auto sleep_begin = std::chrono::steady_clock::now();
        bthread_usleep(duration_ms * 1000);
        const auto sleep_end = std::chrono::steady_clock::now();
        backoff_sleep_time_ms +=
                std::chrono::duration_cast<std::chrono::milliseconds>(sleep_end - sleep_begin)
                        .count();

        if (++retry_times > config::get_delete_bitmap_lock_max_retry_times) {
            break;
        }
    }
    g_cloud_be_mow_get_dbm_lock_backoff_sleep_time << backoff_sleep_time_ms;

    // Debug point: sleep after the lock request with a configurable probability.
    // The default is 15 seconds; NOTE(review): the original comment mentions 100s
    // exceeding Config.calculate_delete_bitmap_task_timeout_seconds = 60s, so tests
    // presumably pass a larger "sleep" value - confirm.
    DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", {
        auto p = dp->param("percent", 0.01);
        auto sleep_time = dp->param("sleep", 15);
        std::mt19937 gen {std::random_device {}()};
        std::bernoulli_distribution inject_fault {p};
        if (inject_fault(gen)) {
            LOG_INFO("injection sleep for {} seconds, tablet_id={}", sleep_time,
                     tablet.tablet_id());
            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
        }
    });

    if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
                "initiator {}",
                tablet.table_id(), lock_id, initiator);
    }
    if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) {
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
                "initiator {}",
                tablet.table_id(), lock_id, initiator);
    }
    return st;
}
2226
2227
void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id,
2228
40
                                                    int64_t initiator, int64_t tablet_id) {
2229
40
    LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id
2230
40
              << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id;
2231
40
    RemoveDeleteBitmapUpdateLockRequest req;
2232
40
    RemoveDeleteBitmapUpdateLockResponse res;
2233
40
    req.set_cloud_unique_id(config::cloud_unique_id);
2234
40
    req.set_table_id(table_id);
2235
40
    req.set_tablet_id(tablet_id);
2236
40
    req.set_lock_id(lock_id);
2237
40
    req.set_initiator(initiator);
2238
40
    auto st = retry_rpc(MetaServiceRPC::REMOVE_DELETE_BITMAP_UPDATE_LOCK, req, &res,
2239
40
                        &MetaService_Stub::remove_delete_bitmap_update_lock,
2240
40
                        {
2241
40
                                .host_limiters = host_level_ms_rpc_rate_limiters_,
2242
40
                                .backpressure_handler = ms_backpressure_handler_,
2243
40
                        });
2244
40
    if (!st.ok()) {
2245
39
        LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id
2246
39
                     << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id
2247
39
                     << ",st=" << st.to_string();
2248
39
    }
2249
40
}
2250
2251
190k
// Cross-checks the sizes recorded in `rs_meta` against the sizes of the files
// reported by the rowset's file system. Does nothing unless
// config::enable_table_size_correctness_check is set.
//
// The check passes when the recorded data size equals the summed segment file
// sizes, the recorded index size equals the summed inverted index file sizes,
// and data + index equals the recorded total. On a mismatch a warning is logged
// and DCHECK(false) fires.
void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) {
    if (!config::enable_table_size_correctness_check) {
        return;
    }

    const int64_t real_data_size = get_segment_file_size(rs_meta);
    const int64_t real_index_size = get_inverted_index_file_size(rs_meta);

    const bool sizes_consistent =
            rs_meta.data_disk_size() == real_data_size &&
            rs_meta.index_disk_size() == real_index_size &&
            rs_meta.data_disk_size() + rs_meta.index_disk_size() == rs_meta.total_disk_size();
    if (sizes_consistent) {
        return;
    }

    LOG(WARNING) << "[Cloud table table size check failed]:"
                 << " tablet id: " << rs_meta.tablet_id()
                 << ", rowset id:" << rs_meta.rowset_id()
                 << ", rowset data disk size:" << rs_meta.data_disk_size()
                 << ", rowset real data disk size:" << real_data_size
                 << ", rowset index disk size:" << rs_meta.index_disk_size()
                 << ", rowset real index disk size:" << real_index_size
                 << ", rowset total disk size:" << rs_meta.total_disk_size()
                 << ", rowset segment path:"
                 << StorageResource().remote_segment_path(rs_meta.tablet_id(),
                                                          rs_meta.rowset_id().to_string(), 0);
    DCHECK(false);
}
2274
2275
0
// Returns the sum of the file sizes of all segments of the rowset described by
// `rs_meta`, as reported by the rowset's file system.
//
// A segment whose size cannot be obtained contributes 0: a NOT_FOUND status is
// logged at INFO, any other failure at WARNING. If the rowset has no file
// system, a warning is logged and 0 is returned; previously the null handle was
// dereferenced in the loop below.
int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) {
    int64_t total_segment_size = 0;
    const auto fs = rs_meta.fs();
    if (!fs) {
        // Stream-style logging does not expand "{}" placeholders, so the id is
        // appended directly.
        LOG(WARNING) << "get fs failed, resource_id=" << rs_meta.resource_id();
        return total_segment_size;
    }
    for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) {
        std::string segment_path = StorageResource().remote_segment_path(
                rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
        int64_t segment_file_size = 0;
        auto st = fs->file_size(segment_path, &segment_file_size);
        if (!st.ok()) {
            // Do not trust whatever the failed call left in the output parameter.
            segment_file_size = 0;
            if (st.is<NOT_FOUND>()) {
                LOG(INFO) << "cloud table size correctness check get segment size 0 because "
                             "file not exist! msg:"
                          << st.msg() << ", segment path:" << segment_path;
            } else {
                LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:"
                             << st.msg() << ", segment path:" << segment_path;
            }
        }
        total_segment_size += segment_file_size;
    }
    return total_segment_size;
}
2301
2302
0
// Returns the sum of the inverted index file sizes of the rowset described by
// `rs_meta`, as reported by the rowset's file system.
//
// For storage format V1 one index file is looked up per (index, segment) pair;
// for any other format one index file is looked up per segment. A file whose
// size cannot be obtained contributes 0: a NOT_FOUND status is logged at INFO,
// any other failure at WARNING. If the rowset has no file system, a warning is
// logged and 0 is returned; previously the null handle was dereferenced below.
int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) {
    int64_t total_inverted_index_size = 0;
    const auto fs = rs_meta.fs();
    if (!fs) {
        // Stream-style logging does not expand "{}" placeholders, so the id is
        // appended directly.
        LOG(WARNING) << "get fs failed, resource_id=" << rs_meta.resource_id();
        return total_inverted_index_size;
    }
    if (rs_meta.tablet_schema()->get_inverted_index_storage_format() ==
        InvertedIndexStorageFormatPB::V1) {
        const auto& indices = rs_meta.tablet_schema()->inverted_indexes();
        for (auto& index : indices) {
            for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
                std::string segment_path = StorageResource().remote_segment_path(
                        rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
                int64_t file_size = 0;

                std::string inverted_index_file_path =
                        InvertedIndexDescriptor::get_index_file_path_v1(
                                InvertedIndexDescriptor::get_index_file_path_prefix(segment_path),
                                index->index_id(), index->get_index_suffix());
                auto st = fs->file_size(inverted_index_file_path, &file_size);
                if (!st.ok()) {
                    // Do not trust whatever the failed call left in the output parameter.
                    file_size = 0;
                    if (st.is<NOT_FOUND>()) {
                        LOG(INFO) << "cloud table size correctness check get inverted index v1 "
                                     "0 because file not exist! msg:"
                                  << st.msg()
                                  << ", inverted index path:" << inverted_index_file_path;
                    } else {
                        LOG(WARNING)
                                << "cloud table size correctness check get inverted index v1 "
                                   "size failed! msg:"
                                << st.msg() << ", inverted index path:" << inverted_index_file_path;
                    }
                }
                total_inverted_index_size += file_size;
            }
        }
    } else {
        for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
            int64_t file_size = 0;
            std::string segment_path = StorageResource().remote_segment_path(
                    rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);

            std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_path));
            auto st = fs->file_size(inverted_index_file_path, &file_size);
            if (!st.ok()) {
                // Do not trust whatever the failed call left in the output parameter.
                file_size = 0;
                if (st.is<NOT_FOUND>()) {
                    LOG(INFO) << "cloud table size correctness check get inverted index v2 "
                                 "0 because file not exist! msg:"
                              << st.msg() << ", inverted index path:" << inverted_index_file_path;
                } else {
                    LOG(WARNING) << "cloud table size correctness check get inverted index v2 "
                                    "size failed! msg:"
                                 << st.msg()
                                 << ", inverted index path:" << inverted_index_file_path;
                }
            }
            total_inverted_index_size += file_size;
        }
    }
    return total_inverted_index_size;
}
2366
2367
// Adds empty placeholder ("hole") rowsets to `tablet` for every version in
// [0, max_version] that is not covered by a rowset meta in the tablet meta.
//
// Nothing is done when `max_version` <= 0, when the tablet has no rowset metas,
// or when the tablet is in TABLET_NOTREADY state with alter_version <= 1.
// For a TABLET_NOTREADY tablet, versions <= alter_version are never filled.
// `wlock` is forwarded to CloudTablet::add_rowsets when holes were created.
// Returns the first error from create_empty_rowset_for_hole, otherwise OK.
Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version,
                                        std::unique_lock<std::shared_mutex>& wlock) {
    if (max_version <= 0) {
        return Status::OK();
    }

    Versions existing_versions;
    for (const auto& [unused_key, rs_meta] : tablet->tablet_meta()->all_rs_metas()) {
        existing_versions.push_back(rs_meta->version());
    }

    // If there are no existing versions, it may be a new tablet for restore, so skip filling holes.
    if (existing_versions.empty()) {
        return Status::OK();
    }

    // Sort ascending by start version; that is sufficient because two versions
    // are certainly not overlapping.
    const auto by_start_version = [](const Version& lhs, const Version& rhs) {
        return lhs.first < rhs.first;
    };
    std::sort(existing_versions.begin(), existing_versions.end(), by_start_version);

    // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which
    // calls fill_version_holes. For schema change tablets (TABLET_NOTREADY state), hole filling is
    // selectively skipped for versions <= alter_version to prevent:
    // 1. Abnormal compaction score calculations for schema change tablets
    // 2. Unexpected -235 errors during load operations
    // This allows schema change to proceed normally while still permitting hole filling for
    // versions beyond the alter_version threshold.
    const bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY;
    if (is_schema_change_tablet && tablet->alter_version() <= 1) {
        LOG(INFO) << "Skip version hole filling for new schema change tablet "
                  << tablet->tablet_id() << " with alter_version " << tablet->alter_version();
        return Status::OK();
    }

    // Suffix appended to the "Created empty rowset" log lines for schema change tablets.
    const auto schema_change_note = [&]() -> std::string {
        if (!is_schema_change_tablet) {
            return "";
        }
        return ", schema change tablet skipped filling versions <= " +
               std::to_string(tablet->alter_version());
    };

    std::vector<RowsetSharedPtr> hole_rowsets;
    int64_t last_version = -1;
    for (const Version& version : existing_versions) {
        VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": ["
                    << version.first << ", " << version.second << "]";

        // missing versions are those that are not in the existing_versions
        const bool has_gap_before = version.first > last_version + 1;
        if (has_gap_before) {
            // The rowset that follows the gap supplies the metadata template.
            auto template_rowset = tablet->get_rowset_by_version(version);
            for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
                // Skip hole filling for versions <= alter_version during schema change
                const bool skip = is_schema_change_tablet && ver <= tablet->alter_version();
                if (!skip) {
                    RowsetSharedPtr hole_rowset;
                    RETURN_IF_ERROR(create_empty_rowset_for_hole(
                            tablet, ver, template_rowset->rowset_meta(), &hole_rowset));
                    hole_rowsets.push_back(hole_rowset);
                }
            }
            LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1
                      << " to " << version.first - 1 << " for tablet " << tablet->tablet_id()
                      << schema_change_note();
        }
        last_version = version.second;
    }

    // there is a hole after the last existing version
    if (last_version + 1 <= max_version) {
        LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to "
                  << max_version << " for tablet " << tablet->tablet_id()
                  << schema_change_note();
        for (int64_t ver = last_version + 1; ver <= max_version; ++ver) {
            // Skip hole filling for versions <= alter_version during schema change
            if (is_schema_change_tablet && ver <= tablet->alter_version()) {
                continue;
            }
            auto template_rowset = tablet->get_rowset_by_version(existing_versions.back());
            RowsetSharedPtr hole_rowset;
            RETURN_IF_ERROR(create_empty_rowset_for_hole(
                    tablet, ver, template_rowset->rowset_meta(), &hole_rowset));
            hole_rowsets.push_back(hole_rowset);
        }
    }

    if (hole_rowsets.empty()) {
        return Status::OK();
    }
    // Read the count before the vector is moved into add_rowsets.
    const size_t hole_count = hole_rowsets.size();
    tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false);
    g_cloud_version_hole_filled_count << hole_count;
    return Status::OK();
}
2461
2462
// Builds an empty, VISIBLE rowset covering exactly [version, version] for
// `tablet` and stores it in `*rowset`, marked as a hole rowset.
//
// The rowset id and load id are derived from (tablet id, version). Schema,
// rowset type, schema hash and resource id are copied from `prev_rowset_meta`.
// Returns the error from RowsetFactory::create_rowset if it fails, otherwise OK.
Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
                                                  RowsetMetaSharedPtr prev_rowset_meta,
                                                  RowsetSharedPtr* rowset) {
    auto hole_meta = std::make_shared<RowsetMeta>();

    // Deterministic rowset id: the same tablet_id + version always yields the same id.
    RowsetId hole_rowset_id;
    hole_rowset_id.init(2, 0, tablet->tablet_id(), version);
    hole_meta->set_rowset_id(hole_rowset_id);

    // Deterministic load id, built from the same two values.
    PUniqueId hole_load_id;
    hole_load_id.set_hi(tablet->tablet_id());
    hole_load_id.set_lo(version);
    hole_meta->set_load_id(hole_load_id);

    // Fields taken over from the template rowset meta.
    hole_meta->set_tablet_schema(prev_rowset_meta->tablet_schema());
    hole_meta->set_rowset_type(prev_rowset_meta->rowset_type());
    hole_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash());
    hole_meta->set_resource_id(prev_rowset_meta->resource_id());

    // Identity of the owning tablet and the single version being covered.
    hole_meta->set_tablet_id(tablet->tablet_id());
    hole_meta->set_index_id(tablet->index_id());
    hole_meta->set_partition_id(tablet->partition_id());
    hole_meta->set_tablet_uid(tablet->tablet_uid());
    hole_meta->set_version(Version(version, version));
    hole_meta->set_txn_id(version);

    // The rowset holds no data at all.
    hole_meta->set_num_rows(0);
    hole_meta->set_total_disk_size(0);
    hole_meta->set_data_disk_size(0);
    hole_meta->set_index_disk_size(0);
    hole_meta->set_empty(true);
    hole_meta->set_num_segments(0);
    hole_meta->set_segments_overlap(NONOVERLAPPING);
    hole_meta->set_rowset_state(VISIBLE);
    hole_meta->set_creation_time(UnixSeconds());
    hole_meta->set_newest_write_timestamp(UnixSeconds());

    Status create_status = RowsetFactory::create_rowset(nullptr, "", hole_meta, rowset);
    if (create_status.ok()) {
        (*rowset)->set_hole_rowset(true);
        return Status::OK();
    }

    LOG_WARNING("Failed to create empty rowset for hole")
            .tag("tablet_id", tablet->tablet_id())
            .tag("version", version)
            .error(create_status);
    return create_status;
}
2516
2517
2
// Issues a LIST_SNAPSHOTS request (with include_aborted set) and appends every
// snapshot of the response to `snapshots`. Entries already present in
// `snapshots` are kept. Returns the RPC error if the call fails.
Status CloudMetaMgr::list_snapshot(std::vector<SnapshotInfoPB>& snapshots) {
    ListSnapshotRequest req;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_include_aborted(true);

    ListSnapshotResponse res;
    RETURN_IF_ERROR(retry_rpc(MetaServiceRPC::LIST_SNAPSHOTS, req, &res,
                              &MetaService_Stub::list_snapshot,
                              {
                                      .host_limiters = host_level_ms_rpc_rate_limiters_,
                                      .backpressure_handler = ms_backpressure_handler_,
                              }));

    for (const auto& snapshot_pb : res.snapshots()) {
        snapshots.push_back(snapshot_pb);
    }
    return Status::OK();
}
2533
2534
// Fetches the instance via a GET_INSTANCE request and copies its snapshot
// settings into the three output parameters. A field that is absent from the
// response falls back to: SNAPSHOT_SWITCH_DISABLED for `switch_status`, 0 for
// `max_reserved_snapshots`, and 3600 for `snapshot_interval_seconds`.
// Returns the RPC error if the call fails; the outputs are then left untouched.
Status CloudMetaMgr::get_snapshot_properties(SnapshotSwitchStatus& switch_status,
                                             int64_t& max_reserved_snapshots,
                                             int64_t& snapshot_interval_seconds) {
    GetInstanceRequest req;
    req.set_cloud_unique_id(config::cloud_unique_id);

    GetInstanceResponse res;
    RETURN_IF_ERROR(retry_rpc(MetaServiceRPC::GET_INSTANCE, req, &res,
                              &MetaService_Stub::get_instance,
                              {
                                      .host_limiters = host_level_ms_rpc_rate_limiters_,
                                      .backpressure_handler = ms_backpressure_handler_,
                              }));

    const auto& instance = res.instance();

    if (instance.has_snapshot_switch_status()) {
        switch_status = instance.snapshot_switch_status();
    } else {
        switch_status = SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED;
    }

    if (instance.has_max_reserved_snapshot()) {
        max_reserved_snapshots = instance.max_reserved_snapshot();
    } else {
        max_reserved_snapshots = 0;
    }

    if (instance.has_snapshot_interval_seconds()) {
        snapshot_interval_seconds = instance.snapshot_interval_seconds();
    } else {
        snapshot_interval_seconds = 3600;
    }

    return Status::OK();
}
2556
2557
// Sends an UPDATE_PACKED_FILE_INFO request carrying `packed_file_path` and a
// copy of `packed_file_info`, and returns the status produced by retry_rpc.
// `table_id` is passed along in the retry_rpc options.
Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path,
                                             const cloud::PackedFileInfoPB& packed_file_info,
                                             int64_t table_id) {
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
               << packed_file_info.total_slice_num() << " small files"
               << ", total bytes: " << packed_file_info.total_slice_bytes();

    // Fill in the request.
    cloud::UpdatePackedFileInfoRequest req;
    req.set_cloud_unique_id(config::cloud_unique_id);
    req.set_packed_file_path(packed_file_path);
    req.mutable_packed_file_info()->CopyFrom(packed_file_info);

    // Issue the call through the shared retry helper.
    cloud::UpdatePackedFileInfoResponse resp;
    return retry_rpc(MetaServiceRPC::UPDATE_PACKED_FILE_INFO, req, &resp,
                     &cloud::MetaService_Stub::update_packed_file_info,
                     {
                             .host_limiters = host_level_ms_rpc_rate_limiters_,
                             .backpressure_handler = ms_backpressure_handler_,
                             .table_id = table_id,
                     });
}
2582
2583
// Queries the meta service with a GET_CLUSTER_STATUS request and rebuilds
// `*result` as a map of cluster id -> (cluster status, mtime in milliseconds).
//
// `*result` is cleared only after the RPC has succeeded; on failure the RPC
// error is returned and `*result` is left untouched. When `my_cluster_id` is
// non-null and the response carries requester_cluster_id, that id is written
// to `*my_cluster_id`.
Status CloudMetaMgr::get_cluster_status(
        std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result,
        std::string* my_cluster_id) {
    GetClusterStatusRequest req;
    GetClusterStatusResponse resp;
    req.add_cloud_unique_ids(config::cloud_unique_id);

    RETURN_IF_ERROR(retry_rpc(MetaServiceRPC::GET_CLUSTER_STATUS, req, &resp,
                              &MetaService_Stub::get_cluster_status,
                              {.host_limiters = host_level_ms_rpc_rate_limiters_}));

    result->clear();
    for (const auto& detail : resp.details()) {
        for (const auto& cluster : detail.clusters()) {
            // mtime is in seconds from MS and is converted to milliseconds.
            // If mtime is not set, the current time is used as a conservative
            // default to avoid immediate takeover due to elapsed being huge.
            int64_t mtime_ms = 0;
            if (cluster.has_mtime()) {
                mtime_ms = cluster.mtime() * 1000;
            } else {
                mtime_ms = UnixMillis();
            }
            const auto status_code = static_cast<int32_t>(cluster.cluster_status());
            (*result)[cluster.cluster_id()] = {status_code, mtime_ms};
        }
    }

    const bool report_own_cluster = my_cluster_id && resp.has_requester_cluster_id();
    if (report_own_cluster) {
        *my_cluster_id = resp.requester_cluster_id();
    }

    return Status::OK();
}
2615
2616
} // namespace doris::cloud