Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset_version_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <brpc/controller.h>
19
#include <bthread/bthread.h>
20
#include <bthread/countdown_event.h>
21
#include <bthread/mutex.h>
22
#include <bthread/types.h>
23
#include <bvar/latency_recorder.h>
24
#include <gen_cpp/FrontendService_types.h>
25
#include <gen_cpp/HeartbeatService_types.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/internal_service.pb.h>
28
#include <gen_cpp/olap_file.pb.h>
29
#include <glog/logging.h>
30
31
#include <cstdint>
32
#include <memory>
33
#include <mutex>
34
#include <optional>
35
#include <ranges>
36
#include <sstream>
37
#include <utility>
38
39
#include "cloud/config.h"
40
#include "common/status.h"
41
#include "cpp/sync_point.h"
42
#include "service/backend_options.h"
43
#include "service/internal_service.h"
44
#include "storage/olap_common.h"
45
#include "storage/rowset/rowset.h"
46
#include "storage/rowset/rowset_factory.h"
47
#include "storage/rowset/rowset_reader.h"
48
#include "storage/tablet/base_tablet.h"
49
#include "util/brpc_client_cache.h"
50
#include "util/client_cache.h"
51
#include "util/debug_points.h"
52
#include "util/thrift_rpc_helper.h"
53
#include "util/time.h"
54
55
namespace doris {
56
57
using namespace ErrorCode;
58
using namespace std::ranges;
59
60
// Latency recorders for fetching rowsets from peer replicas. Both are fed differences of
// MonotonicMicros() readings: the first one around a single get_tablet_rowsets request
// to one peer, the second one around a whole BaseTablet::_remote_capture_rowsets() call.
static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_single_request_latency(
61
        "remote_fetch_rowsets_single_rpc");
62
static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets");
63
64
// Asks `_timestamped_version_tracker` for the chain of versions covering `version_range`.
//
// When the tracker reports a failure and `options.quiet` is not set:
//   * no missed versions can be found   -> VERSION_ALREADY_MERGED error;
//   * otherwise the missed versions are logged and the tracker's status is returned,
//     unless `options.skip_missing_versions` is set, in which case the path collected so
//     far is returned.
// When the tracker fails and `options.quiet` is set, nothing is logged and the collected
// path is returned as-is.
//
// The debug point "Tablet::capture_consistent_versions.inject_failure" can force either
// an early successful return or a VERSION_ALREADY_MERGED error.
[[nodiscard]] Result<std::vector<Version>> BaseTablet::capture_consistent_versions_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    std::vector<Version> path;
    auto capture_st =
            _timestamped_version_tracker.capture_consistent_versions(version_range, &path);

    const bool report_failure = !capture_st && !options.quiet;
    if (report_failure) {
        auto missing = get_missed_versions_unlocked(version_range.second);
        if (missing.empty()) {
            LOG(WARNING) << fmt::format(
                    "version already has been merged. version_range={}, max_version={}, "
                    "tablet_id={}",
                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id());
            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
                    "missed versions is empty, version_range={}, max_version={}, tablet_id={}",
                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id()));
        }

        LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}",
                                    version_range.to_string(), tablet_id(), capture_st);
        _print_missed_versions(missing);
        if (!options.skip_missing_versions) {
            return ResultError(std::move(capture_st));
        }
        LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id();
    }

    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
        auto skip_by_option = dp->param<bool>("skip_by_option", false);
        if (skip_by_option && !options.enable_fetch_rowsets_from_peers) {
            return path;
        }
        // -2 fails every tablet; any value other than -1 fails only the matching tablet.
        const bool matches_this_tablet =
                target_tablet_id != -1 && target_tablet_id == _tablet_meta->tablet_id();
        if (matches_this_tablet || target_tablet_id == -2) {
            return ResultError(Status::Error<VERSION_ALREADY_MERGED>("version already merged"));
        }
    });
    return path;
}
100
101
// Captures the rowsets that cover `version_range`.
//
// Local path: every version returned by capture_consistent_versions_unlocked() is looked
// up in `_rs_version_map`, then (only if `options.include_stale_rowsets`) in
// `_stale_rs_version_map`. For unique-key merge-on-write tablets the tablet meta's
// delete bitmap pointer is attached to the result.
//
// Fallback path: if the version path cannot be captured, and the BE runs in cloud mode
// with `options.enable_fetch_rowsets_from_peers` set, the rowsets are requested through
// _remote_capture_rowsets(); a failure there is reported as VERSION_ALREADY_MERGED.
[[nodiscard]] Result<CaptureRowsetResult> BaseTablet::capture_consistent_rowsets_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    auto maybe_versions = capture_consistent_versions_unlocked(version_range, options);

    if (!maybe_versions) {
        const bool may_ask_peers =
                config::is_cloud_mode() && options.enable_fetch_rowsets_from_peers;
        if (!may_ask_peers) {
            return ResultError(std::move(maybe_versions.error()));
        }
        auto remote = _remote_capture_rowsets(version_range);
        if (remote) {
            return remote;
        }
        auto merged_st = Status::Error<VERSION_ALREADY_MERGED>(
                "version already merged, meet error during remote capturing rowsets, "
                "error={}, version_range={}",
                remote.error().to_string(), version_range.to_string());
        return ResultError(std::move(merged_st));
    }

    // Looks a single version up in the active map and optionally in the stale map.
    const auto find_rowset = [&](const Version& version,
                                 bool include_stale) -> Result<RowsetSharedPtr> {
        auto active = _rs_version_map.find(version);
        if (active != _rs_version_map.end()) {
            return active->second;
        }
        VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet="
                    << tablet_id() << ", version='" << version.first << "-"
                    << version.second;

        if (include_stale) {
            auto stale = _stale_rs_version_map.find(version);
            if (stale != _stale_rs_version_map.end()) {
                return stale->second;
            }
            LOG(WARNING) << fmt::format(
                    "fail to find Rowset in stale_rs_version for version. tablet={}, "
                    "version={}-{}",
                    tablet_id(), version.first, version.second);
        }
        return ResultError(Status::Error<CAPTURE_ROWSET_ERROR>(
                "failed to find rowset for version={}", version.to_string()));
    };

    const auto& versions = maybe_versions.value();
    CaptureRowsetResult result;
    result.rowsets.reserve(versions.size());
    for (const auto& version : versions) {
        auto found = find_rowset(version, options.include_stale_rowsets);
        if (!found) {
            return ResultError(std::move(found.error()));
        }
        result.rowsets.push_back(std::move(found.value()));
    }

    const bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
    if (is_mow) {
        result.delete_bitmap = _tablet_meta->delete_bitmap_ptr();
    }
    return result;
}
161
162
// Captures the rowsets covering `version_range` and creates one reader per rowset.
// Returns the capture error unchanged, or CAPTURE_ROWSET_READER_ERROR as soon as a
// rowset fails to create its reader.
[[nodiscard]] Result<std::vector<RowSetSplits>> BaseTablet::capture_rs_readers_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    auto captured = capture_consistent_rowsets_unlocked(version_range, options);
    if (!captured) {
        return ResultError(std::move(captured.error()));
    }

    const auto& rowsets = captured.value().rowsets;
    std::vector<RowSetSplits> splits;
    splits.reserve(rowsets.size());
    for (const auto& rowset : rowsets) {
        RowsetReaderSharedPtr reader;
        if (auto create_st = rowset->create_reader(&reader); !create_st) {
            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
                    "failed to create reader for rowset={}, reason={}",
                    rowset->rowset_id().to_string(), create_st.to_string()));
        }
        splits.emplace_back(std::move(reader));
    }
    return splits;
}
183
184
// Builds a TabletReadSource for `version_range` while holding the header lock in shared
// mode: the captured delete bitmap is moved into the read source and one reader split is
// created per captured rowset. Returns the capture error unchanged, or
// CAPTURE_ROWSET_READER_ERROR as soon as a rowset fails to create its reader.
[[nodiscard]] Result<TabletReadSource> BaseTablet::capture_read_source(
        const Version& version_range, const CaptureRowsetOps& options) {
    std::shared_lock rdlock(get_header_lock());

    auto maybe_captured = capture_consistent_rowsets_unlocked(version_range, options);
    if (!maybe_captured) {
        return ResultError(std::move(maybe_captured.error()));
    }
    auto captured = std::move(maybe_captured.value());

    TabletReadSource source;
    source.delete_bitmap = std::move(captured.delete_bitmap);
    source.rs_splits.reserve(captured.rowsets.size());
    for (const auto& rowset : captured.rowsets) {
        RowsetReaderSharedPtr reader;
        if (auto create_st = rowset->create_reader(&reader); !create_st) {
            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
                    "failed to create reader for rowset={}, reason={}",
                    rowset->rowset_id().to_string(), create_st.to_string()));
        }
        source.rs_splits.emplace_back(std::move(reader));
    }
    return source;
}
208
209
template <typename Fn, typename... Args>
210
0
bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) {
211
0
    auto p_wrap_fn = new auto([=] { fn(args...); });
212
0
    auto call_back = [](void* ar) -> void* {
213
0
        auto f = reinterpret_cast<decltype(p_wrap_fn)>(ar);
214
0
        (*f)();
215
0
        delete f;
216
0
        return nullptr;
217
0
    };
218
0
    return bthread_start_background(&th, attr, call_back, p_wrap_fn) == 0;
219
0
}
220
221
struct GetRowsetsCntl : std::enable_shared_from_this<GetRowsetsCntl> {
222
    struct RemoteGetRowsetResult {
223
        std::vector<RowsetMetaSharedPtr> rowsets;
224
        std::unique_ptr<DeleteBitmap> delete_bitmap;
225
    };
226
227
0
    Status start_req_bg() {
228
0
        task_cnt = req_addrs.size();
229
0
        for (const auto& [ip, port] : req_addrs) {
230
0
            bthread_t tid;
231
0
            bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
232
233
0
            bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() {
234
0
                LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip;
235
0
                Defer defer_log {[&ip, port]() {
236
0
                    LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip
237
0
                              << ", port=" << port;
238
0
                }};
239
240
0
                PGetTabletRowsetsRequest req;
241
0
                req.set_tablet_id(self->tablet_id);
242
0
                req.set_version_start(self->version_range.first);
243
0
                req.set_version_end(self->version_range.second);
244
0
                if (self->delete_bitmap_keys.has_value()) {
245
0
                    req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value());
246
0
                }
247
0
                brpc::Controller cntl;
248
0
                cntl.set_timeout_ms(60000);
249
0
                cntl.set_max_retry(3);
250
0
                PGetTabletRowsetsResponse response;
251
0
                auto start_tm_us = MonotonicMicros();
252
0
#ifndef BE_TEST
253
0
                std::shared_ptr<PBackendService_Stub> stub =
254
0
                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port);
255
0
                if (stub == nullptr) {
256
0
                    self->result = ResultError(Status::InternalError(
257
0
                            "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port));
258
0
                    return;
259
0
                }
260
0
                stub->get_tablet_rowsets(&cntl, &req, &response, nullptr);
261
#else
262
                TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response);
263
#endif
264
0
                g_remote_fetch_tablet_rowsets_single_request_latency
265
0
                        << MonotonicMicros() - start_tm_us;
266
267
0
                std::unique_lock l(self->butex);
268
0
                if (self->done) {
269
0
                    return;
270
0
                }
271
0
                --self->task_cnt;
272
0
                auto resp_st = Status::create(response.status());
273
0
                DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure",
274
0
                                { resp_st = Status::InternalError("inject error"); });
275
0
                if (cntl.Failed() || !resp_st) {
276
0
                    if (self->task_cnt != 0) {
277
0
                        return;
278
0
                    }
279
0
                    std::stringstream reason;
280
0
                    reason << "failed to get rowsets from all replicas, tablet_id="
281
0
                           << self->tablet_id;
282
0
                    if (cntl.Failed()) {
283
0
                        reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText();
284
0
                    } else {
285
0
                        reason << ", reason=" << resp_st.to_string();
286
0
                    }
287
0
                    self->result = ResultError(Status::InternalError(reason.str()));
288
0
                    self->done = true;
289
0
                    self->event.signal();
290
0
                    return;
291
0
                }
292
293
0
                Defer done_cb {[&]() {
294
0
                    self->done = true;
295
0
                    self->event.signal();
296
0
                }};
297
0
                std::vector<RowsetMetaSharedPtr> rs_metas;
298
0
                for (auto&& rs_pb : response.rowsets()) {
299
0
                    auto rs_meta = std::make_shared<RowsetMeta>();
300
0
                    if (!rs_meta->init_from_pb(rs_pb)) {
301
0
                        self->result =
302
0
                                ResultError(Status::InternalError("failed to init rowset from pb"));
303
0
                        return;
304
0
                    }
305
0
                    rs_metas.push_back(std::move(rs_meta));
306
0
                }
307
0
                CaptureRowsetResult result;
308
0
                self->result->rowsets = std::move(rs_metas);
309
310
0
                if (response.has_delete_bitmap()) {
311
0
                    self->result->delete_bitmap = std::make_unique<DeleteBitmap>(
312
0
                            DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id));
313
0
                }
314
0
            });
315
316
0
            if (!succ) {
317
0
                return Status::InternalError(
318
0
                        "failed to create bthread when request rowsets for tablet={}", tablet_id);
319
0
            }
320
0
        }
321
0
        return Status::OK();
322
0
    }
323
324
0
    Result<RemoteGetRowsetResult> wait_for_ret() {
325
0
        event.wait();
326
0
        return std::move(result);
327
0
    }
328
329
    int64_t tablet_id;
330
    std::vector<std::pair<std::string, int32_t>> req_addrs;
331
    Version version_range;
332
    std::optional<DeleteBitmapPB> delete_bitmap_keys = std::nullopt;
333
334
private:
335
    size_t task_cnt;
336
337
    bthread::Mutex butex;
338
    bthread::CountdownEvent event {1};
339
    bool done = false;
340
341
    Result<RemoteGetRowsetResult> result;
342
};
343
344
// Asks the master FE for the replicas of `tablet_id` and returns the (host, brpc port)
// of each one that is not this backend.
//
// A replica is skipped when its host occurs as a substring of
// BackendOptions::get_localhost(), unless the debug point
// "get_peer_replicas_address.enable_local_host" is active.
// NOTE(review): this is a substring match, not an equality test — confirm it cannot
// hide a peer whose host is a prefix/substring of the local one.
//
// @return the peer addresses (possibly empty), or InternalError if the FE RPC fails or
//         the reply has no entry for `tablet_id`.
Result<std::vector<std::pair<std::string, int32_t>>> get_peer_replicas_addresses(
        const int64_t tablet_id) {
    auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
    DCHECK_NE(cluster_info, nullptr);
    auto master_addr = cluster_info->master_fe_addr;
    TGetTabletReplicaInfosRequest req;
    req.tablet_ids.push_back(tablet_id);
    TGetTabletReplicaInfosResult resp;
    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); });
    if (!st) {
        return ResultError(Status::InternalError(
                "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(),
                tablet_id));
    }

    auto it = resp.tablet_replica_infos.find(tablet_id);
    if (it == resp.tablet_replica_infos.end()) {
        return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id));
    }
    auto local_host = BackendOptions::get_localhost();
    bool include_local_host = false;
    DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; });

    // A plain loop instead of a filter | transform pipeline: the host string is moved
    // out of each kept replica, and mutating an element observed by a filter_view's
    // predicate is not allowed. `resp` is local, so moving out of it is safe.
    auto& replicas = it->second;
    std::vector<std::pair<std::string, int32_t>> peers;
    peers.reserve(replicas.size());
    for (auto& replica : replicas) {
        const bool is_local = local_host.find(replica.host) != std::string::npos;
        if (is_local && !include_local_host) {
            continue;
        }
        peers.emplace_back(std::move(replica.host), replica.brpc_port);
    }
    return peers;
}
378
379
// Fetches the rowsets covering `version_range` from peer replicas.
//
// Steps:
//   1. Resolve the peer addresses (through a sync point under BE_TEST).
//   2. For unique-key merge-on-write tablets, snapshot the local delete bitmap and send
//      its keys along with the request.
//   3. Fan the request out through GetRowsetsCntl and wait for its result.
//   4. Build a Rowset from every returned rowset meta and, for merge-on-write tablets,
//      merge the returned delete bitmap into the local snapshot.
//
// The elapsed time of the whole call is reported to
// `g_remote_fetch_tablet_rowsets_latency` on every exit path.
//
// @return the captured rowsets (plus delete bitmap for merge-on-write tablets), or an
//         error if no peer is known, the requests cannot be started, every peer fails,
//         a rowset cannot be created, or a merge-on-write response lacks the bitmap.
Result<CaptureRowsetResult> BaseTablet::_remote_capture_rowsets(
        const Version& version_range) const {
    auto start_tm_us = MonotonicMicros();
    Defer defer {
            [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }};
#ifndef BE_TEST
    auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id());
#else
    Result<std::vector<std::pair<std::string, int32_t>>> maybe_be_addresses;
    TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses);
#endif
    DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail",
                    { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); });
    if (!maybe_be_addresses) {
        return ResultError(std::move(maybe_be_addresses.error()));
    }
    auto be_addresses = std::move(maybe_be_addresses.value());
    if (be_addresses.empty()) {
        LOG(WARNING) << "no peers replica for tablet=" << tablet_id();
        return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id()));
    }

    auto cntl = std::make_shared<GetRowsetsCntl>();
    cntl->tablet_id = tablet_id();
    cntl->req_addrs = std::move(be_addresses);
    cntl->version_range = version_range;
    bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
    CaptureRowsetResult result;
    if (is_mow) {
        result.delete_bitmap =
                std::make_unique<DeleteBitmap>(_tablet_meta->delete_bitmap().snapshot());
        // Send the keys of the local snapshot along with the request.
        DeleteBitmapPB delete_bitmap_keys;
        auto keyset = result.delete_bitmap->delete_bitmap |
                      std::views::transform([](const auto& kv) { return kv.first; });
        for (const auto& key : keyset) {
            const auto& [rs_id, seg_id, version] = key;
            delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string());
            delete_bitmap_keys.mutable_segment_ids()->Add(seg_id);
            delete_bitmap_keys.mutable_versions()->Add(version);
        }
        cntl->delete_bitmap_keys = std::move(delete_bitmap_keys);
    }

    RETURN_IF_ERROR_RESULT(cntl->start_req_bg());
    auto maybe_meta = cntl->wait_for_ret();
    if (!maybe_meta) {
        auto err = Status::InternalError(
                "tried to get rowsets from peer replicas and failed, "
                "reason={}",
                maybe_meta.error());
        return ResultError(std::move(err));
    }

    auto& remote_meta = maybe_meta.value();
    const auto& rs_metas = remote_meta.rowsets;
    for (const auto& rs_meta : rs_metas) {
        RowsetSharedPtr rs;
        auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs);
        if (!st) {
            return ResultError(std::move(st));
        }
        result.rowsets.push_back(std::move(rs));
    }
    if (is_mow) {
        DCHECK_NE(result.delete_bitmap, nullptr);
        // The remote bitmap is only populated when the response carries one. Fail the
        // capture instead of dereferencing a null pointer; continuing without it would
        // mean reading the fetched rowsets with an incomplete delete bitmap.
        if (remote_meta.delete_bitmap == nullptr) {
            return ResultError(Status::InternalError(
                    "peer replica returned no delete bitmap for merge-on-write tablet={}",
                    tablet_id()));
        }
        result.delete_bitmap->merge(*remote_meta.delete_bitmap);
    }
    return result;
}
448
449
} // namespace doris