Coverage Report

Created: 2026-03-16 13:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_meta_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "cloud/cloud_meta_mgr.h"
18
19
#include <brpc/channel.h>
20
#include <brpc/controller.h>
21
#include <brpc/errno.pb.h>
22
#include <bthread/bthread.h>
23
#include <bthread/condition_variable.h>
24
#include <bthread/mutex.h>
25
#include <gen_cpp/FrontendService.h>
26
#include <gen_cpp/HeartbeatService_types.h>
27
#include <gen_cpp/PlanNodes_types.h>
28
#include <gen_cpp/Types_types.h>
29
#include <gen_cpp/cloud.pb.h>
30
#include <gen_cpp/olap_file.pb.h>
31
#include <glog/logging.h>
32
33
#include <algorithm>
34
#include <atomic>
35
#include <chrono>
36
#include <cstdint>
37
#include <memory>
38
#include <mutex>
39
#include <random>
40
#include <shared_mutex>
41
#include <string>
42
#include <type_traits>
43
#include <vector>
44
45
#include "cloud/cloud_storage_engine.h"
46
#include "cloud/cloud_tablet.h"
47
#include "cloud/cloud_warm_up_manager.h"
48
#include "cloud/config.h"
49
#include "cloud/delete_bitmap_file_reader.h"
50
#include "cloud/delete_bitmap_file_writer.h"
51
#include "cloud/pb_convert.h"
52
#include "common/config.h"
53
#include "common/logging.h"
54
#include "common/status.h"
55
#include "cpp/sync_point.h"
56
#include "io/fs/obj_storage_client.h"
57
#include "load/stream_load/stream_load_context.h"
58
#include "runtime/exec_env.h"
59
#include "storage/olap_common.h"
60
#include "storage/rowset/rowset.h"
61
#include "storage/rowset/rowset_factory.h"
62
#include "storage/rowset/rowset_fwd.h"
63
#include "storage/storage_engine.h"
64
#include "storage/tablet/tablet_meta.h"
65
#include "util/client_cache.h"
66
#include "util/network_util.h"
67
#include "util/s3_util.h"
68
#include "util/thrift_rpc_helper.h"
69
70
namespace doris::cloud {
71
#include "common/compile_check_begin.h"
72
using namespace ErrorCode;
73
74
1.92M
void* run_bthread_work(void* arg) {
75
1.92M
    auto* f = reinterpret_cast<std::function<void()>*>(arg);
76
1.92M
    (*f)();
77
1.92M
    delete f;
78
1.92M
    return nullptr;
79
1.92M
}
80
81
364k
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency) {
82
364k
    if (tasks.empty()) {
83
2.25k
        return Status::OK();
84
2.25k
    }
85
86
362k
    bthread::Mutex lock;
87
362k
    bthread::ConditionVariable cond;
88
362k
    Status status; // Guard by lock
89
362k
    int count = 0; // Guard by lock
90
91
1.61M
    for (const auto& task : tasks) {
92
1.61M
        {
93
1.61M
            std::unique_lock lk(lock);
94
            // Wait until there are available slots
95
1.72M
            while (status.ok() && count >= concurrency) {
96
110k
                cond.wait(lk);
97
110k
            }
98
1.61M
            if (!status.ok()) {
99
2
                break;
100
2
            }
101
102
            // Increase running task count
103
1.61M
            ++count;
104
1.61M
        }
105
106
        // dispatch task into bthreads
107
1.61M
        auto* fn = new std::function<void()>([&, &task = task] {
108
1.61M
            auto st = task();
109
1.61M
            {
110
1.61M
                std::lock_guard lk(lock);
111
1.61M
                --count;
112
1.61M
                if (!st.ok()) {
113
2
                    std::swap(st, status);
114
2
                }
115
1.61M
                cond.notify_one();
116
1.61M
            }
117
1.61M
        });
118
119
1.61M
        bthread_t bthread_id;
120
1.61M
        if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) {
121
0
            run_bthread_work(fn);
122
0
        }
123
1.61M
    }
124
125
    // Wait until all running tasks have done
126
362k
    {
127
362k
        std::unique_lock lk(lock);
128
1.06M
        while (count > 0) {
129
705k
            cond.wait(lk);
130
705k
        }
131
362k
    }
132
133
362k
    return status;
134
364k
}
135
136
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
137
306k
                         std::future<Status>* fut) {
138
    // std::function will cause `copy`, we need to use heap memory to avoid copy ctor called
139
306k
    auto prom = std::make_shared<std::promise<Status>>();
140
306k
    *fut = prom->get_future();
141
306k
    std::function<void()>* fn = new std::function<void()>(
142
308k
            [tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable {
143
308k
                p->set_value(bthread_fork_join(tasks, concurrency));
144
308k
            });
145
146
306k
    bthread_t bthread_id;
147
306k
    if (bthread_start_background(&bthread_id, nullptr, run_bthread_work, fn) != 0) {
148
0
        delete fn;
149
0
        return Status::InternalError<false>("failed to create bthread");
150
0
    }
151
306k
    return Status::OK();
152
306k
}
153
154
namespace {
155
constexpr int kBrpcRetryTimes = 3;
156
157
bvar::LatencyRecorder _get_rowset_latency("doris_cloud_meta_mgr_get_rowset");
158
bvar::LatencyRecorder g_cloud_commit_txn_resp_redirect_latency("cloud_table_stats_report_latency");
159
bvar::Adder<uint64_t> g_cloud_meta_mgr_rpc_timeout_count("cloud_meta_mgr_rpc_timeout_count");
160
bvar::Window<bvar::Adder<uint64_t>> g_cloud_ms_rpc_timeout_count_window(
161
        "cloud_meta_mgr_rpc_timeout_qps", &g_cloud_meta_mgr_rpc_timeout_count, 30);
162
bvar::LatencyRecorder g_cloud_be_mow_get_dbm_lock_backoff_sleep_time(
163
        "cloud_be_mow_get_dbm_lock_backoff_sleep_time");
164
bvar::Adder<uint64_t> g_cloud_version_hole_filled_count("cloud_version_hole_filled_count");
165
166
class MetaServiceProxy {
167
public:
168
1.25M
    static Status get_proxy(MetaServiceProxy** proxy) {
169
        // The 'stub' is a useless parameter, added only to reuse the `get_pooled_client` function.
170
1.25M
        std::shared_ptr<MetaService_Stub> stub;
171
1.25M
        return get_pooled_client(&stub, proxy);
172
1.25M
    }
173
174
0
    void set_unhealthy() {
175
0
        std::unique_lock lock(_mutex);
176
0
        maybe_unhealthy = true;
177
0
    }
178
179
2.49M
    bool need_reconn(long now) {
180
2.49M
        return maybe_unhealthy && ((now - last_reconn_time_ms.front()) >
181
0
                                   config::meta_service_rpc_reconnect_interval_ms);
182
2.49M
    }
183
184
2.49M
    Status get(std::shared_ptr<MetaService_Stub>* stub) {
185
2.49M
        using namespace std::chrono;
186
187
2.49M
        auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
188
2.49M
        {
189
2.49M
            std::shared_lock lock(_mutex);
190
2.50M
            if (_deadline_ms >= now && !is_idle_timeout(now) && !need_reconn(now)) {
191
2.50M
                _last_access_at_ms.store(now, std::memory_order_relaxed);
192
2.50M
                *stub = _stub;
193
2.50M
                return Status::OK();
194
2.50M
            }
195
2.49M
        }
196
197
18.4E
        auto channel = std::make_unique<brpc::Channel>();
198
18.4E
        Status s = init_channel(channel.get());
199
18.4E
        if (!s.ok()) [[unlikely]] {
200
0
            return s;
201
0
        }
202
203
18.4E
        *stub = std::make_shared<MetaService_Stub>(channel.release(),
204
18.4E
                                                   google::protobuf::Service::STUB_OWNS_CHANNEL);
205
206
18.4E
        long deadline = now;
207
        // connection age only works without list endpoint.
208
18.4E
        if (config::meta_service_connection_age_base_seconds > 0) {
209
1.92k
            std::default_random_engine rng(static_cast<uint32_t>(now));
210
1.92k
            std::uniform_int_distribution<> uni(
211
1.92k
                    config::meta_service_connection_age_base_seconds,
212
1.92k
                    config::meta_service_connection_age_base_seconds * 2);
213
1.92k
            deadline = now + duration_cast<milliseconds>(seconds(uni(rng))).count();
214
1.92k
        }
215
216
        // Last one WIN
217
18.4E
        std::unique_lock lock(_mutex);
218
18.4E
        _last_access_at_ms.store(now, std::memory_order_relaxed);
219
18.4E
        _deadline_ms = deadline;
220
18.4E
        _stub = *stub;
221
222
18.4E
        last_reconn_time_ms.push(now);
223
18.4E
        last_reconn_time_ms.pop();
224
18.4E
        maybe_unhealthy = false;
225
226
18.4E
        return Status::OK();
227
18.4E
    }
228
229
private:
230
2.50M
    static bool is_meta_service_endpoint_list() {
231
2.50M
        return config::meta_service_endpoint.find(',') != std::string::npos;
232
2.50M
    }
233
234
    /**
235
    * This function initializes a pool of `MetaServiceProxy` objects and selects one using
236
    * round-robin. It returns a client stub via the selected proxy.
237
    *
238
    * @param stub A pointer to a shared pointer of `MetaService_Stub` to be retrieved.
239
    * @param proxy (Optional) A pointer to store the selected `MetaServiceProxy`.
240
    *
241
    * @return Status Returns `Status::OK()` on success or an error status on failure.
242
    */
243
    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
244
1.25M
                                    MetaServiceProxy** proxy) {
245
1.25M
        static std::once_flag proxies_flag;
246
1.25M
        static size_t num_proxies = 1;
247
1.25M
        static std::atomic<size_t> index(0);
248
1.25M
        static std::unique_ptr<MetaServiceProxy[]> proxies;
249
1.25M
        if (config::meta_service_endpoint.empty()) {
250
21
            return Status::InvalidArgument(
251
21
                    "Meta service endpoint is empty. Please configure manually or wait for "
252
21
                    "heartbeat to obtain.");
253
21
        }
254
1.25M
        std::call_once(
255
1.25M
                proxies_flag, +[]() {
256
1
                    if (config::meta_service_connection_pooled) {
257
1
                        num_proxies = config::meta_service_connection_pool_size;
258
1
                    }
259
1
                    proxies = std::make_unique<MetaServiceProxy[]>(num_proxies);
260
1
                });
261
262
1.25M
        for (size_t i = 0; i + 1 < num_proxies; ++i) {
263
1.25M
            size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
264
1.25M
            Status s = proxies[next_index].get(stub);
265
1.25M
            if (proxy != nullptr) {
266
1.25M
                *proxy = &(proxies[next_index]);
267
1.25M
            }
268
1.25M
            if (s.ok()) return Status::OK();
269
1.25M
        }
270
271
3.63k
        size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % num_proxies;
272
3.63k
        if (proxy != nullptr) {
273
0
            *proxy = &(proxies[next_index]);
274
0
        }
275
3.63k
        return proxies[next_index].get(stub);
276
1.25M
    }
277
278
1.91k
    static Status init_channel(brpc::Channel* channel) {
279
1.91k
        static std::atomic<size_t> index = 1;
280
281
1.91k
        const char* load_balancer_name = nullptr;
282
1.91k
        std::string endpoint;
283
1.91k
        if (is_meta_service_endpoint_list()) {
284
0
            endpoint = fmt::format("list://{}", config::meta_service_endpoint);
285
0
            load_balancer_name = "random";
286
1.91k
        } else {
287
1.91k
            std::string ip;
288
1.91k
            uint16_t port;
289
1.91k
            Status s = get_meta_service_ip_and_port(&ip, &port);
290
1.91k
            if (!s.ok()) {
291
0
                LOG(WARNING) << "fail to get meta service ip and port: " << s;
292
0
                return s;
293
0
            }
294
295
1.91k
            endpoint = get_host_port(ip, port);
296
1.91k
        }
297
298
1.91k
        brpc::ChannelOptions options;
299
1.91k
        options.connection_group =
300
1.91k
                fmt::format("ms_{}", index.fetch_add(1, std::memory_order_relaxed));
301
1.91k
        if (channel->Init(endpoint.c_str(), load_balancer_name, &options) != 0) {
302
0
            return Status::InvalidArgument("failed to init brpc channel, endpoint: {}", endpoint);
303
0
        }
304
1.91k
        return Status::OK();
305
1.91k
    }
306
307
1.91k
    static Status get_meta_service_ip_and_port(std::string* ip, uint16_t* port) {
308
1.91k
        std::string parsed_host;
309
1.91k
        if (!parse_endpoint(config::meta_service_endpoint, &parsed_host, port)) {
310
0
            return Status::InvalidArgument("invalid meta service endpoint: {}",
311
0
                                           config::meta_service_endpoint);
312
0
        }
313
1.91k
        if (is_valid_ip(parsed_host)) {
314
1.91k
            *ip = std::move(parsed_host);
315
1.91k
            return Status::OK();
316
1.91k
        }
317
1
        return hostname_to_ip(parsed_host, *ip);
318
1.91k
    }
319
320
2.50M
    bool is_idle_timeout(long now) {
321
2.50M
        auto idle_timeout_ms = config::meta_service_idle_connection_timeout_ms;
322
        // idle timeout only works without list endpoint.
323
2.50M
        return !is_meta_service_endpoint_list() && idle_timeout_ms > 0 &&
324
2.50M
               _last_access_at_ms.load(std::memory_order_relaxed) + idle_timeout_ms < now;
325
2.50M
    }
326
327
    std::shared_mutex _mutex;
328
    std::atomic<long> _last_access_at_ms {0};
329
    long _deadline_ms {0};
330
    std::shared_ptr<MetaService_Stub> _stub;
331
332
    std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
333
    bool maybe_unhealthy = false;
334
};
335
336
template <typename T, typename... Ts>
337
struct is_any : std::disjunction<std::is_same<T, Ts>...> {};
338
339
template <typename T, typename... Ts>
340
constexpr bool is_any_v = is_any<T, Ts...>::value;
341
342
template <typename Request>
343
667
static std::string debug_info(const Request& req) {
344
667
    if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) {
345
0
        return fmt::format(" txn_id={}", req.txn_id());
346
0
    } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) {
347
0
        return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
348
0
    } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
349
0
        return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id());
350
667
    } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
351
667
        return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id());
352
667
    } else if constexpr (is_any_v<Request, GetTabletRequest>) {
353
0
        return fmt::format(" tablet_id={}", req.tablet_id());
354
    } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest,
355
0
                                  GetInstanceRequest, GetClusterStatusRequest>) {
356
0
        return "";
357
0
    } else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
358
0
        return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
359
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) {
360
        return fmt::format(" tablet_id={}", req.tablet_id());
361
0
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
362
0
        return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
363
0
                           req.tablet_id(), req.lock_id());
364
0
    } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
365
0
        return fmt::format(" tablet_id={}", req.tablet_id());
366
    } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
367
        return fmt::format(" index_id={}", req.index_id());
368
0
    } else if constexpr (is_any_v<Request, RestoreJobRequest>) {
369
0
        return fmt::format(" tablet_id={}", req.tablet_id());
370
0
    } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) {
371
0
        return fmt::format(" packed_file_path={}", req.packed_file_path());
372
    } else {
373
        static_assert(!sizeof(Request));
374
    }
375
667
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16GetTabletRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19CreateRowsetRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_16CommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_15AbortTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19PrecommitTxnRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_17RestoreJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22GetObjStoreInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_21StartTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_22FinishTabletJobRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_25UpdateDeleteBitmapRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_32GetDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Line
Count
Source
343
667
static std::string debug_info(const Request& req) {
344
    if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) {
345
        return fmt::format(" txn_id={}", req.txn_id());
346
    } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) {
347
        return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
348
    } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
349
        return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id());
350
667
    } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
351
667
        return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id());
352
    } else if constexpr (is_any_v<Request, GetTabletRequest>) {
353
        return fmt::format(" tablet_id={}", req.tablet_id());
354
    } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest, ListSnapshotRequest,
355
                                  GetInstanceRequest, GetClusterStatusRequest>) {
356
        return "";
357
    } else if constexpr (is_any_v<Request, CreateRowsetRequest>) {
358
        return fmt::format(" tablet_id={}", req.rowset_meta().tablet_id());
359
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapRequest>) {
360
        return fmt::format(" tablet_id={}", req.tablet_id());
361
    } else if constexpr (is_any_v<Request, RemoveDeleteBitmapUpdateLockRequest>) {
362
        return fmt::format(" table_id={}, tablet_id={}, lock_id={}", req.table_id(),
363
                           req.tablet_id(), req.lock_id());
364
    } else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
365
        return fmt::format(" tablet_id={}", req.tablet_id());
366
    } else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
367
        return fmt::format(" index_id={}", req.index_id());
368
    } else if constexpr (is_any_v<Request, RestoreJobRequest>) {
369
        return fmt::format(" tablet_id={}", req.tablet_id());
370
    } else if constexpr (is_any_v<Request, UpdatePackedFileInfoRequest>) {
371
        return fmt::format(" packed_file_path={}", req.packed_file_path());
372
    } else {
373
        static_assert(!sizeof(Request));
374
    }
375
667
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_35RemoveDeleteBitmapUpdateLockRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_19ListSnapshotRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_18GetInstanceRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_27UpdatePackedFileInfoRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_110debug_infoINS0_23GetClusterStatusRequestEEENSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
376
377
896k
inline std::default_random_engine make_random_engine() {
378
896k
    return std::default_random_engine(
379
896k
            static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
380
896k
}
381
382
template <typename Request, typename Response>
383
using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*,
384
                                                     const Request*, Response*,
385
                                                     ::google::protobuf::Closure*);
386
387
template <typename Request, typename Response>
388
Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
389
893k
                 MetaServiceMethod<Request, Response> method) {
390
893k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
893k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
893k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
893k
    int retry_times = 0;
397
893k
    uint32_t duration_ms = 0;
398
893k
    std::string error_msg;
399
893k
    std::default_random_engine rng = make_random_engine();
400
893k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
893k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
893k
    MetaServiceProxy* proxy;
403
893k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
893k
    while (true) {
405
891k
        std::shared_ptr<MetaService_Stub> stub;
406
891k
        RETURN_IF_ERROR(proxy->get(&stub));
407
891k
        brpc::Controller cntl;
408
891k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
92.4k
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
799k
        } else {
411
799k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
799k
        }
413
891k
        cntl.set_max_retry(kBrpcRetryTimes);
414
891k
        res->Clear();
415
891k
        int error_code = 0;
416
891k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
891k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
894k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
894k
            return Status::OK();
423
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
18
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
18
                                                                     res->status().msg());
426
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
1.17k
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
1.17k
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
893k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16GetTabletRequestENS0_17GetTabletResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
331k
                 MetaServiceMethod<Request, Response> method) {
390
331k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
331k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
331k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
331k
    int retry_times = 0;
397
331k
    uint32_t duration_ms = 0;
398
331k
    std::string error_msg;
399
331k
    std::default_random_engine rng = make_random_engine();
400
331k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
331k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
331k
    MetaServiceProxy* proxy;
403
331k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
331k
    while (true) {
405
330k
        std::shared_ptr<MetaService_Stub> stub;
406
330k
        RETURN_IF_ERROR(proxy->get(&stub));
407
330k
        brpc::Controller cntl;
408
330k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
330k
        } else {
411
330k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
330k
        }
413
330k
        cntl.set_max_retry(kBrpcRetryTimes);
414
330k
        res->Clear();
415
330k
        int error_code = 0;
416
330k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
330k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
330k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
330k
            return Status::OK();
423
330k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
228
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
100
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
100
                                                                   res->status().msg());
429
128
        } else {
430
128
            error_msg = res->status().msg();
431
128
        }
432
433
128
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
128
        ++retry_times;
438
128
        if (retry_times > config::meta_service_rpc_retry_times ||
439
128
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
128
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
128
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
128
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
128
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
128
        bthread_usleep(duration_ms * 1000);
450
128
    }
451
317
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
331k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetDeleteBitmapRequestENS0_23GetDeleteBitmapResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
28.7k
                 MetaServiceMethod<Request, Response> method) {
390
28.7k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
28.7k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
28.7k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
28.7k
    int retry_times = 0;
397
28.7k
    uint32_t duration_ms = 0;
398
28.7k
    std::string error_msg;
399
28.7k
    std::default_random_engine rng = make_random_engine();
400
28.7k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
28.7k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
28.7k
    MetaServiceProxy* proxy;
403
28.7k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
28.7k
    while (true) {
405
28.7k
        std::shared_ptr<MetaService_Stub> stub;
406
28.7k
        RETURN_IF_ERROR(proxy->get(&stub));
407
28.7k
        brpc::Controller cntl;
408
28.7k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
28.7k
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
28.7k
        } else {
411
18
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
18
        }
413
28.7k
        cntl.set_max_retry(kBrpcRetryTimes);
414
28.7k
        res->Clear();
415
28.7k
        int error_code = 0;
416
28.7k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
28.7k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
28.7k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
28.7k
            return Status::OK();
423
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
28.7k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19CreateRowsetRequestENS0_20CreateRowsetResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
403k
                 MetaServiceMethod<Request, Response> method) {
390
403k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
403k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
403k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
403k
    int retry_times = 0;
397
403k
    uint32_t duration_ms = 0;
398
403k
    std::string error_msg;
399
403k
    std::default_random_engine rng = make_random_engine();
400
403k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
403k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
403k
    MetaServiceProxy* proxy;
403
403k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
403k
    while (true) {
405
402k
        std::shared_ptr<MetaService_Stub> stub;
406
402k
        RETURN_IF_ERROR(proxy->get(&stub));
407
402k
        brpc::Controller cntl;
408
403k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
402k
        } else {
411
402k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
402k
        }
413
402k
        cntl.set_max_retry(kBrpcRetryTimes);
414
402k
        res->Clear();
415
402k
        int error_code = 0;
416
402k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
402k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
405k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
405k
            return Status::OK();
423
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
403k
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_16CommitTxnRequestENS0_17CommitTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_15AbortTxnRequestENS0_16AbortTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
487
                 MetaServiceMethod<Request, Response> method) {
390
487
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
487
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
487
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
487
    int retry_times = 0;
397
487
    uint32_t duration_ms = 0;
398
487
    std::string error_msg;
399
487
    std::default_random_engine rng = make_random_engine();
400
487
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
487
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
487
    MetaServiceProxy* proxy;
403
487
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
487
    while (true) {
405
487
        std::shared_ptr<MetaService_Stub> stub;
406
487
        RETURN_IF_ERROR(proxy->get(&stub));
407
487
        brpc::Controller cntl;
408
487
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
487
        } else {
411
487
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
487
        }
413
487
        cntl.set_max_retry(kBrpcRetryTimes);
414
487
        res->Clear();
415
487
        int error_code = 0;
416
487
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
487
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
487
        } else if (res->status().code() == MetaServiceCode::OK) {
422
478
            return Status::OK();
423
478
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
9
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
9
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
9
                                                                   res->status().msg());
429
9
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
487
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19PrecommitTxnRequestENS0_20PrecommitTxnResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
33
                 MetaServiceMethod<Request, Response> method) {
390
33
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
33
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
33
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
33
    int retry_times = 0;
397
33
    uint32_t duration_ms = 0;
398
33
    std::string error_msg;
399
33
    std::default_random_engine rng = make_random_engine();
400
33
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
33
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
33
    MetaServiceProxy* proxy;
403
33
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
33
    while (true) {
405
33
        std::shared_ptr<MetaService_Stub> stub;
406
33
        RETURN_IF_ERROR(proxy->get(&stub));
407
33
        brpc::Controller cntl;
408
33
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
33
        } else {
411
33
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
33
        }
413
33
        cntl.set_max_retry(kBrpcRetryTimes);
414
33
        res->Clear();
415
33
        int error_code = 0;
416
33
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
33
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
33
        } else if (res->status().code() == MetaServiceCode::OK) {
422
33
            return Status::OK();
423
33
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
0
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
33
}
Unexecuted instantiation: cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_17RestoreJobRequestENS0_18RestoreJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22GetObjStoreInfoRequestENS0_23GetObjStoreInfoResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
126
                 MetaServiceMethod<Request, Response> method) {
390
126
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
126
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
126
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
126
    int retry_times = 0;
397
126
    uint32_t duration_ms = 0;
398
126
    std::string error_msg;
399
126
    std::default_random_engine rng = make_random_engine();
400
126
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
126
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
126
    MetaServiceProxy* proxy;
403
126
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
126
    while (true) {
405
126
        std::shared_ptr<MetaService_Stub> stub;
406
126
        RETURN_IF_ERROR(proxy->get(&stub));
407
126
        brpc::Controller cntl;
408
126
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
126
        } else {
411
126
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
126
        }
413
126
        cntl.set_max_retry(kBrpcRetryTimes);
414
126
        res->Clear();
415
126
        int error_code = 0;
416
126
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
126
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
126
        } else if (res->status().code() == MetaServiceCode::OK) {
422
126
            return Status::OK();
423
126
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
0
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
126
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_21StartTabletJobRequestENS0_22StartTabletJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
23.1k
                 MetaServiceMethod<Request, Response> method) {
390
23.1k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
23.1k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
23.1k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
23.1k
    int retry_times = 0;
397
23.1k
    uint32_t duration_ms = 0;
398
23.1k
    std::string error_msg;
399
23.1k
    std::default_random_engine rng = make_random_engine();
400
23.1k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
23.1k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
23.1k
    MetaServiceProxy* proxy;
403
23.1k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
23.2k
    while (true) {
405
23.2k
        std::shared_ptr<MetaService_Stub> stub;
406
23.2k
        RETURN_IF_ERROR(proxy->get(&stub));
407
23.2k
        brpc::Controller cntl;
408
23.2k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
23.2k
        } else {
411
23.2k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
23.2k
        }
413
23.2k
        cntl.set_max_retry(kBrpcRetryTimes);
414
23.2k
        res->Clear();
415
23.2k
        int error_code = 0;
416
23.2k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
23.2k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
23.2k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
22.9k
            return Status::OK();
423
22.9k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
260
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
260
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
260
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
23.1k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_22FinishTabletJobRequestENS0_23FinishTabletJobResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
21.8k
                 MetaServiceMethod<Request, Response> method) {
390
21.8k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
21.8k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
21.8k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
21.8k
    int retry_times = 0;
397
21.8k
    uint32_t duration_ms = 0;
398
21.8k
    std::string error_msg;
399
21.8k
    std::default_random_engine rng = make_random_engine();
400
21.8k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
21.8k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
21.8k
    MetaServiceProxy* proxy;
403
21.8k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
21.8k
    while (true) {
405
21.8k
        std::shared_ptr<MetaService_Stub> stub;
406
21.8k
        RETURN_IF_ERROR(proxy->get(&stub));
407
21.8k
        brpc::Controller cntl;
408
21.9k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
21.8k
        } else {
411
21.8k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
21.8k
        }
413
21.8k
        cntl.set_max_retry(kBrpcRetryTimes);
414
21.8k
        res->Clear();
415
21.8k
        int error_code = 0;
416
21.8k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
21.8k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
21.9k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
21.9k
            return Status::OK();
423
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
18
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
18
                                                                     res->status().msg());
426
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
88
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
88
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
21.8k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_25UpdateDeleteBitmapRequestENS0_26UpdateDeleteBitmapResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
63.4k
                 MetaServiceMethod<Request, Response> method) {
390
63.4k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
63.4k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
63.4k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
63.4k
    int retry_times = 0;
397
63.4k
    uint32_t duration_ms = 0;
398
63.4k
    std::string error_msg;
399
63.4k
    std::default_random_engine rng = make_random_engine();
400
63.4k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
63.4k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
63.4k
    MetaServiceProxy* proxy;
403
63.4k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
63.4k
    while (true) {
405
63.4k
        std::shared_ptr<MetaService_Stub> stub;
406
63.4k
        RETURN_IF_ERROR(proxy->get(&stub));
407
63.4k
        brpc::Controller cntl;
408
63.7k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
63.7k
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
18.4E
        } else {
411
18.4E
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
18.4E
        }
413
63.4k
        cntl.set_max_retry(kBrpcRetryTimes);
414
63.4k
        res->Clear();
415
63.4k
        int error_code = 0;
416
63.4k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
63.4k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
64.1k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
64.1k
            return Status::OK();
423
18.4E
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
18.4E
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
25
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
25
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
63.4k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_32GetDeleteBitmapUpdateLockRequestENS0_33GetDeleteBitmapUpdateLockResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
5.91k
                 MetaServiceMethod<Request, Response> method) {
390
5.91k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
5.91k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
5.91k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
5.91k
    int retry_times = 0;
397
5.91k
    uint32_t duration_ms = 0;
398
5.91k
    std::string error_msg;
399
5.91k
    std::default_random_engine rng = make_random_engine();
400
5.91k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
5.91k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
5.91k
    MetaServiceProxy* proxy;
403
5.91k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
5.91k
    while (true) {
405
5.91k
        std::shared_ptr<MetaService_Stub> stub;
406
5.91k
        RETURN_IF_ERROR(proxy->get(&stub));
407
5.91k
        brpc::Controller cntl;
408
5.91k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
5.91k
        } else {
411
5.91k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
5.91k
        }
413
5.91k
        cntl.set_max_retry(kBrpcRetryTimes);
414
5.91k
        res->Clear();
415
5.91k
        int error_code = 0;
416
5.91k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
5.91k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
5.91k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
5.25k
            return Status::OK();
423
5.25k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
667
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
667
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
667
                                                                   res->status().msg());
429
18.4E
        } else {
430
18.4E
            error_msg = res->status().msg();
431
18.4E
        }
432
433
18.4E
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
18.4E
        ++retry_times;
438
18.4E
        if (retry_times > config::meta_service_rpc_retry_times ||
439
18.4E
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
18.4E
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
18.4E
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
18.4E
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
18.4E
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
18.4E
        bthread_usleep(duration_ms * 1000);
450
18.4E
    }
451
18.4E
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
5.91k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_35RemoveDeleteBitmapUpdateLockRequestENS0_36RemoveDeleteBitmapUpdateLockResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
23
                 MetaServiceMethod<Request, Response> method) {
390
23
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
23
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
23
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
23
    int retry_times = 0;
397
23
    uint32_t duration_ms = 0;
398
23
    std::string error_msg;
399
23
    std::default_random_engine rng = make_random_engine();
400
23
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
23
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
23
    MetaServiceProxy* proxy;
403
23
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
23
    while (true) {
405
23
        std::shared_ptr<MetaService_Stub> stub;
406
23
        RETURN_IF_ERROR(proxy->get(&stub));
407
23
        brpc::Controller cntl;
408
23
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
23
        } else {
411
23
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
23
        }
413
23
        cntl.set_max_retry(kBrpcRetryTimes);
414
23
        res->Clear();
415
23
        int error_code = 0;
416
23
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
23
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
23
        } else if (res->status().code() == MetaServiceCode::OK) {
422
2
            return Status::OK();
423
21
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
21
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
21
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
21
                                                                   res->status().msg());
429
21
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
23
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_19ListSnapshotRequestENS0_20ListSnapshotResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
2
                 MetaServiceMethod<Request, Response> method) {
390
2
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
2
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
2
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
2
    int retry_times = 0;
397
2
    uint32_t duration_ms = 0;
398
2
    std::string error_msg;
399
2
    std::default_random_engine rng = make_random_engine();
400
2
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
2
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
2
    MetaServiceProxy* proxy;
403
2
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
2
    while (true) {
405
2
        std::shared_ptr<MetaService_Stub> stub;
406
2
        RETURN_IF_ERROR(proxy->get(&stub));
407
2
        brpc::Controller cntl;
408
2
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
2
        } else {
411
2
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
2
        }
413
2
        cntl.set_max_retry(kBrpcRetryTimes);
414
2
        res->Clear();
415
2
        int error_code = 0;
416
2
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
2
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
2
        } else if (res->status().code() == MetaServiceCode::OK) {
422
0
            return Status::OK();
423
2
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
2
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
2
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
2
                                                                   res->status().msg());
429
2
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
2
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_18GetInstanceRequestENS0_19GetInstanceResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
1
                 MetaServiceMethod<Request, Response> method) {
390
1
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
1
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
1
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
1
    int retry_times = 0;
397
1
    uint32_t duration_ms = 0;
398
1
    std::string error_msg;
399
1
    std::default_random_engine rng = make_random_engine();
400
1
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
1
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
1
    MetaServiceProxy* proxy;
403
1
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
1
    while (true) {
405
1
        std::shared_ptr<MetaService_Stub> stub;
406
1
        RETURN_IF_ERROR(proxy->get(&stub));
407
1
        brpc::Controller cntl;
408
1
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
1
        } else {
411
1
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
1
        }
413
1
        cntl.set_max_retry(kBrpcRetryTimes);
414
1
        res->Clear();
415
1
        int error_code = 0;
416
1
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
1
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
1
        } else if (res->status().code() == MetaServiceCode::OK) {
422
1
            return Status::OK();
423
1
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
0
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
1
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_27UpdatePackedFileInfoRequestENS0_28UpdatePackedFileInfoResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
14.6k
                 MetaServiceMethod<Request, Response> method) {
390
14.6k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
14.6k
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
14.6k
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
14.6k
    int retry_times = 0;
397
14.6k
    uint32_t duration_ms = 0;
398
14.6k
    std::string error_msg;
399
14.6k
    std::default_random_engine rng = make_random_engine();
400
14.6k
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
14.6k
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
14.6k
    MetaServiceProxy* proxy;
403
14.6k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
14.6k
    while (true) {
405
14.6k
        std::shared_ptr<MetaService_Stub> stub;
406
14.6k
        RETURN_IF_ERROR(proxy->get(&stub));
407
14.6k
        brpc::Controller cntl;
408
14.6k
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
14.6k
        } else {
411
14.6k
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
14.6k
        }
413
14.6k
        cntl.set_max_retry(kBrpcRetryTimes);
414
14.6k
        res->Clear();
415
14.6k
        int error_code = 0;
416
14.6k
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
14.6k
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
14.6k
        } else if (res->status().code() == MetaServiceCode::OK) {
422
14.6k
            return Status::OK();
423
14.6k
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
0
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
14.6k
}
cloud_meta_mgr.cpp:_ZN5doris5cloud12_GLOBAL__N_19retry_rpcINS0_23GetClusterStatusRequestENS0_24GetClusterStatusResponseEEENS_6StatusESt17basic_string_viewIcSt11char_traitsIcEERKT_PT0_MNS0_16MetaService_StubEFvPN6google8protobuf13RpcControllerEPSB_SE_PNSH_7ClosureEE
Line
Count
Source
389
76
                 MetaServiceMethod<Request, Response> method) {
390
76
    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
391
76
    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
392
393
    // Applies only to the current file, and all req are non-const, but passed as const types.
394
76
    const_cast<Request&>(req).set_request_ip(BackendOptions::get_be_endpoint());
395
396
76
    int retry_times = 0;
397
76
    uint32_t duration_ms = 0;
398
76
    std::string error_msg;
399
76
    std::default_random_engine rng = make_random_engine();
400
76
    std::uniform_int_distribution<uint32_t> u(20, 200);
401
76
    std::uniform_int_distribution<uint32_t> u2(500, 1000);
402
76
    MetaServiceProxy* proxy;
403
76
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
404
76
    while (true) {
405
76
        std::shared_ptr<MetaService_Stub> stub;
406
76
        RETURN_IF_ERROR(proxy->get(&stub));
407
76
        brpc::Controller cntl;
408
76
        if (op_name == "get delete bitmap" || op_name == "update delete bitmap") {
409
0
            cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
410
76
        } else {
411
76
            cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
412
76
        }
413
76
        cntl.set_max_retry(kBrpcRetryTimes);
414
76
        res->Clear();
415
76
        int error_code = 0;
416
76
        (stub.get()->*method)(&cntl, &req, res, nullptr);
417
76
        if (cntl.Failed()) [[unlikely]] {
418
0
            error_msg = cntl.ErrorText();
419
0
            error_code = cntl.ErrorCode();
420
0
            proxy->set_unhealthy();
421
76
        } else if (res->status().code() == MetaServiceCode::OK) {
422
76
            return Status::OK();
423
76
        } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
424
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed to {}: {}", op_name,
425
0
                                                                     res->status().msg());
426
0
        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
427
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name,
428
0
                                                                   res->status().msg());
429
0
        } else {
430
0
            error_msg = res->status().msg();
431
0
        }
432
433
0
        if (error_code == brpc::ERPCTIMEDOUT) {
434
0
            g_cloud_meta_mgr_rpc_timeout_count << 1;
435
0
        }
436
437
0
        ++retry_times;
438
0
        if (retry_times > config::meta_service_rpc_retry_times ||
439
0
            (retry_times > config::meta_service_rpc_timeout_retry_times &&
440
0
             error_code == brpc::ERPCTIMEDOUT) ||
441
0
            (retry_times > config::meta_service_conflict_error_retry_times &&
442
0
             res->status().code() == MetaServiceCode::KV_TXN_CONFLICT)) {
443
0
            break;
444
0
        }
445
446
0
        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
447
0
        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times
448
0
                     << " sleep=" << duration_ms << "ms : " << cntl.ErrorText();
449
0
        bthread_usleep(duration_ms * 1000);
450
0
    }
451
0
    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg);
452
76
}
453
454
} // namespace
455
456
331k
Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
457
18.4E
    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
458
331k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id,
459
331k
                                      tablet_meta);
460
331k
    GetTabletRequest req;
461
331k
    GetTabletResponse resp;
462
331k
    req.set_cloud_unique_id(config::cloud_unique_id);
463
331k
    req.set_tablet_id(tablet_id);
464
331k
    Status st = retry_rpc("get tablet meta", req, &resp, &MetaService_Stub::get_tablet);
465
331k
    if (!st.ok()) {
466
100
        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
467
100
            return Status::NotFound("failed to get tablet meta: {}", resp.status().msg());
468
100
        }
469
0
        return st;
470
100
    }
471
472
330k
    *tablet_meta = std::make_shared<TabletMeta>();
473
330k
    (*tablet_meta)
474
330k
            ->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta())));
475
330k
    VLOG_DEBUG << "get tablet meta, tablet_id: " << (*tablet_meta)->tablet_id();
476
330k
    return Status::OK();
477
331k
}
478
479
Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options,
480
141k
                                         SyncRowsetStats* sync_stats) {
481
141k
    std::unique_lock lock {tablet->get_sync_meta_lock()};
482
141k
    return sync_tablet_rowsets_unlocked(tablet, lock, options, sync_stats);
483
141k
}
484
485
Status CloudMetaMgr::_log_mow_delete_bitmap(CloudTablet* tablet, GetRowsetResponse& resp,
486
                                            DeleteBitmap& delete_bitmap, int64_t old_max_version,
487
143k
                                            bool full_sync, int32_t read_version) {
488
143k
    if (config::enable_mow_verbose_log && !resp.rowset_meta().empty() &&
489
143k
        delete_bitmap.cardinality() > 0) {
490
0
        int64_t tablet_id = tablet->tablet_id();
491
0
        std::vector<std::string> new_rowset_msgs;
492
0
        std::vector<std::string> old_rowset_msgs;
493
0
        std::unordered_set<RowsetId> new_rowset_ids;
494
0
        int64_t new_max_version = resp.rowset_meta().rbegin()->end_version();
495
0
        for (const auto& rs : resp.rowset_meta()) {
496
0
            RowsetId rowset_id;
497
0
            rowset_id.init(rs.rowset_id_v2());
498
0
            new_rowset_ids.insert(rowset_id);
499
0
            DeleteBitmap rowset_dbm(tablet_id);
500
0
            delete_bitmap.subset({rowset_id, 0, 0},
501
0
                                 {rowset_id, std::numeric_limits<DeleteBitmap::SegmentId>::max(),
502
0
                                  std::numeric_limits<DeleteBitmap::Version>::max()},
503
0
                                 &rowset_dbm);
504
0
            size_t cardinality = rowset_dbm.cardinality();
505
0
            size_t count = rowset_dbm.get_delete_bitmap_count();
506
0
            if (cardinality > 0) {
507
0
                new_rowset_msgs.push_back(fmt::format("({}[{}-{}],{},{})", rs.rowset_id_v2(),
508
0
                                                      rs.start_version(), rs.end_version(), count,
509
0
                                                      cardinality));
510
0
            }
511
0
        }
512
513
0
        if (old_max_version > 0) {
514
0
            std::vector<RowsetSharedPtr> old_rowsets;
515
0
            RowsetIdUnorderedSet old_rowset_ids;
516
0
            {
517
0
                std::lock_guard<std::shared_mutex> rlock(tablet->get_header_lock());
518
0
                RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(old_max_version, &old_rowset_ids));
519
0
                old_rowsets = tablet->get_rowset_by_ids(&old_rowset_ids);
520
0
            }
521
0
            for (const auto& rs : old_rowsets) {
522
0
                if (!new_rowset_ids.contains(rs->rowset_id())) {
523
0
                    DeleteBitmap rowset_dbm(tablet_id);
524
0
                    delete_bitmap.subset(
525
0
                            {rs->rowset_id(), 0, 0},
526
0
                            {rs->rowset_id(), std::numeric_limits<DeleteBitmap::SegmentId>::max(),
527
0
                             std::numeric_limits<DeleteBitmap::Version>::max()},
528
0
                            &rowset_dbm);
529
0
                    size_t cardinality = rowset_dbm.cardinality();
530
0
                    size_t count = rowset_dbm.get_delete_bitmap_count();
531
0
                    if (cardinality > 0) {
532
0
                        old_rowset_msgs.push_back(
533
0
                                fmt::format("({}{},{},{})", rs->rowset_id().to_string(),
534
0
                                            rs->version().to_string(), count, cardinality));
535
0
                    }
536
0
                }
537
0
            }
538
0
        }
539
540
0
        std::string tablet_info = fmt::format(
541
0
                "tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(),
542
0
                tablet->table_id(), tablet->index_id(), tablet->partition_id());
543
0
        LOG_INFO("[verbose] sync tablet delete bitmap " + tablet_info)
544
0
                .tag("full_sync", full_sync)
545
0
                .tag("read_version", read_version)
546
0
                .tag("old_max_version", old_max_version)
547
0
                .tag("new_max_version", new_max_version)
548
0
                .tag("cumu_compaction_cnt", resp.stats().cumulative_compaction_cnt())
549
0
                .tag("base_compaction_cnt", resp.stats().base_compaction_cnt())
550
0
                .tag("cumu_point", resp.stats().cumulative_point())
551
0
                .tag("rowset_num", resp.rowset_meta().size())
552
0
                .tag("delete_bitmap_cardinality", delete_bitmap.cardinality())
553
0
                .tag("old_rowsets(rowset,count,cardinality)",
554
0
                     fmt::format("[{}]", fmt::join(old_rowset_msgs, ", ")))
555
0
                .tag("new_rowsets(rowset,count,cardinality)",
556
0
                     fmt::format("[{}]", fmt::join(new_rowset_msgs, ", ")));
557
0
    }
558
143k
    return Status::OK();
559
143k
}
560
561
Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
562
                                                  std::unique_lock<bthread::Mutex>& lock,
563
                                                  const SyncOptions& options,
564
361k
                                                  SyncRowsetStats* sync_stats) {
565
361k
    using namespace std::chrono;
566
567
361k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet);
568
569
361k
    MetaServiceProxy* proxy;
570
361k
    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
571
361k
    std::string tablet_info =
572
361k
            fmt::format("tablet_id={} table_id={} index_id={} partition_id={}", tablet->tablet_id(),
573
361k
                        tablet->table_id(), tablet->index_id(), tablet->partition_id());
574
361k
    int tried = 0;
575
361k
    while (true) {
576
359k
        std::shared_ptr<MetaService_Stub> stub;
577
359k
        RETURN_IF_ERROR(proxy->get(&stub));
578
359k
        brpc::Controller cntl;
579
359k
        cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
580
359k
        GetRowsetRequest req;
581
359k
        GetRowsetResponse resp;
582
583
359k
        int64_t tablet_id = tablet->tablet_id();
584
359k
        int64_t table_id = tablet->table_id();
585
359k
        int64_t index_id = tablet->index_id();
586
359k
        req.set_cloud_unique_id(config::cloud_unique_id);
587
359k
        auto* idx = req.mutable_idx();
588
359k
        idx->set_tablet_id(tablet_id);
589
359k
        idx->set_table_id(table_id);
590
359k
        idx->set_index_id(index_id);
591
359k
        idx->set_partition_id(tablet->partition_id());
592
359k
        {
593
359k
            std::shared_lock rlock(tablet->get_header_lock());
594
359k
            if (options.full_sync) {
595
2
                req.set_start_version(0);
596
359k
            } else {
597
359k
                req.set_start_version(tablet->max_version_unlocked() + 1);
598
359k
            }
599
359k
            req.set_base_compaction_cnt(tablet->base_compaction_cnt());
600
359k
            req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
601
359k
            req.set_full_compaction_cnt(tablet->full_compaction_cnt());
602
359k
            req.set_cumulative_point(tablet->cumulative_layer_point());
603
359k
        }
604
359k
        req.set_end_version(-1);
605
359k
        VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
606
359k
        auto start = std::chrono::steady_clock::now();
607
359k
        stub->get_rowset(&cntl, &req, &resp, nullptr);
608
359k
        auto end = std::chrono::steady_clock::now();
609
359k
        int64_t latency = cntl.latency_us();
610
359k
        _get_rowset_latency << latency;
611
359k
        int retry_times = config::meta_service_rpc_retry_times;
612
359k
        if (cntl.Failed()) {
613
0
            proxy->set_unhealthy();
614
0
            if (tried++ < retry_times) {
615
0
                auto rng = make_random_engine();
616
0
                std::uniform_int_distribution<uint32_t> u(20, 200);
617
0
                std::uniform_int_distribution<uint32_t> u1(500, 1000);
618
0
                uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
619
0
                bthread_usleep(duration_ms * 1000);
620
0
                LOG_INFO("failed to get rowset meta, " + tablet_info)
621
0
                        .tag("reason", cntl.ErrorText())
622
0
                        .tag("tried", tried)
623
0
                        .tag("sleep", duration_ms);
624
0
                continue;
625
0
            }
626
0
            return Status::RpcError("failed to get rowset meta: {}", cntl.ErrorText());
627
0
        }
628
359k
        if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
629
0
            LOG(WARNING) << "failed to get rowset meta, err=" << resp.status().msg() << " "
630
0
                         << tablet_info;
631
0
            return Status::NotFound("failed to get rowset meta: {}, {}", resp.status().msg(),
632
0
                                    tablet_info);
633
0
        }
634
359k
        if (resp.status().code() != MetaServiceCode::OK) {
635
0
            LOG(WARNING) << " failed to get rowset meta, err=" << resp.status().msg() << " "
636
0
                         << tablet_info;
637
0
            return Status::InternalError("failed to get rowset meta: {}, {}", resp.status().msg(),
638
0
                                         tablet_info);
639
0
        }
640
359k
        if (latency > 100 * 1000) { // 100ms
641
1.42k
            LOG(INFO) << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
642
1.42k
                      << ", latency=" << latency << "us"
643
1.42k
                      << " " << tablet_info;
644
358k
        } else {
645
358k
            LOG_EVERY_N(INFO, 100)
646
3.59k
                    << "finish get_rowset rpc. rowset_meta.size()=" << resp.rowset_meta().size()
647
3.59k
                    << ", latency=" << latency << "us"
648
3.59k
                    << " " << tablet_info;
649
358k
        }
650
651
359k
        int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
652
359k
        tablet->last_sync_time_s = now;
653
654
359k
        if (sync_stats) {
655
120k
            sync_stats->get_remote_rowsets_rpc_ns +=
656
120k
                    std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
657
120k
            sync_stats->get_remote_rowsets_num += resp.rowset_meta().size();
658
120k
        }
659
660
        // If is mow, the tablet has no delete bitmap in base rowsets.
661
        // So dont need to sync it.
662
361k
        if (options.sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() &&
663
359k
            tablet->tablet_state() == TABLET_RUNNING) {
664
143k
            DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.sync_tablet_delete_bitmap.block",
665
143k
                            DBUG_BLOCK);
666
143k
            DeleteBitmap delete_bitmap(tablet_id);
667
143k
            int64_t old_max_version = req.start_version() - 1;
668
143k
            auto read_version = config::delete_bitmap_store_read_version;
669
143k
            auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(),
670
143k
                                                resp.stats(), req.idx(), &delete_bitmap,
671
143k
                                                options.full_sync, sync_stats, read_version, false);
672
143k
            if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
673
0
                LOG_INFO("rowset meta is expired, need to retry, " + tablet_info)
674
0
                        .tag("tried", tried)
675
0
                        .error(st);
676
0
                continue;
677
0
            }
678
143k
            if (!st.ok()) {
679
0
                LOG_WARNING("failed to get delete bitmap, " + tablet_info).error(st);
680
0
                return st;
681
0
            }
682
143k
            tablet->tablet_meta()->delete_bitmap().merge(delete_bitmap);
683
143k
            RETURN_IF_ERROR(_log_mow_delete_bitmap(tablet, resp, delete_bitmap, old_max_version,
684
143k
                                                   options.full_sync, read_version));
685
143k
            RETURN_IF_ERROR(
686
143k
                    _check_delete_bitmap_v2_correctness(tablet, req, resp, old_max_version));
687
143k
        }
688
359k
        DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.modify_tablet_meta", {
689
359k
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
690
359k
            if (target_tablet_id == tablet->tablet_id()) {
691
359k
                DBUG_BLOCK
692
359k
            }
693
359k
        });
694
359k
        {
695
359k
            const auto& stats = resp.stats();
696
359k
            std::unique_lock wlock(tablet->get_header_lock());
697
698
            // ATTN: we are facing following data race
699
            //
700
            // resp_base_compaction_cnt=0|base_compaction_cnt=0|resp_cumulative_compaction_cnt=0|cumulative_compaction_cnt=1|resp_max_version=11|max_version=8
701
            //
702
            //   BE-compaction-thread                 meta-service                                     BE-query-thread
703
            //            |                                |                                                |
704
            //    local   |    commit cumu-compaction      |                                                |
705
            //   cc_cnt=0 |  --------------------------->  |     sync rowset (long rpc, local cc_cnt=0 )    |   local
706
            //            |                                |  <-----------------------------------------    |  cc_cnt=0
707
            //            |                                |  -.                                            |
708
            //    local   |       done cc_cnt=1            |    \                                           |
709
            //   cc_cnt=1 |  <---------------------------  |     \                                          |
710
            //            |                                |      \  returned with resp cc_cnt=0 (snapshot) |
711
            //            |                                |       '------------------------------------>   |   local
712
            //            |                                |                                                |  cc_cnt=1
713
            //            |                                |                                                |
714
            //            |                                |                                                |  CHECK FAIL
715
            //            |                                |                                                |  need retry
716
            // To get rid of just retry syncing tablet
717
359k
            if (stats.base_compaction_cnt() < tablet->base_compaction_cnt() ||
718
361k
                stats.cumulative_compaction_cnt() < tablet->cumulative_compaction_cnt())
719
34
                    [[unlikely]] {
720
                // stale request, ignore
721
34
                LOG_WARNING("stale get rowset meta request " + tablet_info)
722
34
                        .tag("resp_base_compaction_cnt", stats.base_compaction_cnt())
723
34
                        .tag("base_compaction_cnt", tablet->base_compaction_cnt())
724
34
                        .tag("resp_cumulative_compaction_cnt", stats.cumulative_compaction_cnt())
725
34
                        .tag("cumulative_compaction_cnt", tablet->cumulative_compaction_cnt())
726
34
                        .tag("tried", tried);
727
34
                if (tried++ < 10) continue;
728
0
                return Status::OK();
729
34
            }
730
359k
            std::vector<RowsetSharedPtr> rowsets;
731
359k
            rowsets.reserve(resp.rowset_meta().size());
732
359k
            for (const auto& cloud_rs_meta_pb : resp.rowset_meta()) {
733
323k
                VLOG_DEBUG << "get rowset meta, tablet_id=" << cloud_rs_meta_pb.tablet_id()
734
74
                           << ", version=[" << cloud_rs_meta_pb.start_version() << '-'
735
74
                           << cloud_rs_meta_pb.end_version() << ']';
736
323k
                auto existed_rowset = tablet->get_rowset_by_version(
737
323k
                        {cloud_rs_meta_pb.start_version(), cloud_rs_meta_pb.end_version()});
738
323k
                if (existed_rowset &&
739
323k
                    existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
740
83
                    continue; // Same rowset, skip it
741
83
                }
742
323k
                RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
743
323k
                auto rs_meta = std::make_shared<RowsetMeta>();
744
323k
                rs_meta->init_from_pb(meta_pb);
745
323k
                RowsetSharedPtr rowset;
746
                // schema is nullptr implies using RowsetMeta.tablet_schema
747
323k
                Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
748
323k
                if (!s.ok()) {
749
0
                    LOG_WARNING("create rowset").tag("status", s);
750
0
                    return s;
751
0
                }
752
323k
                rowsets.push_back(std::move(rowset));
753
323k
            }
754
359k
            if (!rowsets.empty()) {
755
                // `rowsets.empty()` could happen after doing EMPTY_CUMULATIVE compaction. e.g.:
756
                //   BE has [0-1][2-11][12-12], [12-12] is delete predicate, cp is 2;
757
                //   after doing EMPTY_CUMULATIVE compaction, MS cp is 13, get_rowset will return [2-11][12-12].
758
270k
                bool version_overlap =
759
270k
                        tablet->max_version_unlocked() >= rowsets.front()->start_version();
760
270k
                tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
761
270k
                                    options.warmup_delta_data ||
762
270k
                                            config::enable_warmup_immediately_on_new_rowset);
763
270k
            }
764
765
            // Fill version holes
766
359k
            int64_t partition_max_version =
767
18.4E
                    resp.has_partition_max_version() ? resp.partition_max_version() : -1;
768
359k
            RETURN_IF_ERROR(fill_version_holes(tablet, partition_max_version, wlock));
769
770
359k
            tablet->last_base_compaction_success_time_ms = stats.last_base_compaction_time_ms();
771
359k
            tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
772
359k
            tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
773
359k
            tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
774
359k
            tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
775
359k
            tablet->set_cumulative_layer_point(stats.cumulative_point());
776
359k
            tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
777
359k
                                            stats.num_rows(), stats.data_size());
778
779
            // Sync last active cluster info for compaction read-write separation
780
360k
            if (config::enable_compaction_rw_separation && stats.has_last_active_cluster_id()) {
781
80.8k
                tablet->set_last_active_cluster_info(stats.last_active_cluster_id(),
782
80.8k
                                                     stats.last_active_time_ms());
783
80.8k
            }
784
359k
        }
785
0
        return Status::OK();
786
359k
    }
787
361k
}
788
789
bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
790
                                                      std::ranges::range auto&& rs_metas,
791
81.6k
                                                      DeleteBitmap* delete_bitmap) {
792
81.6k
    std::set<int64_t> txn_processed;
793
81.7k
    for (auto& rs_meta : rs_metas) {
794
81.7k
        auto txn_id = rs_meta.txn_id();
795
81.7k
        if (txn_processed.find(txn_id) != txn_processed.end()) {
796
0
            continue;
797
0
        }
798
81.7k
        txn_processed.insert(txn_id);
799
81.7k
        DeleteBitmapPtr tmp_delete_bitmap;
800
81.7k
        std::shared_ptr<PublishStatus> publish_status =
801
81.7k
                std::make_shared<PublishStatus>(PublishStatus::INIT);
802
81.7k
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
803
81.7k
        Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
804
81.7k
                txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, &publish_status);
805
        // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after we sync rowsets from meta services.
806
        // If the control flows reaches here, it's gauranteed that the rowsets is commited in meta services, so we can
807
        // use the delete bitmap from cache directly if *publish_status == PublishStatus::SUCCEED without checking other
808
        // stats(version or compaction stats)
809
81.7k
        if (status.ok() && *publish_status == PublishStatus::SUCCEED) {
810
            // tmp_delete_bitmap contains sentinel marks, we should remove it before merge it to delete bitmap.
811
            // Also, the version of delete bitmap key in tmp_delete_bitmap is DeleteBitmap::TEMP_VERSION_COMMON,
812
            // we should replace it with the rowset's real version
813
53.3k
            DCHECK(rs_meta.start_version() == rs_meta.end_version());
814
53.3k
            int64_t rowset_version = rs_meta.start_version();
815
240k
            for (const auto& [delete_bitmap_key, bitmap_value] : tmp_delete_bitmap->delete_bitmap) {
816
                // skip sentinel mark, which is used for delete bitmap correctness check
817
240k
                if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
818
9.88k
                    delete_bitmap->merge({std::get<0>(delete_bitmap_key),
819
9.88k
                                          std::get<1>(delete_bitmap_key), rowset_version},
820
9.88k
                                         bitmap_value);
821
9.88k
                }
822
240k
            }
823
53.3k
            engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
824
53.3k
                                                                           tablet->tablet_id());
825
53.3k
        } else {
826
28.4k
            LOG_EVERY_N(INFO, 20)
827
1.41k
                    << "delete bitmap not found in cache, will sync rowset to get. tablet_id= "
828
1.41k
                    << tablet->tablet_id() << ", txn_id=" << txn_id << ", status=" << status;
829
28.4k
            return false;
830
28.4k
        }
831
81.7k
    }
832
53.2k
    return true;
833
81.6k
}
834
835
Status CloudMetaMgr::_get_delete_bitmap_from_ms(GetDeleteBitmapRequest& req,
836
28.7k
                                                GetDeleteBitmapResponse& res) {
837
18.4E
    VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
838
28.7k
    TEST_SYNC_POINT_CALLBACK("CloudMetaMgr::_get_delete_bitmap_from_ms", &req, &res);
839
840
28.7k
    auto st = retry_rpc("get delete bitmap", req, &res, &MetaService_Stub::get_delete_bitmap);
841
28.7k
    if (st.code() == ErrorCode::THRIFT_RPC_ERROR) {
842
0
        return st;
843
0
    }
844
845
28.7k
    if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
846
1
        return Status::NotFound("failed to get delete bitmap: {}", res.status().msg());
847
1
    }
848
    // The delete bitmap of stale rowsets will be removed when commit compaction job,
849
    // then delete bitmap of stale rowsets cannot be obtained. But the rowsets obtained
850
    // by sync_tablet_rowsets may include these stale rowsets. When this case happend, the
851
    // error code of ROWSETS_EXPIRED will be returned, we need to retry sync rowsets again.
852
    //
853
    // Be query thread             meta-service          Be compaction thread
854
    //      |                            |                         |
855
    //      |        get rowset          |                         |
856
    //      |--------------------------->|                         |
857
    //      |    return get rowset       |                         |
858
    //      |<---------------------------|                         |
859
    //      |                            |        commit job       |
860
    //      |                            |<------------------------|
861
    //      |                            |    return commit job    |
862
    //      |                            |------------------------>|
863
    //      |      get delete bitmap     |                         |
864
    //      |--------------------------->|                         |
865
    //      |  return get delete bitmap  |                         |
866
    //      |<---------------------------|                         |
867
    //      |                            |                         |
868
28.7k
    if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) {
869
0
        return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get delete bitmap: {}",
870
0
                                                                res.status().msg());
871
0
    }
872
28.7k
    if (res.status().code() != MetaServiceCode::OK) {
873
0
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get delete bitmap: {}",
874
0
                                                               res.status().msg());
875
0
    }
876
28.7k
    return Status::OK();
877
28.7k
}
878
879
Status CloudMetaMgr::_get_delete_bitmap_from_ms_by_batch(GetDeleteBitmapRequest& req,
880
                                                         GetDeleteBitmapResponse& res,
881
28.4k
                                                         int64_t bytes_threadhold) {
882
28.4k
    std::unordered_set<std::string> finished_rowset_ids {};
883
28.4k
    int count = 0;
884
28.7k
    do {
885
28.7k
        GetDeleteBitmapRequest cur_req;
886
28.7k
        GetDeleteBitmapResponse cur_res;
887
888
28.7k
        cur_req.set_cloud_unique_id(config::cloud_unique_id);
889
28.7k
        cur_req.set_tablet_id(req.tablet_id());
890
28.7k
        cur_req.set_base_compaction_cnt(req.base_compaction_cnt());
891
28.7k
        cur_req.set_cumulative_compaction_cnt(req.cumulative_compaction_cnt());
892
28.7k
        cur_req.set_cumulative_point(req.cumulative_point());
893
28.7k
        *(cur_req.mutable_idx()) = req.idx();
894
28.7k
        cur_req.set_store_version(req.store_version());
895
28.7k
        if (bytes_threadhold > 0) {
896
28.7k
            cur_req.set_dbm_bytes_threshold(bytes_threadhold);
897
28.7k
        }
898
65.6k
        for (int i = 0; i < req.rowset_ids_size(); i++) {
899
36.9k
            if (!finished_rowset_ids.contains(req.rowset_ids(i))) {
900
35.8k
                cur_req.add_rowset_ids(req.rowset_ids(i));
901
35.8k
                cur_req.add_begin_versions(req.begin_versions(i));
902
35.8k
                cur_req.add_end_versions(req.end_versions(i));
903
35.8k
            }
904
36.9k
        }
905
906
28.7k
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms(cur_req, cur_res));
907
28.7k
        ++count;
908
909
        // v1 delete bitmap
910
28.7k
        res.mutable_rowset_ids()->MergeFrom(cur_res.rowset_ids());
911
28.7k
        res.mutable_segment_ids()->MergeFrom(cur_res.segment_ids());
912
28.7k
        res.mutable_versions()->MergeFrom(cur_res.versions());
913
28.7k
        res.mutable_segment_delete_bitmaps()->MergeFrom(cur_res.segment_delete_bitmaps());
914
915
        // v2 delete bitmap
916
28.7k
        res.mutable_delta_rowset_ids()->MergeFrom(cur_res.delta_rowset_ids());
917
28.7k
        res.mutable_delete_bitmap_storages()->MergeFrom(cur_res.delete_bitmap_storages());
918
919
34.8k
        for (const auto& rowset_id : cur_res.returned_rowset_ids()) {
920
34.8k
            finished_rowset_ids.insert(rowset_id);
921
34.8k
        }
922
923
28.7k
        bool has_more = cur_res.has_has_more() && cur_res.has_more();
924
28.7k
        if (!has_more) {
925
28.4k
            break;
926
28.4k
        }
927
210
        LOG_INFO("batch get delete bitmap, progress={}/{}", finished_rowset_ids.size(),
928
210
                 req.rowset_ids_size())
929
210
                .tag("tablet_id", req.tablet_id())
930
210
                .tag("cur_returned_rowsets", cur_res.returned_rowset_ids_size())
931
210
                .tag("rpc_count", count);
932
210
    } while (finished_rowset_ids.size() < req.rowset_ids_size());
933
28.4k
    return Status::OK();
934
28.4k
}
935
936
Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
937
                                               std::ranges::range auto&& rs_metas,
938
                                               const TabletStatsPB& stats, const TabletIndexPB& idx,
939
                                               DeleteBitmap* delete_bitmap, bool full_sync,
940
                                               SyncRowsetStats* sync_stats, int32_t read_version,
941
142k
                                               bool full_sync_v2) {
942
142k
    if (rs_metas.empty()) {
943
61.1k
        return Status::OK();
944
61.1k
    }
945
946
81.6k
    if (!full_sync && config::enable_sync_tablet_delete_bitmap_by_cache &&
947
81.7k
        sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
948
53.5k
        if (sync_stats) {
949
23.7k
            sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size();
950
23.7k
        }
951
53.5k
        return Status::OK();
952
53.5k
    } else {
953
28.1k
        DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
954
28.1k
        *delete_bitmap = *new_delete_bitmap;
955
28.1k
    }
956
957
28.1k
    if (read_version == 2 && config::delete_bitmap_store_write_version == 1) {
958
0
        return Status::InternalError(
959
0
                "please set delete_bitmap_store_read_version to 1 or 3 because "
960
0
                "delete_bitmap_store_write_version is 1");
961
28.3k
    } else if (read_version == 1 && config::delete_bitmap_store_write_version == 2) {
962
0
        return Status::InternalError(
963
0
                "please set delete_bitmap_store_read_version to 2 or 3 because "
964
0
                "delete_bitmap_store_write_version is 2");
965
0
    }
966
967
28.1k
    int64_t new_max_version = std::max(old_max_version, rs_metas.rbegin()->end_version());
968
    // When there are many delete bitmaps that need to be synchronized, it
969
    // may take a longer time, especially when loading the tablet for the
970
    // first time, so set a relatively long timeout time.
971
28.1k
    GetDeleteBitmapRequest req;
972
28.1k
    GetDeleteBitmapResponse res;
973
28.1k
    req.set_cloud_unique_id(config::cloud_unique_id);
974
28.1k
    req.set_tablet_id(tablet->tablet_id());
975
28.1k
    req.set_base_compaction_cnt(stats.base_compaction_cnt());
976
28.1k
    req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
977
28.1k
    req.set_cumulative_point(stats.cumulative_point());
978
28.1k
    *(req.mutable_idx()) = idx;
979
28.1k
    req.set_store_version(read_version);
980
    // New rowset sync all versions of delete bitmap
981
34.7k
    for (const auto& rs_meta : rs_metas) {
982
34.7k
        req.add_rowset_ids(rs_meta.rowset_id_v2());
983
34.7k
        req.add_begin_versions(0);
984
34.7k
        req.add_end_versions(new_max_version);
985
34.7k
    }
986
987
28.4k
    if (!full_sync_v2) {
988
        // old rowset sync incremental versions of delete bitmap
989
28.4k
        if (old_max_version > 0 && old_max_version < new_max_version) {
990
43
            RowsetIdUnorderedSet all_rs_ids;
991
43
            RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
992
98
            for (const auto& rs_id : all_rs_ids) {
993
98
                req.add_rowset_ids(rs_id.to_string());
994
98
                req.add_begin_versions(old_max_version + 1);
995
98
                req.add_end_versions(new_max_version);
996
98
            }
997
43
        }
998
18.4E
    } else {
999
18.4E
        if (old_max_version > 0) {
1000
0
            RowsetIdUnorderedSet all_rs_ids;
1001
0
            RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
1002
0
            for (const auto& rs_id : all_rs_ids) {
1003
0
                req.add_rowset_ids(rs_id.to_string());
1004
0
                req.add_begin_versions(0);
1005
0
                req.add_end_versions(new_max_version);
1006
0
            }
1007
0
        }
1008
18.4E
    }
1009
28.1k
    if (sync_stats) {
1010
10.5k
        sync_stats->get_remote_delete_bitmap_rowsets_num += req.rowset_ids_size();
1011
10.5k
    }
1012
1013
28.1k
    auto start = std::chrono::steady_clock::now();
1014
28.4k
    if (config::enable_batch_get_delete_bitmap) {
1015
28.4k
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms_by_batch(
1016
28.4k
                req, res, config::get_delete_bitmap_bytes_threshold));
1017
18.4E
    } else {
1018
18.4E
        RETURN_IF_ERROR(_get_delete_bitmap_from_ms(req, res));
1019
18.4E
    }
1020
28.1k
    auto end = std::chrono::steady_clock::now();
1021
1022
    // v1 delete bitmap
1023
28.1k
    const auto& rowset_ids = res.rowset_ids();
1024
28.1k
    const auto& segment_ids = res.segment_ids();
1025
28.1k
    const auto& vers = res.versions();
1026
28.1k
    const auto& delete_bitmaps = res.segment_delete_bitmaps();
1027
28.4k
    if (rowset_ids.size() != segment_ids.size() || rowset_ids.size() != vers.size() ||
1028
28.4k
        rowset_ids.size() != delete_bitmaps.size()) {
1029
0
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
1030
0
                "get delete bitmap data wrong,"
1031
0
                "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
1032
0
                rowset_ids.size(), segment_ids.size(), vers.size(), delete_bitmaps.size());
1033
0
    }
1034
28.4k
    for (int i = 0; i < rowset_ids.size(); i++) {
1035
361
        RowsetId rst_id;
1036
361
        rst_id.init(rowset_ids[i]);
1037
361
        delete_bitmap->merge(
1038
361
                {rst_id, segment_ids[i], vers[i]},
1039
361
                roaring::Roaring::readSafe(delete_bitmaps[i].data(), delete_bitmaps[i].length()));
1040
361
    }
1041
    // v2 delete bitmap
1042
28.1k
    const auto& delta_rowset_ids = res.delta_rowset_ids();
1043
28.1k
    const auto& delete_bitmap_storages = res.delete_bitmap_storages();
1044
28.1k
    if (delta_rowset_ids.size() != delete_bitmap_storages.size()) {
1045
0
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
1046
0
                "get delete bitmap data wrong, delta_rowset_ids.size={}, "
1047
0
                "delete_bitmap_storages.size={}",
1048
0
                delta_rowset_ids.size(), delete_bitmap_storages.size());
1049
0
    }
1050
28.1k
    int64_t remote_delete_bitmap_bytes = 0;
1051
28.1k
    RETURN_IF_ERROR(_read_tablet_delete_bitmap_v2(tablet, old_max_version, rs_metas, delete_bitmap,
1052
28.1k
                                                  res, remote_delete_bitmap_bytes, full_sync_v2));
1053
1054
28.1k
    if (sync_stats) {
1055
10.5k
        sync_stats->get_remote_delete_bitmap_rpc_ns +=
1056
10.5k
                std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
1057
10.5k
        sync_stats->get_remote_delete_bitmap_key_count +=
1058
10.5k
                delete_bitmaps.size() + delete_bitmap_storages.size();
1059
10.5k
        for (const auto& dbm : delete_bitmaps) {
1060
342
            sync_stats->get_remote_delete_bitmap_bytes += dbm.length();
1061
342
        }
1062
10.5k
        sync_stats->get_remote_delete_bitmap_bytes += remote_delete_bitmap_bytes;
1063
10.5k
    }
1064
28.1k
    int64_t latency = std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
1065
28.1k
    if (latency > 100 * 1000) { // 100ms
1066
160
        LOG(INFO) << "finish get_delete_bitmap rpcs. rowset_ids.size()=" << rowset_ids.size()
1067
160
                  << ", delete_bitmaps.size()=" << delete_bitmaps.size()
1068
160
                  << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size()
1069
160
                  << ", latency=" << latency << "us, read_version=" << read_version;
1070
27.9k
    } else {
1071
27.9k
        LOG_EVERY_N(INFO, 100) << "finish get_delete_bitmap rpcs. rowset_ids.size()="
1072
283
                               << rowset_ids.size()
1073
283
                               << ", delete_bitmaps.size()=" << delete_bitmaps.size()
1074
283
                               << ", delta_delete_bitmaps.size()=" << delta_rowset_ids.size()
1075
283
                               << ", latency=" << latency << "us, read_version=" << read_version;
1076
27.9k
    }
1077
28.1k
    return Status::OK();
1078
28.1k
}
1079
1080
Status CloudMetaMgr::_check_delete_bitmap_v2_correctness(CloudTablet* tablet, GetRowsetRequest& req,
1081
                                                         GetRowsetResponse& resp,
1082
143k
                                                         int64_t old_max_version) {
1083
143k
    if (!config::enable_delete_bitmap_store_v2_check_correctness ||
1084
143k
        config::delete_bitmap_store_write_version == 1 || resp.rowset_meta().empty()) {
1085
143k
        return Status::OK();
1086
143k
    }
1087
18.4E
    int64_t tablet_id = tablet->tablet_id();
1088
18.4E
    int64_t new_max_version = std::max(old_max_version, resp.rowset_meta().rbegin()->end_version());
1089
    // rowset_id, num_segments
1090
18.4E
    std::vector<std::pair<RowsetId, int64_t>> all_rowsets;
1091
18.4E
    std::map<std::string, std::string> rowset_to_resource;
1092
18.4E
    for (const auto& rs_meta : resp.rowset_meta()) {
1093
0
        RowsetId rowset_id;
1094
0
        rowset_id.init(rs_meta.rowset_id_v2());
1095
0
        all_rowsets.emplace_back(std::make_pair(rowset_id, rs_meta.num_segments()));
1096
0
        rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id();
1097
0
    }
1098
18.4E
    if (old_max_version > 0) {
1099
0
        RowsetIdUnorderedSet all_rs_ids;
1100
0
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
1101
0
        for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) {
1102
0
            all_rowsets.emplace_back(std::make_pair(rowset->rowset_id(), rowset->num_segments()));
1103
0
            rowset_to_resource[rowset->rowset_id().to_string()] =
1104
0
                    rowset->rowset_meta()->resource_id();
1105
0
        }
1106
0
    }
1107
1108
18.4E
    auto compare_delete_bitmap = [&](DeleteBitmap* delete_bitmap, int version) {
1109
0
        bool success = true;
1110
0
        for (auto& [rs_id, num_segments] : all_rowsets) {
1111
0
            for (int seg_id = 0; seg_id < num_segments; ++seg_id) {
1112
0
                DeleteBitmap::BitmapKey key = {rs_id, seg_id, new_max_version};
1113
0
                auto dm1 = tablet->tablet_meta()->delete_bitmap().get_agg(key);
1114
0
                auto dm2 = delete_bitmap->get_agg_without_cache(key);
1115
0
                if (*dm1 != *dm2) {
1116
0
                    success = false;
1117
0
                    LOG(WARNING) << "failed to check delete bitmap correctness by v"
1118
0
                                 << std::to_string(version) << ", tablet_id=" << tablet->tablet_id()
1119
0
                                 << ", rowset_id=" << rs_id.to_string() << ", segment_id=" << seg_id
1120
0
                                 << ", max_version=" << new_max_version
1121
0
                                 << ". size1=" << dm1->cardinality()
1122
0
                                 << ", size2=" << dm2->cardinality();
1123
0
                }
1124
0
            }
1125
0
        }
1126
0
        if (success) {
1127
0
            LOG(INFO) << "succeed to check delete bitmap correctness by v"
1128
0
                      << std::to_string(version) << ", tablet_id=" << tablet->tablet_id()
1129
0
                      << ", max_version=" << new_max_version;
1130
0
        }
1131
0
    };
1132
1133
18.4E
    DeleteBitmap full_delete_bitmap(tablet_id);
1134
18.4E
    auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), resp.stats(),
1135
18.4E
                                        req.idx(), &full_delete_bitmap, false, nullptr, 2, true);
1136
18.4E
    if (!st.ok()) {
1137
0
        LOG_WARNING("failed to check delete bitmap correctness by v2")
1138
0
                .tag("tablet", tablet->tablet_id())
1139
0
                .error(st);
1140
18.4E
    } else {
1141
18.4E
        compare_delete_bitmap(&full_delete_bitmap, 2);
1142
18.4E
    }
1143
18.4E
    return Status::OK();
1144
18.4E
}
1145
1146
Status CloudMetaMgr::_read_tablet_delete_bitmap_v2(CloudTablet* tablet, int64_t old_max_version,
1147
                                                   std::ranges::range auto&& rs_metas,
1148
                                                   DeleteBitmap* delete_bitmap,
1149
                                                   GetDeleteBitmapResponse& res,
1150
                                                   int64_t& remote_delete_bitmap_bytes,
1151
28.4k
                                                   bool full_sync_v2) {
1152
28.5k
    if (res.delta_rowset_ids().empty()) {
1153
28.5k
        return Status::OK();
1154
28.5k
    }
1155
18.4E
    const auto& rowset_ids = res.delta_rowset_ids();
1156
18.4E
    const auto& delete_bitmap_storages = res.delete_bitmap_storages();
1157
18.4E
    RowsetIdUnorderedSet all_rs_ids;
1158
18.4E
    std::map<std::string, std::string> rowset_to_resource;
1159
18.4E
    if (old_max_version > 0) {
1160
0
        RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, &all_rs_ids));
1161
0
        if (full_sync_v2) {
1162
0
            for (auto& rowset : tablet->get_rowset_by_ids(&all_rs_ids)) {
1163
0
                rowset_to_resource[rowset->rowset_id().to_string()] =
1164
0
                        rowset->rowset_meta()->resource_id();
1165
0
            }
1166
0
        }
1167
0
    }
1168
18.4E
    for (const auto& rs_meta : rs_metas) {
1169
0
        RowsetId rs_id;
1170
0
        rs_id.init(rs_meta.rowset_id_v2());
1171
0
        all_rs_ids.emplace(rs_id);
1172
0
        rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id();
1173
0
    }
1174
18.4E
    if (config::enable_mow_verbose_log) {
1175
0
        LOG(INFO) << "read delete bitmap for tablet_id=" << tablet->tablet_id()
1176
0
                  << ", old_max_version=" << old_max_version
1177
0
                  << ", new rowset num=" << rs_metas.size()
1178
0
                  << ", rowset has delete bitmap num=" << rowset_ids.size()
1179
0
                  << ". all rowset num=" << all_rs_ids.size();
1180
0
    }
1181
1182
18.4E
    std::mutex result_mtx;
1183
18.4E
    Status result;
1184
18.4E
    auto merge_delete_bitmap = [&](const std::string& rowset_id, DeleteBitmapPB& dbm) {
1185
0
        if (dbm.rowset_ids_size() != dbm.segment_ids_size() ||
1186
0
            dbm.rowset_ids_size() != dbm.versions_size() ||
1187
0
            dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) {
1188
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
1189
0
                    "get delete bitmap data wrong, rowset_id={}"
1190
0
                    "rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
1191
0
                    rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), dbm.versions_size(),
1192
0
                    dbm.segment_delete_bitmaps_size());
1193
0
        }
1194
0
        if (config::enable_mow_verbose_log) {
1195
0
            LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
1196
0
                      << ", rowset_id=" << rowset_id
1197
0
                      << ", delete_bitmap num=" << dbm.segment_delete_bitmaps_size();
1198
0
        }
1199
0
        std::lock_guard lock(result_mtx);
1200
0
        for (int j = 0; j < dbm.rowset_ids_size(); j++) {
1201
0
            RowsetId rst_id;
1202
0
            rst_id.init(dbm.rowset_ids(j));
1203
0
            if (!all_rs_ids.contains(rst_id)) {
1204
0
                LOG(INFO) << "skip merge delete bitmap for tablet_id=" << tablet->tablet_id()
1205
0
                          << ", rowset_id=" << rowset_id << ", unused rowset_id=" << rst_id;
1206
0
                continue;
1207
0
            }
1208
0
            delete_bitmap->merge(
1209
0
                    {rst_id, dbm.segment_ids(j), dbm.versions(j)},
1210
0
                    roaring::Roaring::readSafe(dbm.segment_delete_bitmaps(j).data(),
1211
0
                                               dbm.segment_delete_bitmaps(j).length()));
1212
0
            remote_delete_bitmap_bytes += dbm.segment_delete_bitmaps(j).length();
1213
0
        }
1214
0
        return Status::OK();
1215
0
    };
1216
18.4E
    auto get_delete_bitmap_from_file = [&](const std::string& rowset_id,
1217
18.4E
                                           const DeleteBitmapStoragePB& storage) {
1218
0
        if (config::enable_mow_verbose_log) {
1219
0
            LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
1220
0
                      << ", rowset_id=" << rowset_id << " from file"
1221
0
                      << ", is_packed=" << storage.has_packed_slice_location();
1222
0
        }
1223
0
        if (rowset_to_resource.find(rowset_id) == rowset_to_resource.end()) {
1224
0
            return Status::InternalError("vault id not found for tablet_id={}, rowset_id={}",
1225
0
                                         tablet->tablet_id(), rowset_id);
1226
0
        }
1227
0
        auto resource_id = rowset_to_resource[rowset_id];
1228
0
        CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
1229
0
        auto storage_resource = engine.get_storage_resource(resource_id);
1230
0
        if (!storage_resource) {
1231
0
            return Status::InternalError("vault id not found, maybe not sync, vault id {}",
1232
0
                                         resource_id);
1233
0
        }
1234
1235
        // Use packed file reader if packed_slice_location is present
1236
0
        std::unique_ptr<DeleteBitmapFileReader> reader;
1237
0
        if (storage.has_packed_slice_location() &&
1238
0
            !storage.packed_slice_location().packed_file_path().empty()) {
1239
0
            reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id,
1240
0
                                                              storage_resource,
1241
0
                                                              storage.packed_slice_location());
1242
0
        } else {
1243
0
            reader = std::make_unique<DeleteBitmapFileReader>(tablet->tablet_id(), rowset_id,
1244
0
                                                              storage_resource);
1245
0
        }
1246
1247
0
        RETURN_IF_ERROR(reader->init());
1248
0
        DeleteBitmapPB dbm;
1249
0
        RETURN_IF_ERROR(reader->read(dbm));
1250
0
        RETURN_IF_ERROR(reader->close());
1251
0
        return merge_delete_bitmap(rowset_id, dbm);
1252
0
    };
1253
18.4E
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
1254
18.4E
    std::unique_ptr<ThreadPoolToken> token = engine.sync_delete_bitmap_thread_pool().new_token(
1255
18.4E
            ThreadPool::ExecutionMode::CONCURRENT);
1256
18.4E
    bthread::CountdownEvent wait {rowset_ids.size()};
1257
18.4E
    for (int i = 0; i < rowset_ids.size(); i++) {
1258
0
        auto& rowset_id = rowset_ids[i];
1259
0
        if (delete_bitmap_storages[i].store_in_fdb()) {
1260
0
            wait.signal();
1261
0
            DeleteBitmapPB dbm = delete_bitmap_storages[i].delete_bitmap();
1262
0
            RETURN_IF_ERROR(merge_delete_bitmap(rowset_id, dbm));
1263
0
        } else {
1264
0
            const auto& storage = delete_bitmap_storages[i];
1265
0
            auto submit_st = token->submit_func([&, rowset_id, storage]() {
1266
0
                auto status = get_delete_bitmap_from_file(rowset_id, storage);
1267
0
                if (!status.ok()) {
1268
0
                    LOG(WARNING) << "failed to get delete bitmap for tablet_id="
1269
0
                                 << tablet->tablet_id() << ", rowset_id=" << rowset_id
1270
0
                                 << " from file, st=" << status.to_string();
1271
0
                    std::lock_guard lock(result_mtx);
1272
0
                    if (result.ok()) {
1273
0
                        result = status;
1274
0
                    }
1275
0
                }
1276
0
                wait.signal();
1277
0
            });
1278
0
            RETURN_IF_ERROR(submit_st);
1279
0
        }
1280
0
    }
1281
    // wait for all finished
1282
18.4E
    wait.wait();
1283
18.4E
    token->wait();
1284
18.4E
    return result;
1285
18.4E
}
1286
1287
Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
1288
202k
                                    RowsetMetaSharedPtr* existed_rs_meta) {
1289
18.4E
    VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
1290
18.4E
               << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
1291
202k
    {
1292
202k
        Status ret_st;
1293
202k
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st);
1294
202k
    }
1295
202k
    CreateRowsetRequest req;
1296
202k
    CreateRowsetResponse resp;
1297
202k
    req.set_cloud_unique_id(config::cloud_unique_id);
1298
202k
    req.set_txn_id(rs_meta.txn_id());
1299
202k
    req.set_tablet_job_id(job_id);
1300
1301
202k
    RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true);
1302
202k
    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta));
1303
1304
202k
    Status st = retry_rpc("prepare rowset", req, &resp, &MetaService_Stub::prepare_rowset);
1305
202k
    if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
1306
0
        if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
1307
0
            RowsetMetaPB doris_rs_meta_tmp =
1308
0
                    cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
1309
0
            *existed_rs_meta = std::make_shared<RowsetMeta>();
1310
0
            (*existed_rs_meta)->init_from_pb(doris_rs_meta_tmp);
1311
0
        }
1312
0
        return Status::AlreadyExist("failed to prepare rowset: {}", resp.status().msg());
1313
0
    }
1314
202k
    return st;
1315
202k
}
1316
1317
Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
1318
202k
                                   RowsetMetaSharedPtr* existed_rs_meta) {
1319
202k
    VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
1320
3
               << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
1321
202k
    {
1322
202k
        Status ret_st;
1323
202k
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
1324
202k
    }
1325
202k
    check_table_size_correctness(rs_meta);
1326
202k
    CreateRowsetRequest req;
1327
202k
    CreateRowsetResponse resp;
1328
202k
    req.set_cloud_unique_id(config::cloud_unique_id);
1329
202k
    req.set_txn_id(rs_meta.txn_id());
1330
202k
    req.set_tablet_job_id(job_id);
1331
1332
202k
    RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
1333
202k
    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
1334
202k
    Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset);
1335
202k
    if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
1336
0
        if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
1337
0
            RowsetMetaPB doris_rs_meta =
1338
0
                    cloud_rowset_meta_to_doris(std::move(*resp.mutable_existed_rowset_meta()));
1339
0
            *existed_rs_meta = std::make_shared<RowsetMeta>();
1340
0
            (*existed_rs_meta)->init_from_pb(doris_rs_meta);
1341
0
        }
1342
0
        return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg());
1343
0
    }
1344
202k
    int64_t timeout_ms = -1;
1345
    // if the `job_id` is not empty, it means this rowset was produced by a compaction job.
1346
202k
    if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty()) {
1347
        // 1. assume the download speed is 100MB/s
1348
        // 2. we double the download time as timeout for safety
1349
        // 3. for small rowsets, the timeout we calculate maybe quite small, so we need a min_time_out
1350
0
        const double speed_mbps = 100.0; // 100MB/s
1351
0
        const double safety_factor = 2.0;
1352
0
        timeout_ms = std::min(
1353
0
                std::max(static_cast<int64_t>(static_cast<double>(rs_meta.data_disk_size()) /
1354
0
                                              (speed_mbps * 1024 * 1024) * safety_factor * 1000),
1355
0
                         config::warm_up_rowset_sync_wait_min_timeout_ms),
1356
0
                config::warm_up_rowset_sync_wait_max_timeout_ms);
1357
0
        LOG(INFO) << "warm up rowset: " << rs_meta.version() << ", job_id: " << job_id
1358
0
                  << ", with timeout: " << timeout_ms << " ms";
1359
0
    }
1360
202k
    auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
1361
202k
    manager.warm_up_rowset(rs_meta, timeout_ms);
1362
202k
    return st;
1363
202k
}
1364
1365
19
Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
1366
19
    VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
1367
0
               << ", rowset_id: " << rs_meta.rowset_id();
1368
19
    CreateRowsetRequest req;
1369
19
    CreateRowsetResponse resp;
1370
19
    req.set_cloud_unique_id(config::cloud_unique_id);
1371
1372
    // Variant schema maybe updated, so we need to update the schema as well.
1373
    // The updated rowset meta after `rowset->merge_rowset_meta` in `BaseTablet::update_delete_bitmap`
1374
    // will be lost in `update_tmp_rowset` if skip_schema.So in order to keep the latest schema we should keep schema in update_tmp_rowset
1375
    // for variant type
1376
19
    bool skip_schema = rs_meta.tablet_schema()->num_variant_columns() == 0;
1377
19
    RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(skip_schema);
1378
19
    doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
1379
19
    Status st =
1380
19
            retry_rpc("update committed rowset", req, &resp, &MetaService_Stub::update_tmp_rowset);
1381
19
    if (!st.ok() && resp.status().code() == MetaServiceCode::ROWSET_META_NOT_FOUND) {
1382
0
        return Status::InternalError("failed to update committed rowset: {}", resp.status().msg());
1383
0
    }
1384
19
    return st;
1385
19
}
1386
1387
// async send TableStats(in res) to FE coz we are in streamload ctx, response to the user ASAP
1388
static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
1389
0
                                   const std::string& label, CommitTxnResponse& res) {
1390
0
    std::string protobufBytes;
1391
0
    res.SerializeToString(&protobufBytes);
1392
0
    auto st = ExecEnv::GetInstance()->send_table_stats_thread_pool()->submit_func(
1393
0
            [db_id, txn_id, label, protobufBytes]() -> Status {
1394
0
                TReportCommitTxnResultRequest request;
1395
0
                TStatus result;
1396
1397
0
                if (protobufBytes.length() <= 0) {
1398
0
                    LOG(WARNING) << "protobufBytes: " << protobufBytes.length();
1399
0
                    return Status::OK(); // nobody cares the return status
1400
0
                }
1401
1402
0
                request.__set_dbId(db_id);
1403
0
                request.__set_txnId(txn_id);
1404
0
                request.__set_label(label);
1405
0
                request.__set_payload(protobufBytes);
1406
1407
0
                Status status;
1408
0
                int64_t duration_ns = 0;
1409
0
                TNetworkAddress master_addr =
1410
0
                        ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
1411
0
                if (master_addr.hostname.empty() || master_addr.port == 0) {
1412
0
                    status = Status::Error<SERVICE_UNAVAILABLE>(
1413
0
                            "Have not get FE Master heartbeat yet");
1414
0
                } else {
1415
0
                    SCOPED_RAW_TIMER(&duration_ns);
1416
1417
0
                    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
1418
0
                            master_addr.hostname, master_addr.port,
1419
0
                            [&request, &result](FrontendServiceConnection& client) {
1420
0
                                client->reportCommitTxnResult(result, request);
1421
0
                            }));
1422
1423
0
                    status = Status::create<false>(result);
1424
0
                }
1425
0
                g_cloud_commit_txn_resp_redirect_latency << duration_ns / 1000;
1426
1427
0
                if (!status.ok()) {
1428
0
                    LOG(WARNING) << "TableStats report RPC to FE failed, errmsg=" << status
1429
0
                                 << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
1430
0
                    return Status::OK(); // nobody cares the return status
1431
0
                } else {
1432
0
                    LOG(INFO) << "TableStats report RPC to FE success, msg=" << status
1433
0
                              << " dbId=" << db_id << " txnId=" << txn_id << " label=" << label;
1434
0
                    return Status::OK();
1435
0
                }
1436
0
            });
1437
0
    if (!st.ok()) {
1438
0
        LOG(WARNING) << "TableStats report to FE task submission failed: " << st.to_string();
1439
0
    }
1440
0
}
1441
1442
0
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
1443
0
    VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
1444
0
               << ", label: " << ctx.label << ", is_2pc: " << is_2pc;
1445
0
    {
1446
0
        Status ret_st;
1447
0
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st);
1448
0
    }
1449
0
    CommitTxnRequest req;
1450
0
    CommitTxnResponse res;
1451
0
    req.set_cloud_unique_id(config::cloud_unique_id);
1452
0
    req.set_db_id(ctx.db_id);
1453
0
    req.set_txn_id(ctx.txn_id);
1454
0
    req.set_is_2pc(is_2pc);
1455
0
    req.set_enable_txn_lazy_commit(config::enable_cloud_txn_lazy_commit);
1456
0
    auto st = retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn);
1457
1458
0
    if (st.ok()) {
1459
0
        send_stats_to_fe_async(ctx.db_id, ctx.txn_id, ctx.label, res);
1460
0
    }
1461
1462
0
    return st;
1463
0
}
1464
1465
487
Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
1466
487
    VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
1467
0
               << ", label: " << ctx.label;
1468
487
    {
1469
487
        Status ret_st;
1470
487
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st);
1471
487
    }
1472
487
    AbortTxnRequest req;
1473
487
    AbortTxnResponse res;
1474
487
    req.set_cloud_unique_id(config::cloud_unique_id);
1475
487
    req.set_reason(std::string(ctx.status.msg().substr(0, 1024)));
1476
487
    if (ctx.db_id > 0 && !ctx.label.empty()) {
1477
479
        req.set_db_id(ctx.db_id);
1478
479
        req.set_label(ctx.label);
1479
479
    } else if (ctx.txn_id > 0) {
1480
8
        req.set_txn_id(ctx.txn_id);
1481
8
    } else {
1482
0
        LOG(WARNING) << "failed abort txn, with illegal input, db_id=" << ctx.db_id
1483
0
                     << " txn_id=" << ctx.txn_id << " label=" << ctx.label;
1484
0
        return Status::InternalError<false>("failed to abort txn");
1485
0
    }
1486
487
    return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn);
1487
487
}
1488
1489
33
Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
1490
33
    VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
1491
0
               << ", label: " << ctx.label;
1492
33
    {
1493
33
        Status ret_st;
1494
33
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st);
1495
33
    }
1496
33
    PrecommitTxnRequest req;
1497
33
    PrecommitTxnResponse res;
1498
33
    req.set_cloud_unique_id(config::cloud_unique_id);
1499
33
    req.set_db_id(ctx.db_id);
1500
33
    req.set_txn_id(ctx.txn_id);
1501
33
    return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn);
1502
33
}
1503
1504
0
Status CloudMetaMgr::prepare_restore_job(const TabletMetaPB& tablet_meta) {
1505
0
    VLOG_DEBUG << "prepare restore job, tablet_id: " << tablet_meta.tablet_id();
1506
0
    RestoreJobRequest req;
1507
0
    RestoreJobResponse resp;
1508
0
    req.set_cloud_unique_id(config::cloud_unique_id);
1509
0
    req.set_tablet_id(tablet_meta.tablet_id());
1510
0
    req.set_expiration(config::snapshot_expire_time_sec);
1511
0
    req.set_action(RestoreJobRequest::PREPARE);
1512
1513
0
    doris_tablet_meta_to_cloud(req.mutable_tablet_meta(), std::move(tablet_meta));
1514
0
    return retry_rpc("prepare restore job", req, &resp, &MetaService_Stub::prepare_restore_job);
1515
0
}
1516
1517
0
Status CloudMetaMgr::commit_restore_job(const int64_t tablet_id) {
1518
0
    VLOG_DEBUG << "commit restore job, tablet_id: " << tablet_id;
1519
0
    RestoreJobRequest req;
1520
0
    RestoreJobResponse resp;
1521
0
    req.set_cloud_unique_id(config::cloud_unique_id);
1522
0
    req.set_tablet_id(tablet_id);
1523
0
    req.set_action(RestoreJobRequest::COMMIT);
1524
0
    req.set_store_version(config::delete_bitmap_store_write_version);
1525
1526
0
    return retry_rpc("commit restore job", req, &resp, &MetaService_Stub::commit_restore_job);
1527
0
}
1528
1529
0
Status CloudMetaMgr::finish_restore_job(const int64_t tablet_id, bool is_completed) {
1530
0
    VLOG_DEBUG << "finish restore job, tablet_id: " << tablet_id
1531
0
               << ", is_completed: " << is_completed;
1532
0
    RestoreJobRequest req;
1533
0
    RestoreJobResponse resp;
1534
0
    req.set_cloud_unique_id(config::cloud_unique_id);
1535
0
    req.set_tablet_id(tablet_id);
1536
0
    req.set_action(is_completed ? RestoreJobRequest::COMPLETE : RestoreJobRequest::ABORT);
1537
1538
0
    return retry_rpc("finish restore job", req, &resp, &MetaService_Stub::finish_restore_job);
1539
0
}
1540
1541
126
Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos, bool* is_vault_mode) {
1542
126
    GetObjStoreInfoRequest req;
1543
126
    GetObjStoreInfoResponse resp;
1544
126
    req.set_cloud_unique_id(config::cloud_unique_id);
1545
126
    Status s =
1546
126
            retry_rpc("get storage vault info", req, &resp, &MetaService_Stub::get_obj_store_info);
1547
126
    if (!s.ok()) {
1548
0
        return s;
1549
0
    }
1550
1551
126
    *is_vault_mode = resp.enable_storage_vault();
1552
1553
126
    auto add_obj_store = [&vault_infos](const auto& obj_store) {
1554
126
        vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
1555
126
                                  StorageVaultPB_PathFormat {});
1556
126
    };
1557
1558
126
    std::ranges::for_each(resp.obj_info(), add_obj_store);
1559
126
    std::ranges::for_each(resp.storage_vault(), [&](const auto& vault) {
1560
0
        if (vault.has_hdfs_info()) {
1561
0
            vault_infos->emplace_back(vault.id(), vault.hdfs_info(), vault.path_format());
1562
0
        }
1563
0
        if (vault.has_obj_info()) {
1564
0
            add_obj_store(vault.obj_info());
1565
0
        }
1566
0
    });
1567
1568
    // desensitization, hide secret
1569
252
    for (int i = 0; i < resp.obj_info_size(); ++i) {
1570
126
        resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
1571
126
    }
1572
126
    for (int i = 0; i < resp.storage_vault_size(); ++i) {
1573
0
        auto* j = resp.mutable_storage_vault(i);
1574
0
        if (!j->has_obj_info()) continue;
1575
0
        j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
1576
0
    }
1577
1578
252
    for (int i = 0; i < resp.obj_info_size(); ++i) {
1579
126
        resp.mutable_obj_info(i)->set_ak(hide_access_key(resp.obj_info(i).sk()));
1580
126
    }
1581
126
    for (int i = 0; i < resp.storage_vault_size(); ++i) {
1582
0
        auto* j = resp.mutable_storage_vault(i);
1583
0
        if (!j->has_obj_info()) continue;
1584
0
        j->mutable_obj_info()->set_sk(hide_access_key(j->obj_info().sk()));
1585
0
    }
1586
1587
126
    LOG(INFO) << "get storage vault, enable_storage_vault=" << *is_vault_mode
1588
126
              << " response=" << resp.ShortDebugString();
1589
126
    return Status::OK();
1590
126
}
1591
1592
23.2k
Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) {
1593
23.2k
    VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString();
1594
23.2k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res);
1595
1596
23.2k
    StartTabletJobRequest req;
1597
23.2k
    req.mutable_job()->CopyFrom(job);
1598
23.2k
    req.set_cloud_unique_id(config::cloud_unique_id);
1599
23.2k
    return retry_rpc("start tablet job", req, res, &MetaService_Stub::start_tablet_job);
1600
23.2k
}
1601
1602
21.5k
Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) {
1603
18.4E
    VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString();
1604
21.5k
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res);
1605
21.5k
    DBUG_EXECUTE_IF("CloudMetaMgr::commit_tablet_job.fail", {
1606
21.5k
        return Status::InternalError<false>("inject CloudMetaMgr::commit_tablet_job.fail");
1607
21.5k
    });
1608
1609
21.5k
    FinishTabletJobRequest req;
1610
21.5k
    req.mutable_job()->CopyFrom(job);
1611
21.5k
    req.set_action(FinishTabletJobRequest::COMMIT);
1612
21.5k
    req.set_cloud_unique_id(config::cloud_unique_id);
1613
21.5k
    auto st = retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job);
1614
21.5k
    if (res->status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
1615
0
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
1616
0
                "txn conflict when commit tablet job {}", job.ShortDebugString());
1617
0
    }
1618
21.5k
    return st;
1619
21.5k
}
1620
1621
106
Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
1622
106
    VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString();
1623
106
    FinishTabletJobRequest req;
1624
106
    FinishTabletJobResponse res;
1625
106
    req.mutable_job()->CopyFrom(job);
1626
106
    req.set_action(FinishTabletJobRequest::ABORT);
1627
106
    req.set_cloud_unique_id(config::cloud_unique_id);
1628
106
    return retry_rpc("abort tablet job", req, &res, &MetaService_Stub::finish_tablet_job);
1629
106
}
1630
1631
208
Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
1632
208
    VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString();
1633
208
    FinishTabletJobRequest req;
1634
208
    FinishTabletJobResponse res;
1635
208
    req.mutable_job()->CopyFrom(job);
1636
208
    req.set_action(FinishTabletJobRequest::LEASE);
1637
208
    req.set_cloud_unique_id(config::cloud_unique_id);
1638
208
    return retry_rpc("lease tablet job", req, &res, &MetaService_Stub::finish_tablet_job);
1639
208
}
1640
1641
static void add_delete_bitmap(DeleteBitmapPB& delete_bitmap_pb, const DeleteBitmap::BitmapKey& key,
1642
0
                              roaring::Roaring& bitmap) {
1643
0
    delete_bitmap_pb.add_rowset_ids(std::get<0>(key).to_string());
1644
0
    delete_bitmap_pb.add_segment_ids(std::get<1>(key));
1645
0
    delete_bitmap_pb.add_versions(std::get<2>(key));
1646
    // To save space, convert array and bitmap containers to run containers
1647
0
    bitmap.runOptimize();
1648
0
    std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
1649
0
    bitmap.write(bitmap_data.data());
1650
0
    *(delete_bitmap_pb.add_segment_delete_bitmaps()) = std::move(bitmap_data);
1651
0
}
1652
1653
static Status store_delete_bitmap(std::string& rowset_id, DeleteBitmapPB& delete_bitmap_pb,
1654
                                  int64_t tablet_id,
1655
                                  std::optional<StorageResource> storage_resource,
1656
0
                                  UpdateDeleteBitmapRequest& req, int64_t txn_id) {
1657
0
    if (config::enable_mow_verbose_log) {
1658
0
        std::stringstream ss;
1659
0
        for (int i = 0; i < delete_bitmap_pb.rowset_ids_size(); i++) {
1660
0
            ss << "{rid=" << delete_bitmap_pb.rowset_ids(i)
1661
0
               << ", sid=" << delete_bitmap_pb.segment_ids(i)
1662
0
               << ", ver=" << delete_bitmap_pb.versions(i) << "}, ";
1663
0
        }
1664
0
        LOG(INFO) << "handle one rowset delete bitmap for tablet_id: " << tablet_id
1665
0
                  << ", rowset_id: " << rowset_id
1666
0
                  << ", delete_bitmap num: " << delete_bitmap_pb.rowset_ids_size()
1667
0
                  << ",  size: " << delete_bitmap_pb.ByteSizeLong() << ", keys=[" << ss.str()
1668
0
                  << "]";
1669
0
    }
1670
0
    if (delete_bitmap_pb.rowset_ids_size() == 0) {
1671
0
        return Status::OK();
1672
0
    }
1673
0
    DeleteBitmapStoragePB delete_bitmap_storage;
1674
0
    if (config::delete_bitmap_store_v2_max_bytes_in_fdb >= 0 &&
1675
0
        delete_bitmap_pb.ByteSizeLong() > config::delete_bitmap_store_v2_max_bytes_in_fdb) {
1676
        // Enable packed file only for load (txn_id > 0)
1677
0
        bool enable_packed = config::enable_packed_file && txn_id > 0;
1678
0
        DeleteBitmapFileWriter file_writer(tablet_id, rowset_id, storage_resource, enable_packed,
1679
0
                                           txn_id);
1680
0
        RETURN_IF_ERROR(file_writer.init());
1681
0
        RETURN_IF_ERROR(file_writer.write(delete_bitmap_pb));
1682
0
        RETURN_IF_ERROR(file_writer.close());
1683
0
        delete_bitmap_pb.Clear();
1684
0
        delete_bitmap_storage.set_store_in_fdb(false);
1685
1686
        // Store packed slice location if file was written to packed file
1687
0
        if (file_writer.is_packed()) {
1688
0
            io::PackedSliceLocation loc;
1689
0
            RETURN_IF_ERROR(file_writer.get_packed_slice_location(&loc));
1690
0
            auto* packed_loc = delete_bitmap_storage.mutable_packed_slice_location();
1691
0
            packed_loc->set_packed_file_path(loc.packed_file_path);
1692
0
            packed_loc->set_offset(loc.offset);
1693
0
            packed_loc->set_size(loc.size);
1694
0
            packed_loc->set_packed_file_size(loc.packed_file_size);
1695
0
        }
1696
0
    } else {
1697
0
        delete_bitmap_storage.set_store_in_fdb(true);
1698
0
        *(delete_bitmap_storage.mutable_delete_bitmap()) = std::move(delete_bitmap_pb);
1699
0
    }
1700
0
    req.add_delta_rowset_ids(rowset_id);
1701
0
    *(req.add_delete_bitmap_storages()) = std::move(delete_bitmap_storage);
1702
0
    return Status::OK();
1703
0
}
1704
1705
Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
1706
                                          int64_t initiator, DeleteBitmap* delete_bitmap,
1707
                                          DeleteBitmap* delete_bitmap_v2, std::string rowset_id,
1708
                                          std::optional<StorageResource> storage_resource,
1709
                                          int64_t store_version, int64_t txn_id,
1710
59.8k
                                          bool is_explicit_txn, int64_t next_visible_version) {
1711
59.8k
    VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
1712
59.8k
    if (config::enable_mow_verbose_log) {
1713
0
        std::stringstream ss;
1714
0
        ss << "start update delete bitmap for tablet_id: " << tablet.tablet_id()
1715
0
           << ", rowset_id: " << rowset_id
1716
0
           << ", delete_bitmap num: " << delete_bitmap->delete_bitmap.size()
1717
0
           << ", store_version: " << store_version << ", lock_id=" << lock_id
1718
0
           << ", initiator=" << initiator;
1719
0
        if (store_version == 2 || store_version == 3) {
1720
0
            ss << ", delete_bitmap v2 num: " << delete_bitmap_v2->delete_bitmap.size();
1721
0
        }
1722
0
        LOG(INFO) << ss.str();
1723
0
    }
1724
59.8k
    UpdateDeleteBitmapRequest req;
1725
59.8k
    UpdateDeleteBitmapResponse res;
1726
59.8k
    req.set_cloud_unique_id(config::cloud_unique_id);
1727
59.8k
    req.set_table_id(tablet.table_id());
1728
59.8k
    req.set_partition_id(tablet.partition_id());
1729
59.8k
    req.set_tablet_id(tablet.tablet_id());
1730
59.8k
    req.set_lock_id(lock_id);
1731
59.8k
    req.set_initiator(initiator);
1732
59.8k
    req.set_is_explicit_txn(is_explicit_txn);
1733
59.8k
    if (txn_id > 0) {
1734
54.0k
        req.set_txn_id(txn_id);
1735
54.0k
    }
1736
59.8k
    if (next_visible_version > 0) {
1737
54.1k
        req.set_next_visible_version(next_visible_version);
1738
54.1k
    }
1739
59.8k
    req.set_store_version(store_version);
1740
1741
59.8k
    bool write_v1 = store_version == 1 || store_version == 3;
1742
59.8k
    bool write_v2 = store_version == 2 || store_version == 3;
1743
    // write v1 kvs
1744
59.8k
    if (write_v1) {
1745
59.5k
        for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
1746
10.9k
            req.add_rowset_ids(std::get<0>(key).to_string());
1747
10.9k
            req.add_segment_ids(std::get<1>(key));
1748
10.9k
            req.add_versions(std::get<2>(key));
1749
            // To save space, convert array and bitmap containers to run containers
1750
10.9k
            bitmap.runOptimize();
1751
10.9k
            std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
1752
10.9k
            bitmap.write(bitmap_data.data());
1753
10.9k
            *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
1754
10.9k
        }
1755
59.5k
    }
1756
1757
    // write v2 kvs
1758
59.8k
    if (write_v2) {
1759
0
        if (config::enable_mow_verbose_log) {
1760
0
            LOG(INFO) << "update delete bitmap for tablet_id: " << tablet.tablet_id()
1761
0
                      << ", rowset_id: " << rowset_id
1762
0
                      << ", delete_bitmap num: " << delete_bitmap_v2->delete_bitmap.size()
1763
0
                      << ", lock_id=" << lock_id << ", initiator=" << initiator;
1764
0
        }
1765
0
        if (rowset_id.empty()) {
1766
0
            std::string pre_rowset_id = "";
1767
0
            std::string cur_rowset_id = "";
1768
0
            DeleteBitmapPB delete_bitmap_pb;
1769
0
            for (auto it = delete_bitmap_v2->delete_bitmap.begin();
1770
0
                 it != delete_bitmap_v2->delete_bitmap.end(); ++it) {
1771
0
                auto& key = it->first;
1772
0
                auto& bitmap = it->second;
1773
0
                cur_rowset_id = std::get<0>(key).to_string();
1774
0
                if (cur_rowset_id != pre_rowset_id) {
1775
0
                    if (!pre_rowset_id.empty() && delete_bitmap_pb.rowset_ids_size() > 0) {
1776
0
                        RETURN_IF_ERROR(store_delete_bitmap(pre_rowset_id, delete_bitmap_pb,
1777
0
                                                            tablet.tablet_id(), storage_resource,
1778
0
                                                            req, txn_id));
1779
0
                    }
1780
0
                    pre_rowset_id = cur_rowset_id;
1781
0
                    DCHECK_EQ(delete_bitmap_pb.rowset_ids_size(), 0);
1782
0
                    DCHECK_EQ(delete_bitmap_pb.segment_ids_size(), 0);
1783
0
                    DCHECK_EQ(delete_bitmap_pb.versions_size(), 0);
1784
0
                    DCHECK_EQ(delete_bitmap_pb.segment_delete_bitmaps_size(), 0);
1785
0
                }
1786
0
                add_delete_bitmap(delete_bitmap_pb, key, bitmap);
1787
0
            }
1788
0
            if (delete_bitmap_pb.rowset_ids_size() > 0) {
1789
0
                DCHECK(!cur_rowset_id.empty());
1790
0
                RETURN_IF_ERROR(store_delete_bitmap(cur_rowset_id, delete_bitmap_pb,
1791
0
                                                    tablet.tablet_id(), storage_resource, req,
1792
0
                                                    txn_id));
1793
0
            }
1794
0
        } else {
1795
0
            DeleteBitmapPB delete_bitmap_pb;
1796
0
            for (auto& [key, bitmap] : delete_bitmap_v2->delete_bitmap) {
1797
0
                add_delete_bitmap(delete_bitmap_pb, key, bitmap);
1798
0
            }
1799
0
            RETURN_IF_ERROR(store_delete_bitmap(rowset_id, delete_bitmap_pb, tablet.tablet_id(),
1800
0
                                                storage_resource, req, txn_id));
1801
0
        }
1802
0
        DCHECK_EQ(req.delta_rowset_ids_size(), req.delete_bitmap_storages_size());
1803
0
    }
1804
59.8k
    DBUG_EXECUTE_IF("CloudMetaMgr::test_update_big_delete_bitmap", {
1805
59.8k
        LOG(INFO) << "test_update_big_delete_bitmap for tablet " << tablet.tablet_id();
1806
59.8k
        auto count = dp->param<int>("count", 30000);
1807
59.8k
        if (!delete_bitmap->delete_bitmap.empty()) {
1808
59.8k
            auto& key = delete_bitmap->delete_bitmap.begin()->first;
1809
59.8k
            auto& bitmap = delete_bitmap->delete_bitmap.begin()->second;
1810
59.8k
            for (int i = 1000; i < (1000 + count); i++) {
1811
59.8k
                req.add_rowset_ids(std::get<0>(key).to_string());
1812
59.8k
                req.add_segment_ids(std::get<1>(key));
1813
59.8k
                req.add_versions(i);
1814
                // To save space, convert array and bitmap containers to run containers
1815
59.8k
                bitmap.runOptimize();
1816
59.8k
                std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
1817
59.8k
                bitmap.write(bitmap_data.data());
1818
59.8k
                *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
1819
59.8k
            }
1820
59.8k
        }
1821
59.8k
    });
1822
59.8k
    DBUG_EXECUTE_IF("CloudMetaMgr::test_update_delete_bitmap_fail", {
1823
59.8k
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
1824
59.8k
                "test update delete bitmap failed, tablet_id: {}, lock_id: {}", tablet.tablet_id(),
1825
59.8k
                lock_id);
1826
59.8k
    });
1827
59.8k
    auto st = retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap);
1828
59.8k
    if (config::enable_update_delete_bitmap_kv_check_core &&
1829
59.8k
        res.status().code() == MetaServiceCode::UPDATE_OVERRIDE_EXISTING_KV) {
1830
0
        auto& msg = res.status().msg();
1831
0
        LOG_WARNING(msg);
1832
0
        CHECK(false) << msg;
1833
0
    }
1834
59.8k
    if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
1835
25
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
1836
25
                "lock expired when update delete bitmap, tablet_id: {}, lock_id: {}, initiator: "
1837
25
                "{}, error_msg: {}",
1838
25
                tablet.tablet_id(), lock_id, initiator, res.status().msg());
1839
25
    }
1840
59.8k
    return st;
1841
59.8k
}
1842
1843
Status CloudMetaMgr::cloud_update_delete_bitmap_without_lock(
1844
        const CloudTablet& tablet, DeleteBitmap* delete_bitmap,
1845
        std::map<std::string, int64_t>& rowset_to_versions, int64_t pre_rowset_agg_start_version,
1846
3.91k
        int64_t pre_rowset_agg_end_version) {
1847
3.91k
    if (config::delete_bitmap_store_write_version == 2) {
1848
0
        VLOG_DEBUG << "no need to agg delete bitmap v1 in ms because use v2";
1849
0
        return Status::OK();
1850
0
    }
1851
3.91k
    LOG(INFO) << "cloud_update_delete_bitmap_without_lock, tablet_id: " << tablet.tablet_id()
1852
3.91k
              << ", delete_bitmap size: " << delete_bitmap->delete_bitmap.size();
1853
3.91k
    UpdateDeleteBitmapRequest req;
1854
3.91k
    UpdateDeleteBitmapResponse res;
1855
3.91k
    req.set_cloud_unique_id(config::cloud_unique_id);
1856
3.91k
    req.set_table_id(tablet.table_id());
1857
3.91k
    req.set_partition_id(tablet.partition_id());
1858
3.91k
    req.set_tablet_id(tablet.tablet_id());
1859
    // use a fake lock id to resolve compatibility issues
1860
3.91k
    req.set_lock_id(-3);
1861
3.91k
    req.set_without_lock(true);
1862
3.91k
    for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
1863
1.22k
        req.add_rowset_ids(std::get<0>(key).to_string());
1864
1.22k
        req.add_segment_ids(std::get<1>(key));
1865
1.22k
        req.add_versions(std::get<2>(key));
1866
1.22k
        if (pre_rowset_agg_end_version > 0) {
1867
1.22k
            DCHECK(rowset_to_versions.find(std::get<0>(key).to_string()) !=
1868
0
                   rowset_to_versions.end())
1869
0
                    << "rowset_to_versions not found for key=" << std::get<0>(key).to_string();
1870
1.22k
            req.add_pre_rowset_versions(rowset_to_versions[std::get<0>(key).to_string()]);
1871
1.22k
        }
1872
1.22k
        DCHECK(pre_rowset_agg_end_version <= 0 || pre_rowset_agg_end_version == std::get<2>(key))
1873
0
                << "pre_rowset_agg_end_version=" << pre_rowset_agg_end_version
1874
0
                << " not equal to version=" << std::get<2>(key);
1875
        // To save space, convert array and bitmap containers to run containers
1876
1.22k
        bitmap.runOptimize();
1877
1.22k
        std::string bitmap_data(bitmap.getSizeInBytes(), '\0');
1878
1.22k
        bitmap.write(bitmap_data.data());
1879
1.22k
        *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
1880
1.22k
    }
1881
3.91k
    if (pre_rowset_agg_start_version > 0 && pre_rowset_agg_end_version > 0) {
1882
3.91k
        req.set_pre_rowset_agg_start_version(pre_rowset_agg_start_version);
1883
3.91k
        req.set_pre_rowset_agg_end_version(pre_rowset_agg_end_version);
1884
3.91k
    }
1885
3.91k
    return retry_rpc("update delete bitmap", req, &res, &MetaService_Stub::update_delete_bitmap);
1886
3.91k
}
1887
1888
Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
1889
5.25k
                                                   int64_t initiator) {
1890
5.25k
    DBUG_EXECUTE_IF("get_delete_bitmap_update_lock.inject_fail", {
1891
5.25k
        auto p = dp->param("percent", 0.01);
1892
5.25k
        std::mt19937 gen {std::random_device {}()};
1893
5.25k
        std::bernoulli_distribution inject_fault {p};
1894
5.25k
        if (inject_fault(gen)) {
1895
5.25k
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
1896
5.25k
                    "injection error when get get_delete_bitmap_update_lock, "
1897
5.25k
                    "tablet_id={}, lock_id={}, initiator={}",
1898
5.25k
                    tablet.tablet_id(), lock_id, initiator);
1899
5.25k
        }
1900
5.25k
    });
1901
18.4E
    VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id()
1902
18.4E
               << ",lock_id:" << lock_id;
1903
5.25k
    GetDeleteBitmapUpdateLockRequest req;
1904
5.25k
    GetDeleteBitmapUpdateLockResponse res;
1905
5.25k
    req.set_cloud_unique_id(config::cloud_unique_id);
1906
5.25k
    req.set_table_id(tablet.table_id());
1907
5.25k
    req.set_lock_id(lock_id);
1908
5.25k
    req.set_initiator(initiator);
1909
    // set expiration time for compaction and schema_change
1910
5.25k
    req.set_expiration(config::delete_bitmap_lock_expiration_seconds);
1911
5.25k
    int retry_times = 0;
1912
5.25k
    Status st;
1913
5.25k
    std::default_random_engine rng = make_random_engine();
1914
5.25k
    std::uniform_int_distribution<uint32_t> u(500, 2000);
1915
5.25k
    uint64_t backoff_sleep_time_ms {0};
1916
5.91k
    do {
1917
5.91k
        bool test_conflict = false;
1918
5.91k
        st = retry_rpc("get delete bitmap update lock", req, &res,
1919
5.91k
                       &MetaService_Stub::get_delete_bitmap_update_lock);
1920
5.91k
        DBUG_EXECUTE_IF("CloudMetaMgr::test_get_delete_bitmap_update_lock_conflict",
1921
5.91k
                        { test_conflict = true; });
1922
5.91k
        if (!test_conflict && res.status().code() != MetaServiceCode::LOCK_CONFLICT) {
1923
5.25k
            break;
1924
5.25k
        }
1925
1926
665
        uint32_t duration_ms = u(rng);
1927
665
        LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req)
1928
665
                     << " retry_times=" << retry_times << " sleep=" << duration_ms
1929
665
                     << "ms : " << res.status().msg();
1930
665
        auto start = std::chrono::steady_clock::now();
1931
665
        bthread_usleep(duration_ms * 1000);
1932
665
        auto end = std::chrono::steady_clock::now();
1933
665
        backoff_sleep_time_ms += duration_cast<std::chrono::milliseconds>(end - start).count();
1934
665
    } while (++retry_times <= config::get_delete_bitmap_lock_max_retry_times);
1935
0
    g_cloud_be_mow_get_dbm_lock_backoff_sleep_time << backoff_sleep_time_ms;
1936
5.25k
    DBUG_EXECUTE_IF("CloudMetaMgr.get_delete_bitmap_update_lock.inject_sleep", {
1937
5.25k
        auto p = dp->param("percent", 0.01);
1938
        // 100s > Config.calculate_delete_bitmap_task_timeout_seconds = 60s
1939
5.25k
        auto sleep_time = dp->param("sleep", 15);
1940
5.25k
        std::mt19937 gen {std::random_device {}()};
1941
5.25k
        std::bernoulli_distribution inject_fault {p};
1942
5.25k
        if (inject_fault(gen)) {
1943
5.25k
            LOG_INFO("injection sleep for {} seconds, tablet_id={}", sleep_time,
1944
5.25k
                     tablet.tablet_id());
1945
5.25k
            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
1946
5.25k
        }
1947
5.25k
    });
1948
5.25k
    if (res.status().code() == MetaServiceCode::KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
1949
0
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
1950
0
                "txn conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
1951
0
                "initiator {}",
1952
0
                tablet.table_id(), lock_id, initiator);
1953
5.25k
    } else if (res.status().code() == MetaServiceCode::LOCK_CONFLICT) {
1954
0
        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
1955
0
                "lock conflict when get delete bitmap update lock, table_id {}, lock_id {}, "
1956
0
                "initiator {}",
1957
0
                tablet.table_id(), lock_id, initiator);
1958
0
    }
1959
5.25k
    return st;
1960
5.25k
}
1961
1962
void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id,
1963
23
                                                    int64_t initiator, int64_t tablet_id) {
1964
23
    LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id
1965
23
              << ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id;
1966
23
    RemoveDeleteBitmapUpdateLockRequest req;
1967
23
    RemoveDeleteBitmapUpdateLockResponse res;
1968
23
    req.set_cloud_unique_id(config::cloud_unique_id);
1969
23
    req.set_table_id(table_id);
1970
23
    req.set_tablet_id(tablet_id);
1971
23
    req.set_lock_id(lock_id);
1972
23
    req.set_initiator(initiator);
1973
23
    auto st = retry_rpc("remove delete bitmap update lock", req, &res,
1974
23
                        &MetaService_Stub::remove_delete_bitmap_update_lock);
1975
23
    if (!st.ok()) {
1976
21
        LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id
1977
21
                     << ",tablet_id=" << tablet_id << ",lock_id=" << lock_id
1978
21
                     << ",st=" << st.to_string();
1979
21
    }
1980
23
}
1981
1982
201k
void CloudMetaMgr::check_table_size_correctness(RowsetMeta& rs_meta) {
1983
202k
    if (!config::enable_table_size_correctness_check) {
1984
202k
        return;
1985
202k
    }
1986
18.4E
    int64_t total_segment_size = get_segment_file_size(rs_meta);
1987
18.4E
    int64_t total_inverted_index_size = get_inverted_index_file_size(rs_meta);
1988
18.4E
    if (rs_meta.data_disk_size() != total_segment_size ||
1989
18.4E
        rs_meta.index_disk_size() != total_inverted_index_size ||
1990
18.4E
        rs_meta.data_disk_size() + rs_meta.index_disk_size() != rs_meta.total_disk_size()) {
1991
0
        LOG(WARNING) << "[Cloud table table size check failed]:"
1992
0
                     << " tablet id: " << rs_meta.tablet_id()
1993
0
                     << ", rowset id:" << rs_meta.rowset_id()
1994
0
                     << ", rowset data disk size:" << rs_meta.data_disk_size()
1995
0
                     << ", rowset real data disk size:" << total_segment_size
1996
0
                     << ", rowset index disk size:" << rs_meta.index_disk_size()
1997
0
                     << ", rowset real index disk size:" << total_inverted_index_size
1998
0
                     << ", rowset total disk size:" << rs_meta.total_disk_size()
1999
0
                     << ", rowset segment path:"
2000
0
                     << StorageResource().remote_segment_path(rs_meta.tablet_id(),
2001
0
                                                              rs_meta.rowset_id().to_string(), 0);
2002
0
        DCHECK(false);
2003
0
    }
2004
18.4E
}
2005
2006
0
int64_t CloudMetaMgr::get_segment_file_size(RowsetMeta& rs_meta) {
2007
0
    int64_t total_segment_size = 0;
2008
0
    const auto fs = rs_meta.fs();
2009
0
    if (!fs) {
2010
0
        LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
2011
0
    }
2012
0
    for (int64_t seg_id = 0; seg_id < rs_meta.num_segments(); seg_id++) {
2013
0
        std::string segment_path = StorageResource().remote_segment_path(
2014
0
                rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
2015
0
        int64_t segment_file_size = 0;
2016
0
        auto st = fs->file_size(segment_path, &segment_file_size);
2017
0
        if (!st.ok()) {
2018
0
            segment_file_size = 0;
2019
0
            if (st.is<NOT_FOUND>()) {
2020
0
                LOG(INFO) << "cloud table size correctness check get segment size 0 because "
2021
0
                             "file not exist! msg:"
2022
0
                          << st.msg() << ", segment path:" << segment_path;
2023
0
            } else {
2024
0
                LOG(WARNING) << "cloud table size correctness check get segment size failed! msg:"
2025
0
                             << st.msg() << ", segment path:" << segment_path;
2026
0
            }
2027
0
        }
2028
0
        total_segment_size += segment_file_size;
2029
0
    }
2030
0
    return total_segment_size;
2031
0
}
2032
2033
0
int64_t CloudMetaMgr::get_inverted_index_file_size(RowsetMeta& rs_meta) {
2034
0
    int64_t total_inverted_index_size = 0;
2035
0
    const auto fs = rs_meta.fs();
2036
0
    if (!fs) {
2037
0
        LOG(WARNING) << "get fs failed, resource_id={}" << rs_meta.resource_id();
2038
0
    }
2039
0
    if (rs_meta.tablet_schema()->get_inverted_index_storage_format() ==
2040
0
        InvertedIndexStorageFormatPB::V1) {
2041
0
        const auto& indices = rs_meta.tablet_schema()->inverted_indexes();
2042
0
        for (auto& index : indices) {
2043
0
            for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
2044
0
                std::string segment_path = StorageResource().remote_segment_path(
2045
0
                        rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
2046
0
                int64_t file_size = 0;
2047
2048
0
                std::string inverted_index_file_path =
2049
0
                        InvertedIndexDescriptor::get_index_file_path_v1(
2050
0
                                InvertedIndexDescriptor::get_index_file_path_prefix(segment_path),
2051
0
                                index->index_id(), index->get_index_suffix());
2052
0
                auto st = fs->file_size(inverted_index_file_path, &file_size);
2053
0
                if (!st.ok()) {
2054
0
                    file_size = 0;
2055
0
                    if (st.is<NOT_FOUND>()) {
2056
0
                        LOG(INFO) << "cloud table size correctness check get inverted index v1 "
2057
0
                                     "0 because file not exist! msg:"
2058
0
                                  << st.msg()
2059
0
                                  << ", inverted index path:" << inverted_index_file_path;
2060
0
                    } else {
2061
0
                        LOG(WARNING)
2062
0
                                << "cloud table size correctness check get inverted index v1 "
2063
0
                                   "size failed! msg:"
2064
0
                                << st.msg() << ", inverted index path:" << inverted_index_file_path;
2065
0
                    }
2066
0
                }
2067
0
                total_inverted_index_size += file_size;
2068
0
            }
2069
0
        }
2070
0
    } else {
2071
0
        for (int seg_id = 0; seg_id < rs_meta.num_segments(); ++seg_id) {
2072
0
            int64_t file_size = 0;
2073
0
            std::string segment_path = StorageResource().remote_segment_path(
2074
0
                    rs_meta.tablet_id(), rs_meta.rowset_id().to_string(), seg_id);
2075
2076
0
            std::string inverted_index_file_path = InvertedIndexDescriptor::get_index_file_path_v2(
2077
0
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_path));
2078
0
            auto st = fs->file_size(inverted_index_file_path, &file_size);
2079
0
            if (!st.ok()) {
2080
0
                file_size = 0;
2081
0
                if (st.is<NOT_FOUND>()) {
2082
0
                    LOG(INFO) << "cloud table size correctness check get inverted index v2 "
2083
0
                                 "0 because file not exist! msg:"
2084
0
                              << st.msg() << ", inverted index path:" << inverted_index_file_path;
2085
0
                } else {
2086
0
                    LOG(WARNING) << "cloud table size correctness check get inverted index v2 "
2087
0
                                    "size failed! msg:"
2088
0
                                 << st.msg()
2089
0
                                 << ", inverted index path:" << inverted_index_file_path;
2090
0
                }
2091
0
            }
2092
0
            total_inverted_index_size += file_size;
2093
0
        }
2094
0
    }
2095
0
    return total_inverted_index_size;
2096
0
}
2097
2098
Status CloudMetaMgr::fill_version_holes(CloudTablet* tablet, int64_t max_version,
2099
360k
                                        std::unique_lock<std::shared_mutex>& wlock) {
2100
360k
    if (max_version <= 0) {
2101
136k
        return Status::OK();
2102
136k
    }
2103
2104
223k
    Versions existing_versions;
2105
1.29M
    for (const auto& [_, rs] : tablet->tablet_meta()->all_rs_metas()) {
2106
1.29M
        existing_versions.emplace_back(rs->version());
2107
1.29M
    }
2108
2109
    // If there are no existing versions, it may be a new tablet for restore, so skip filling holes.
2110
223k
    if (existing_versions.empty()) {
2111
1
        return Status::OK();
2112
1
    }
2113
2114
223k
    std::vector<RowsetSharedPtr> hole_rowsets;
2115
    // sort the existing versions in ascending order
2116
223k
    std::sort(existing_versions.begin(), existing_versions.end(),
2117
3.92M
              [](const Version& a, const Version& b) {
2118
                  // simple because 2 versions are certainly not overlapping
2119
3.92M
                  return a.first < b.first;
2120
3.92M
              });
2121
2122
    // During schema change, get_tablet operations on new tablets trigger sync_tablet_rowsets which calls
2123
    // fill_version_holes. For schema change tablets (TABLET_NOTREADY state), we selectively skip hole
2124
    // filling for versions <= alter_version to prevent:
2125
    // 1. Abnormal compaction score calculations for schema change tablets
2126
    // 2. Unexpected -235 errors during load operations
2127
    // This allows schema change to proceed normally while still permitting hole filling for versions
2128
    // beyond the alter_version threshold.
2129
223k
    bool is_schema_change_tablet = tablet->tablet_state() == TABLET_NOTREADY;
2130
223k
    if (is_schema_change_tablet && tablet->alter_version() <= 1) {
2131
5.50k
        LOG(INFO) << "Skip version hole filling for new schema change tablet "
2132
5.50k
                  << tablet->tablet_id() << " with alter_version " << tablet->alter_version();
2133
5.50k
        return Status::OK();
2134
5.50k
    }
2135
2136
218k
    int64_t last_version = -1;
2137
1.28M
    for (const Version& version : existing_versions) {
2138
18.4E
        VLOG_NOTICE << "Existing version for tablet " << tablet->tablet_id() << ": ["
2139
18.4E
                    << version.first << ", " << version.second << "]";
2140
        // missing versions are those that are not in the existing_versions
2141
1.28M
        if (version.first > last_version + 1) {
2142
            // there is a hole between versions
2143
19
            auto prev_non_hole_rowset = tablet->get_rowset_by_version(version);
2144
157
            for (int64_t ver = last_version + 1; ver < version.first; ++ver) {
2145
                // Skip hole filling for versions <= alter_version during schema change
2146
138
                if (is_schema_change_tablet && ver <= tablet->alter_version()) {
2147
128
                    continue;
2148
128
                }
2149
10
                RowsetSharedPtr hole_rowset;
2150
10
                RETURN_IF_ERROR(create_empty_rowset_for_hole(
2151
10
                        tablet, ver, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
2152
10
                hole_rowsets.push_back(hole_rowset);
2153
10
            }
2154
19
            LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1
2155
19
                      << " to " << version.first - 1 << " for tablet " << tablet->tablet_id()
2156
19
                      << (is_schema_change_tablet
2157
19
                                  ? (", schema change tablet skipped filling versions <= " +
2158
13
                                     std::to_string(tablet->alter_version()))
2159
19
                                  : "");
2160
19
        }
2161
1.28M
        last_version = version.second;
2162
1.28M
    }
2163
2164
218k
    if (last_version + 1 <= max_version) {
2165
711
        LOG(INFO) << "Created empty rowset for version hole, from " << last_version + 1 << " to "
2166
711
                  << max_version << " for tablet " << tablet->tablet_id()
2167
711
                  << (is_schema_change_tablet
2168
711
                              ? (", schema change tablet skipped filling versions <= " +
2169
9
                                 std::to_string(tablet->alter_version()))
2170
711
                              : "");
2171
        // there is a hole after the last existing version
2172
2.98k
        for (; last_version + 1 <= max_version; ++last_version) {
2173
            // Skip hole filling for versions <= alter_version during schema change
2174
2.26k
            if (is_schema_change_tablet && last_version + 1 <= tablet->alter_version()) {
2175
149
                continue;
2176
149
            }
2177
2.12k
            RowsetSharedPtr hole_rowset;
2178
2.12k
            auto prev_non_hole_rowset = tablet->get_rowset_by_version(existing_versions.back());
2179
2.12k
            RETURN_IF_ERROR(create_empty_rowset_for_hole(
2180
2.12k
                    tablet, last_version + 1, prev_non_hole_rowset->rowset_meta(), &hole_rowset));
2181
2.12k
            hole_rowsets.push_back(hole_rowset);
2182
2.12k
        }
2183
711
    }
2184
2185
218k
    if (!hole_rowsets.empty()) {
2186
705
        size_t hole_count = hole_rowsets.size();
2187
705
        tablet->add_rowsets(std::move(hole_rowsets), false, wlock, false);
2188
705
        g_cloud_version_hole_filled_count << hole_count;
2189
705
    }
2190
218k
    return Status::OK();
2191
218k
}
2192
2193
Status CloudMetaMgr::create_empty_rowset_for_hole(CloudTablet* tablet, int64_t version,
2194
                                                  RowsetMetaSharedPtr prev_rowset_meta,
2195
2.13k
                                                  RowsetSharedPtr* rowset) {
2196
    // Create a RowsetMeta for the empty rowset
2197
2.13k
    auto rs_meta = std::make_shared<RowsetMeta>();
2198
2199
    // Generate a deterministic rowset ID for the hole (same tablet_id + version = same rowset_id)
2200
2.13k
    RowsetId hole_rowset_id;
2201
2.13k
    hole_rowset_id.init(2, 0, tablet->tablet_id(), version);
2202
2.13k
    rs_meta->set_rowset_id(hole_rowset_id);
2203
2204
    // Generate a deterministic load_id for the hole rowset (same tablet_id + version = same load_id)
2205
2.13k
    PUniqueId load_id;
2206
2.13k
    load_id.set_hi(tablet->tablet_id());
2207
2.13k
    load_id.set_lo(version);
2208
2.13k
    rs_meta->set_load_id(load_id);
2209
2210
    // Copy schema and other metadata from template
2211
2.13k
    rs_meta->set_tablet_schema(prev_rowset_meta->tablet_schema());
2212
2.13k
    rs_meta->set_rowset_type(prev_rowset_meta->rowset_type());
2213
2.13k
    rs_meta->set_tablet_schema_hash(prev_rowset_meta->tablet_schema_hash());
2214
2.13k
    rs_meta->set_resource_id(prev_rowset_meta->resource_id());
2215
2216
    // Basic tablet information
2217
2.13k
    rs_meta->set_tablet_id(tablet->tablet_id());
2218
2.13k
    rs_meta->set_index_id(tablet->index_id());
2219
2.13k
    rs_meta->set_partition_id(tablet->partition_id());
2220
2.13k
    rs_meta->set_tablet_uid(tablet->tablet_uid());
2221
2.13k
    rs_meta->set_version(Version(version, version));
2222
2.13k
    rs_meta->set_txn_id(version);
2223
2224
2.13k
    rs_meta->set_num_rows(0);
2225
2.13k
    rs_meta->set_total_disk_size(0);
2226
2.13k
    rs_meta->set_data_disk_size(0);
2227
2.13k
    rs_meta->set_index_disk_size(0);
2228
2.13k
    rs_meta->set_empty(true);
2229
2.13k
    rs_meta->set_num_segments(0);
2230
2.13k
    rs_meta->set_segments_overlap(NONOVERLAPPING);
2231
2.13k
    rs_meta->set_rowset_state(VISIBLE);
2232
2.13k
    rs_meta->set_creation_time(UnixSeconds());
2233
2.13k
    rs_meta->set_newest_write_timestamp(UnixSeconds());
2234
2235
2.13k
    Status s = RowsetFactory::create_rowset(nullptr, "", rs_meta, rowset);
2236
2.13k
    if (!s.ok()) {
2237
0
        LOG_WARNING("Failed to create empty rowset for hole")
2238
0
                .tag("tablet_id", tablet->tablet_id())
2239
0
                .tag("version", version)
2240
0
                .error(s);
2241
0
        return s;
2242
0
    }
2243
2.13k
    (*rowset)->set_hole_rowset(true);
2244
2245
2.13k
    return Status::OK();
2246
2.13k
}
2247
2248
2
Status CloudMetaMgr::list_snapshot(std::vector<SnapshotInfoPB>& snapshots) {
2249
2
    ListSnapshotRequest req;
2250
2
    ListSnapshotResponse res;
2251
2
    req.set_cloud_unique_id(config::cloud_unique_id);
2252
2
    req.set_include_aborted(true);
2253
2
    RETURN_IF_ERROR(retry_rpc("list snapshot", req, &res, &MetaService_Stub::list_snapshot));
2254
0
    for (auto& snapshot : res.snapshots()) {
2255
0
        snapshots.emplace_back(snapshot);
2256
0
    }
2257
0
    return Status::OK();
2258
2
}
2259
2260
Status CloudMetaMgr::get_snapshot_properties(SnapshotSwitchStatus& switch_status,
2261
                                             int64_t& max_reserved_snapshots,
2262
1
                                             int64_t& snapshot_interval_seconds) {
2263
1
    GetInstanceRequest req;
2264
1
    GetInstanceResponse res;
2265
1
    req.set_cloud_unique_id(config::cloud_unique_id);
2266
1
    RETURN_IF_ERROR(
2267
1
            retry_rpc("get snapshot properties", req, &res, &MetaService_Stub::get_instance));
2268
1
    switch_status = res.instance().has_snapshot_switch_status()
2269
1
                            ? res.instance().snapshot_switch_status()
2270
1
                            : SnapshotSwitchStatus::SNAPSHOT_SWITCH_DISABLED;
2271
1
    max_reserved_snapshots =
2272
1
            res.instance().has_max_reserved_snapshot() ? res.instance().max_reserved_snapshot() : 0;
2273
1
    snapshot_interval_seconds = res.instance().has_snapshot_interval_seconds()
2274
1
                                        ? res.instance().snapshot_interval_seconds()
2275
1
                                        : 3600;
2276
1
    return Status::OK();
2277
1
}
2278
2279
Status CloudMetaMgr::update_packed_file_info(const std::string& packed_file_path,
2280
14.6k
                                             const cloud::PackedFileInfoPB& packed_file_info) {
2281
14.6k
    VLOG_DEBUG << "Updating meta service for packed file: " << packed_file_path << " with "
2282
0
               << packed_file_info.total_slice_num() << " small files"
2283
0
               << ", total bytes: " << packed_file_info.total_slice_bytes();
2284
2285
    // Create request
2286
14.6k
    cloud::UpdatePackedFileInfoRequest req;
2287
14.6k
    cloud::UpdatePackedFileInfoResponse resp;
2288
2289
    // Set required fields
2290
14.6k
    req.set_cloud_unique_id(config::cloud_unique_id);
2291
14.6k
    req.set_packed_file_path(packed_file_path);
2292
14.6k
    *req.mutable_packed_file_info() = packed_file_info;
2293
2294
    // Make RPC call using retry pattern
2295
14.6k
    return retry_rpc("update packed file info", req, &resp,
2296
14.6k
                     &cloud::MetaService_Stub::update_packed_file_info);
2297
14.6k
}
2298
2299
Status CloudMetaMgr::get_cluster_status(
2300
        std::unordered_map<std::string, std::pair<int32_t, int64_t>>* result,
2301
76
        std::string* my_cluster_id) {
2302
76
    GetClusterStatusRequest req;
2303
76
    GetClusterStatusResponse resp;
2304
76
    req.add_cloud_unique_ids(config::cloud_unique_id);
2305
2306
76
    Status s = retry_rpc("get cluster status", req, &resp, &MetaService_Stub::get_cluster_status);
2307
76
    if (!s.ok()) {
2308
0
        return s;
2309
0
    }
2310
2311
76
    result->clear();
2312
76
    for (const auto& detail : resp.details()) {
2313
76
        for (const auto& cluster : detail.clusters()) {
2314
            // Store cluster status and mtime (mtime is in seconds from MS, convert to ms).
2315
            // If mtime is not set, use current time as a conservative default
2316
            // to avoid immediate takeover due to elapsed being huge.
2317
76
            int64_t mtime_ms = cluster.has_mtime() ? cluster.mtime() * 1000 : UnixMillis();
2318
76
            (*result)[cluster.cluster_id()] = {static_cast<int32_t>(cluster.cluster_status()),
2319
76
                                               mtime_ms};
2320
76
        }
2321
76
    }
2322
2323
76
    if (my_cluster_id && resp.has_requester_cluster_id()) {
2324
1
        *my_cluster_id = resp.requester_cluster_id();
2325
1
    }
2326
2327
76
    return Status::OK();
2328
76
}
2329
2330
#include "common/compile_check_end.h"
2331
} // namespace doris::cloud