Coverage Report

Created: 2026-05-19 15:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset_version_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <brpc/controller.h>
19
#include <bthread/bthread.h>
20
#include <bthread/countdown_event.h>
21
#include <bthread/mutex.h>
22
#include <bthread/types.h>
23
#include <bvar/latency_recorder.h>
24
#include <gen_cpp/FrontendService_types.h>
25
#include <gen_cpp/HeartbeatService_types.h>
26
#include <gen_cpp/Types_types.h>
27
#include <gen_cpp/internal_service.pb.h>
28
#include <gen_cpp/olap_file.pb.h>
29
#include <glog/logging.h>
30
31
#include <cstdint>
32
#include <memory>
33
#include <mutex>
34
#include <optional>
35
#include <ranges>
36
#include <sstream>
37
#include <utility>
38
39
#include "cloud/config.h"
40
#include "common/status.h"
41
#include "cpp/sync_point.h"
42
#include "service/backend_options.h"
43
#include "service/internal_service.h"
44
#include "storage/olap_common.h"
45
#include "storage/rowset/rowset.h"
46
#include "storage/rowset/rowset_factory.h"
47
#include "storage/rowset/rowset_reader.h"
48
#include "storage/tablet/base_tablet.h"
49
#include "util/brpc_client_cache.h"
50
#include "util/client_cache.h"
51
#include "util/debug_points.h"
52
#include "util/thrift_rpc_helper.h"
53
#include "util/time.h"
54
55
namespace doris {
56
57
using namespace ErrorCode;
58
using namespace std::ranges;
59
60
static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_single_request_latency(
61
        "remote_fetch_rowsets_single_rpc");
62
static bvar::LatencyRecorder g_remote_fetch_tablet_rowsets_latency("remote_fetch_rowsets");
63
64
[[nodiscard]] Result<std::vector<Version>> BaseTablet::capture_consistent_versions_unlocked(
65
17
        const Version& version_range, const CaptureRowsetOps& options) const {
66
17
    std::vector<Version> version_path;
67
17
    auto& version_tracker =
68
17
            options.capture_row_binlog ? _row_binlog_version_tracker : _timestamped_version_tracker;
69
17
    auto st = version_tracker.capture_consistent_versions(version_range, &version_path);
70
17
    if (!st && !options.quiet) {
71
1
        auto missed_versions =
72
1
                get_missed_versions_unlocked(version_range.second, options.capture_row_binlog);
73
1
        if (missed_versions.empty()) {
74
0
            LOG(WARNING) << fmt::format(
75
0
                    "version already has been merged. version_range={}, max_version={}, "
76
0
                    "tablet_id={}",
77
0
                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id());
78
0
            return ResultError(Status::Error<VERSION_ALREADY_MERGED>(
79
0
                    "missed versions is empty, version_range={}, max_version={}, tablet_id={}",
80
0
                    version_range.to_string(), _tablet_meta->max_version().second, tablet_id()));
81
0
        }
82
1
        LOG(WARNING) << fmt::format("missed version for version_range={}, tablet_id={}, st={}",
83
1
                                    version_range.to_string(), tablet_id(), st);
84
1
        _print_missed_versions(missed_versions);
85
1
        if (!options.skip_missing_versions) {
86
1
            return ResultError(std::move(st));
87
1
        }
88
1
        LOG(WARNING) << "force skipping missing version for tablet:" << tablet_id();
89
0
    }
90
16
    DBUG_EXECUTE_IF("Tablet::capture_consistent_versions.inject_failure", {
91
16
        auto tablet_id = dp->param<int64_t>("tablet_id", -1);
92
16
        auto skip_by_option = dp->param<bool>("skip_by_option", false);
93
16
        if (skip_by_option && !options.enable_fetch_rowsets_from_peers) {
94
16
            return version_path;
95
16
        }
96
16
        if ((tablet_id != -1 && (tablet_id == _tablet_meta->tablet_id())) || tablet_id == -2) {
97
16
            return ResultError(Status::Error<VERSION_ALREADY_MERGED>("version already merged"));
98
16
        }
99
16
    });
100
16
    return version_path;
101
16
}
102
103
[[nodiscard]] Result<CaptureRowsetResult> BaseTablet::capture_consistent_rowsets_unlocked(
104
43
        const Version& version_range, const CaptureRowsetOps& options) const {
105
43
    CaptureRowsetResult result;
106
43
    auto& rowsets = result.rowsets;
107
43
    auto maybe_versions = capture_consistent_versions_unlocked(version_range, options);
108
43
    if (maybe_versions) {
109
42
        const auto& version_paths = maybe_versions.value();
110
42
        rowsets.reserve(version_paths.size());
111
112
42
        auto rowset_for_version = [&](const Version& version,
113
200
                                      bool include_stale) -> Result<RowsetSharedPtr> {
114
200
            const auto& rs_version_map =
115
200
                    options.capture_row_binlog ? _row_binlog_rs_version_map : _rs_version_map;
116
200
            if (auto it = rs_version_map.find(version); it != rs_version_map.end()) {
117
142
                return it->second;
118
142
            } else {
119
58
                VLOG_NOTICE << "fail to find Rowset in "
120
0
                            << (options.capture_row_binlog ? "row_binlog_rs_version" : "rs_version")
121
0
                            << " for version. tablet=" << tablet_id() << ", version='"
122
0
                            << version.first << "-" << version.second;
123
58
            }
124
58
            if (!options.capture_row_binlog && include_stale) {
125
58
                if (auto it = _stale_rs_version_map.find(version);
126
58
                    it != _stale_rs_version_map.end()) {
127
58
                    return it->second;
128
58
                } else {
129
0
                    LOG(WARNING) << fmt::format(
130
0
                            "fail to find Rowset in stale_rs_version for version. tablet={}, "
131
0
                            "version={}-{}",
132
0
                            tablet_id(), version.first, version.second);
133
0
                }
134
58
            }
135
0
            return ResultError(Status::Error<CAPTURE_ROWSET_ERROR>(
136
0
                    "failed to find rowset for version={}", version.to_string()));
137
58
        };
138
139
200
        for (const auto& version : version_paths) {
140
200
            auto ret = rowset_for_version(version, options.include_stale_rowsets);
141
200
            if (!ret) {
142
0
                return ResultError(std::move(ret.error()));
143
0
            }
144
145
200
            rowsets.push_back(std::move(ret.value()));
146
200
        }
147
42
        if (options.capture_row_binlog) {
148
0
            result.delete_bitmap = _tablet_meta->binlog_delvec_ptr();
149
42
        } else if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
150
0
            result.delete_bitmap = _tablet_meta->delete_bitmap_ptr();
151
0
        }
152
42
        return result;
153
42
    }
154
155
1
    if (!config::is_cloud_mode() || !options.enable_fetch_rowsets_from_peers) {
156
1
        return ResultError(std::move(maybe_versions.error()));
157
1
    }
158
0
    auto ret = _remote_capture_rowsets(version_range);
159
0
    if (!ret) {
160
0
        auto st = Status::Error<VERSION_ALREADY_MERGED>(
161
0
                "version already merged, meet error during remote capturing rowsets, "
162
0
                "error={}, version_range={}",
163
0
                ret.error().to_string(), version_range.to_string());
164
0
        return ResultError(std::move(st));
165
0
    }
166
0
    return ret;
167
0
}
168
169
[[nodiscard]] Result<std::vector<RowSetSplits>> BaseTablet::capture_rs_readers_unlocked(
170
2
        const Version& version_range, const CaptureRowsetOps& options) const {
171
2
    auto maybe_rs_list = capture_consistent_rowsets_unlocked(version_range, options);
172
2
    if (!maybe_rs_list) {
173
1
        return ResultError(std::move(maybe_rs_list.error()));
174
1
    }
175
1
    const auto& rs_list = maybe_rs_list.value().rowsets;
176
1
    std::vector<RowSetSplits> rs_splits;
177
1
    rs_splits.reserve(rs_list.size());
178
1
    for (const auto& rs : rs_list) {
179
1
        RowsetReaderSharedPtr rs_reader;
180
1
        auto st = rs->create_reader(&rs_reader);
181
1
        if (!st) {
182
0
            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
183
0
                    "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(),
184
0
                    st.to_string()));
185
0
        }
186
1
        rs_splits.emplace_back(std::move(rs_reader));
187
1
    }
188
1
    return rs_splits;
189
1
}
190
191
[[nodiscard]] Result<TabletReadSource> BaseTablet::capture_read_source(
192
37
        const Version& version_range, const CaptureRowsetOps& options) {
193
37
    std::shared_lock rdlock(get_header_lock());
194
37
    auto maybe_result = capture_consistent_rowsets_unlocked(version_range, options);
195
37
    if (!maybe_result) {
196
0
        return ResultError(std::move(maybe_result.error()));
197
0
    }
198
37
    auto rowsets_result = std::move(maybe_result.value());
199
37
    TabletReadSource read_source;
200
37
    read_source.delete_bitmap = std::move(rowsets_result.delete_bitmap);
201
37
    const auto& rowsets = rowsets_result.rowsets;
202
37
    read_source.rs_splits.reserve(rowsets.size());
203
191
    for (const auto& rs : rowsets) {
204
191
        RowsetReaderSharedPtr rs_reader;
205
191
        auto st = rs->create_reader(&rs_reader);
206
191
        if (!st) {
207
0
            return ResultError(Status::Error<CAPTURE_ROWSET_READER_ERROR>(
208
0
                    "failed to create reader for rowset={}, reason={}", rs->rowset_id().to_string(),
209
0
                    st.to_string()));
210
0
        }
211
191
        read_source.rs_splits.emplace_back(std::move(rs_reader));
212
191
    }
213
37
    return read_source;
214
37
}
215
216
template <typename Fn, typename... Args>
217
0
bool call_bthread(bthread_t& th, const bthread_attr_t* attr, Fn&& fn, Args&&... args) {
218
0
    auto p_wrap_fn = new auto([=] { fn(args...); });
219
0
    auto call_back = [](void* ar) -> void* {
220
0
        auto f = reinterpret_cast<decltype(p_wrap_fn)>(ar);
221
0
        (*f)();
222
0
        delete f;
223
0
        return nullptr;
224
0
    };
225
0
    return bthread_start_background(&th, attr, call_back, p_wrap_fn) == 0;
226
0
}
227
228
struct GetRowsetsCntl : std::enable_shared_from_this<GetRowsetsCntl> {
229
    struct RemoteGetRowsetResult {
230
        std::vector<RowsetMetaSharedPtr> rowsets;
231
        std::unique_ptr<DeleteBitmap> delete_bitmap;
232
    };
233
234
0
    Status start_req_bg() {
235
0
        task_cnt = req_addrs.size();
236
0
        for (const auto& [ip, port] : req_addrs) {
237
0
            bthread_t tid;
238
0
            bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
239
240
0
            bool succ = call_bthread(tid, &attr, [self = shared_from_this(), &ip, port]() {
241
0
                LOG(INFO) << "start to get tablet rowsets from peer BE, ip=" << ip;
242
0
                Defer defer_log {[&ip, port]() {
243
0
                    LOG(INFO) << "finish to get rowsets from peer BE, ip=" << ip
244
0
                              << ", port=" << port;
245
0
                }};
246
247
0
                PGetTabletRowsetsRequest req;
248
0
                req.set_tablet_id(self->tablet_id);
249
0
                req.set_version_start(self->version_range.first);
250
0
                req.set_version_end(self->version_range.second);
251
0
                if (self->delete_bitmap_keys.has_value()) {
252
0
                    req.mutable_delete_bitmap_keys()->CopyFrom(self->delete_bitmap_keys.value());
253
0
                }
254
0
                brpc::Controller cntl;
255
0
                cntl.set_timeout_ms(60000);
256
0
                cntl.set_max_retry(3);
257
0
                PGetTabletRowsetsResponse response;
258
0
                auto start_tm_us = MonotonicMicros();
259
#ifndef BE_TEST
260
                std::shared_ptr<PBackendService_Stub> stub =
261
                        ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(ip, port);
262
                if (stub == nullptr) {
263
                    self->result = ResultError(Status::InternalError(
264
                            "failed to fetch get_tablet_rowsets stub, ip={}, port={}", ip, port));
265
                    return;
266
                }
267
                stub->get_tablet_rowsets(&cntl, &req, &response, nullptr);
268
#else
269
0
                TEST_SYNC_POINT_CALLBACK("get_tablet_rowsets", &response);
270
0
#endif
271
0
                g_remote_fetch_tablet_rowsets_single_request_latency
272
0
                        << MonotonicMicros() - start_tm_us;
273
274
0
                std::unique_lock l(self->butex);
275
0
                if (self->done) {
276
0
                    return;
277
0
                }
278
0
                --self->task_cnt;
279
0
                auto resp_st = Status::create(response.status());
280
0
                DBUG_EXECUTE_IF("GetRowsetCntl::start_req_bg.inject_failure",
281
0
                                { resp_st = Status::InternalError("inject error"); });
282
0
                if (cntl.Failed() || !resp_st) {
283
0
                    if (self->task_cnt != 0) {
284
0
                        return;
285
0
                    }
286
0
                    std::stringstream reason;
287
0
                    reason << "failed to get rowsets from all replicas, tablet_id="
288
0
                           << self->tablet_id;
289
0
                    if (cntl.Failed()) {
290
0
                        reason << ", reason=[" << cntl.ErrorCode() << "] " << cntl.ErrorText();
291
0
                    } else {
292
0
                        reason << ", reason=" << resp_st.to_string();
293
0
                    }
294
0
                    self->result = ResultError(Status::InternalError(reason.str()));
295
0
                    self->done = true;
296
0
                    self->event.signal();
297
0
                    return;
298
0
                }
299
300
0
                Defer done_cb {[&]() {
301
0
                    self->done = true;
302
0
                    self->event.signal();
303
0
                }};
304
0
                std::vector<RowsetMetaSharedPtr> rs_metas;
305
0
                for (auto&& rs_pb : response.rowsets()) {
306
0
                    auto rs_meta = std::make_shared<RowsetMeta>();
307
0
                    if (!rs_meta->init_from_pb(rs_pb)) {
308
0
                        self->result =
309
0
                                ResultError(Status::InternalError("failed to init rowset from pb"));
310
0
                        return;
311
0
                    }
312
0
                    rs_metas.push_back(std::move(rs_meta));
313
0
                }
314
0
                CaptureRowsetResult result;
315
0
                self->result->rowsets = std::move(rs_metas);
316
317
0
                if (response.has_delete_bitmap()) {
318
0
                    self->result->delete_bitmap = std::make_unique<DeleteBitmap>(
319
0
                            DeleteBitmap::from_pb(response.delete_bitmap(), self->tablet_id));
320
0
                }
321
0
            });
322
323
0
            if (!succ) {
324
0
                return Status::InternalError(
325
0
                        "failed to create bthread when request rowsets for tablet={}", tablet_id);
326
0
            }
327
0
        }
328
0
        return Status::OK();
329
0
    }
330
331
0
    Result<RemoteGetRowsetResult> wait_for_ret() {
332
0
        event.wait();
333
0
        return std::move(result);
334
0
    }
335
336
    int64_t tablet_id;
337
    std::vector<std::pair<std::string, int32_t>> req_addrs;
338
    Version version_range;
339
    std::optional<DeleteBitmapPB> delete_bitmap_keys = std::nullopt;
340
341
private:
342
    size_t task_cnt;
343
344
    bthread::Mutex butex;
345
    bthread::CountdownEvent event {1};
346
    bool done = false;
347
348
    Result<RemoteGetRowsetResult> result;
349
};
350
351
Result<std::vector<std::pair<std::string, int32_t>>> get_peer_replicas_addresses(
352
0
        const int64_t tablet_id) {
353
0
    auto* cluster_info = ExecEnv::GetInstance()->cluster_info();
354
0
    DCHECK_NE(cluster_info, nullptr);
355
0
    auto master_addr = cluster_info->master_fe_addr;
356
0
    TGetTabletReplicaInfosRequest req;
357
0
    req.tablet_ids.push_back(tablet_id);
358
0
    TGetTabletReplicaInfosResult resp;
359
0
    auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
360
0
            master_addr.hostname, master_addr.port,
361
0
            [&](FrontendServiceConnection& client) { client->getTabletReplicaInfos(resp, req); });
362
0
    if (!st) {
363
0
        return ResultError(Status::InternalError(
364
0
                "failed to get tablet replica infos, rpc error={}, tablet_id={}", st.to_string(),
365
0
                tablet_id));
366
0
    }
367
368
0
    auto it = resp.tablet_replica_infos.find(tablet_id);
369
0
    if (it == resp.tablet_replica_infos.end()) {
370
0
        return ResultError(Status::InternalError("replicas not found, tablet_id={}", tablet_id));
371
0
    }
372
0
    auto replicas = it->second;
373
0
    auto local_host = BackendOptions::get_localhost();
374
0
    bool include_local_host = false;
375
0
    DBUG_EXECUTE_IF("get_peer_replicas_address.enable_local_host", { include_local_host = true; });
376
0
    auto ret_view =
377
0
            replicas | std::views::filter([&local_host, include_local_host](const auto& replica) {
378
0
                return local_host.find(replica.host) == std::string::npos || include_local_host;
379
0
            }) |
380
0
            std::views::transform([](auto& replica) {
381
0
                return std::make_pair(std::move(replica.host), replica.brpc_port);
382
0
            });
383
0
    return std::vector(ret_view.begin(), ret_view.end());
384
0
}
385
386
Result<CaptureRowsetResult> BaseTablet::_remote_capture_rowsets(
387
0
        const Version& version_range) const {
388
0
    auto start_tm_us = MonotonicMicros();
389
0
    Defer defer {
390
0
            [&]() { g_remote_fetch_tablet_rowsets_latency << MonotonicMicros() - start_tm_us; }};
391
#ifndef BE_TEST
392
    auto maybe_be_addresses = get_peer_replicas_addresses(tablet_id());
393
#else
394
0
    Result<std::vector<std::pair<std::string, int32_t>>> maybe_be_addresses;
395
0
    TEST_SYNC_POINT_CALLBACK("get_peer_replicas_addresses", &maybe_be_addresses);
396
0
#endif
397
0
    DBUG_EXECUTE_IF("Tablet::_remote_get_rowsets_meta.inject_replica_address_fail",
398
0
                    { maybe_be_addresses = ResultError(Status::InternalError("inject failure")); });
399
0
    if (!maybe_be_addresses) {
400
0
        return ResultError(std::move(maybe_be_addresses.error()));
401
0
    }
402
0
    auto be_addresses = std::move(maybe_be_addresses.value());
403
0
    if (be_addresses.empty()) {
404
0
        LOG(WARNING) << "no peers replica for tablet=" << tablet_id();
405
0
        return ResultError(Status::InternalError("no replicas for tablet={}", tablet_id()));
406
0
    }
407
408
0
    auto cntl = std::make_shared<GetRowsetsCntl>();
409
0
    cntl->tablet_id = tablet_id();
410
0
    cntl->req_addrs = std::move(be_addresses);
411
0
    cntl->version_range = version_range;
412
0
    bool is_mow = keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write();
413
0
    CaptureRowsetResult result;
414
0
    if (is_mow) {
415
0
        result.delete_bitmap =
416
0
                std::make_unique<DeleteBitmap>(_tablet_meta->delete_bitmap().snapshot());
417
0
        DeleteBitmapPB delete_bitmap_keys;
418
0
        auto keyset = result.delete_bitmap->delete_bitmap |
419
0
                      std::views::transform([](const auto& kv) { return kv.first; });
420
0
        for (const auto& key : keyset) {
421
0
            const auto& [rs_id, seg_id, version] = key;
422
0
            delete_bitmap_keys.mutable_rowset_ids()->Add(rs_id.to_string());
423
0
            delete_bitmap_keys.mutable_segment_ids()->Add(seg_id);
424
0
            delete_bitmap_keys.mutable_versions()->Add(version);
425
0
        }
426
0
        cntl->delete_bitmap_keys = std::move(delete_bitmap_keys);
427
0
    }
428
429
0
    RETURN_IF_ERROR_RESULT(cntl->start_req_bg());
430
0
    auto maybe_meta = cntl->wait_for_ret();
431
0
    if (!maybe_meta) {
432
0
        auto err = Status::InternalError(
433
0
                "tried to get rowsets from peer replicas and failed, "
434
0
                "reason={}",
435
0
                maybe_meta.error());
436
0
        return ResultError(std::move(err));
437
0
    }
438
439
0
    auto& remote_meta = maybe_meta.value();
440
0
    const auto& rs_metas = remote_meta.rowsets;
441
0
    for (const auto& rs_meta : rs_metas) {
442
0
        RowsetSharedPtr rs;
443
0
        auto st = RowsetFactory::create_rowset(_tablet_meta->tablet_schema(), {}, rs_meta, &rs);
444
0
        if (!st) {
445
0
            return ResultError(std::move(st));
446
0
        }
447
0
        result.rowsets.push_back(std::move(rs));
448
0
    }
449
0
    if (is_mow) {
450
        DCHECK_NE(result.delete_bitmap, nullptr);
451
0
        result.delete_bitmap->merge(*remote_meta.delete_bitmap);
452
0
    }
453
0
    return result;
454
0
}
455
456
} // namespace doris