Coverage Report

Created: 2026-03-24 20:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/version_graph.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/version_graph.h"
19
20
#include <cctz/time_zone.h>
21
#include <stddef.h>
22
23
#include <algorithm>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <list>
27
#include <memory>
28
#include <optional>
29
#include <ostream>
30
#include <ranges>
31
#include <utility>
32
33
#include "common/logging.h"
34
35
namespace doris {
36
#include "common/compile_check_begin.h"
37
38
using namespace ErrorCode;
39
40
void TimestampedVersionTracker::construct_versioned_tracker(
41
0
        const RowsetMetaMapContainer& rs_metas) {
42
0
    construct_versioned_tracker(std::views::values(rs_metas));
43
0
}
44
45
void TimestampedVersionTracker::construct_versioned_tracker(
46
1.55k
        const RowsetMetaMapContainer& rs_metas, const RowsetMetaMapContainer& stale_metas) {
47
1.55k
    construct_versioned_tracker(std::views::values(rs_metas), std::views::values(stale_metas));
48
1.55k
}
49
50
bool TimestampedVersionTracker::_find_path_from_stale_map(
51
        const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>&
52
                stale_map,
53
        int64_t first_version, int64_t second_version,
54
32
        std::vector<RowsetMetaSharedPtr>* stale_path) {
55
32
    auto first_iter = stale_map.find(first_version);
56
    // If `first_version` not in `stale_map`, there is no path.
57
32
    if (first_iter == stale_map.end()) {
58
18
        return false;
59
18
    }
60
14
    auto& second_version_map = first_iter->second;
61
14
    auto second_iter = second_version_map.find(second_version);
62
    // If second_version in `stale_map`, find a path.
63
14
    if (second_iter != second_version_map.end()) {
64
6
        auto row_meta = second_iter->second;
65
        // Add rowset to path.
66
6
        stale_path->push_back(row_meta);
67
6
        return true;
68
6
    }
69
70
    // Traverse the first version map to backtracking  `_find_path_from_stale_map`.
71
8
    auto map_iter = second_version_map.begin();
72
10
    while (map_iter != second_version_map.end()) {
73
        // The version greater than `second_version`, we can't find path in `stale_map`.
74
8
        if (map_iter->first > second_version) {
75
0
            map_iter++;
76
0
            continue;
77
0
        }
78
        // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`.
79
8
        stale_path->push_back(map_iter->second);
80
8
        bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version,
81
8
                                           stale_path);
82
8
        if (r) {
83
6
            return true;
84
6
        }
85
        // There is no path in current version, pop and continue.
86
2
        stale_path->pop_back();
87
2
        map_iter++;
88
2
    }
89
90
2
    return false;
91
8
}
92
93
70
// Appends one JSON object per stale version path to `path_arr`. Each object has:
//   "path id"          - the path id (map key) as a string,
//   "last create time" - the path's max_create_time() formatted in the local time zone,
//   "path list"        - "<id> -> [a-b] -> [c-d] ..." listing the path's versions in order.
// `path_arr` is expected to already be a JSON array, since PushBack requires it; this is
// set up by the caller.
void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {
    auto& allocator = path_arr.GetAllocator();
    // The local time zone does not change between paths; look it up once.
    const auto time_zone = cctz::local_time_zone();

    for (const auto& [path_id, path_version_path] : _stale_version_path_map) {
        // A plain Value avoids constructing a throwaway Document (and its allocator) per
        // item; every member below is allocated from `path_arr`'s allocator anyway.
        rapidjson::Value item(rapidjson::kObjectType);

        // Add `path_id` to item.
        const auto path_id_str = std::to_string(path_id);
        rapidjson::Value path_id_value;
        path_id_value.SetString(path_id_str.c_str(),
                                static_cast<rapidjson::SizeType>(path_id_str.length()), allocator);
        item.AddMember("path id", path_id_value, allocator);

        // Add max create time to item.
        const auto tp =
                std::chrono::system_clock::from_time_t(path_version_path->max_create_time());
        const auto create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone);
        rapidjson::Value create_time_value;
        create_time_value.SetString(create_time_str.c_str(),
                                    static_cast<rapidjson::SizeType>(create_time_str.length()),
                                    allocator);
        item.AddMember("last create time", create_time_value, allocator);

        // Add path list to item. The versions are iterated by reference: copying the
        // vector would bump every shared_ptr refcount just to print them.
        std::stringstream path_list_stream;
        path_list_stream << path_id_str;
        for (const auto& timestamped_version : path_version_path->timestamped_versions()) {
            path_list_stream << " -> [" << timestamped_version->version().first << "-"
                             << timestamped_version->version().second << "]";
        }
        const std::string path_list = path_list_stream.str();
        rapidjson::Value path_list_value;
        path_list_value.SetString(path_list.c_str(),
                                  static_cast<rapidjson::SizeType>(path_list.length()), allocator);
        item.AddMember("path list", path_list_value, allocator);

        // Add item to `path_arr`.
        path_arr.PushBack(item, allocator);
    }
}
150
151
void TimestampedVersionTracker::recover_versioned_tracker(
152
2
        const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) {
153
2
    auto _path_map_iter = stale_version_path_map.begin();
154
    // Recover `stale_version_path_map`.
155
2
    while (_path_map_iter != stale_version_path_map.end()) {
156
        // Add `PathVersionListSharedPtr` to map.
157
0
        _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second;
158
159
0
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
160
0
                _path_map_iter->second->timestamped_versions();
161
0
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
162
0
                timestamped_versions.begin();
163
0
        while (version_path_iter != timestamped_versions.end()) {
164
            // Add version to `_version_graph`.
165
0
            _version_graph.add_version_to_graph((*version_path_iter)->version());
166
0
            ++version_path_iter;
167
0
        }
168
0
        ++_path_map_iter;
169
0
    }
170
2
    LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str();
171
2
}
172
173
23.7k
void TimestampedVersionTracker::add_version(const Version& version) {
174
23.7k
    _version_graph.add_version_to_graph(version);
175
23.7k
}
176
177
void TimestampedVersionTracker::add_stale_path_version(
178
294
        const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) {
179
294
    if (stale_rs_metas.empty()) {
180
2
        VLOG_NOTICE << "there is no version in the stale_rs_metas.";
181
2
        return;
182
2
    }
183
184
292
    PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer());
185
1.39k
    for (auto rs : stale_rs_metas) {
186
1.39k
        TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at()));
187
1.39k
        ptr->add_timestamped_version(vt_ptr);
188
1.39k
    }
189
190
292
    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions();
191
192
292
    struct TimestampedVersionPtrCompare {
193
292
        bool operator()(const TimestampedVersionSharedPtr ptr1,
194
2.46k
                        const TimestampedVersionSharedPtr ptr2) {
195
2.46k
            return ptr1->version().first < ptr2->version().first;
196
2.46k
        }
197
292
    };
198
292
    sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare());
199
292
    _stale_version_path_map[_next_path_id] = ptr;
200
292
    _next_path_id++;
201
292
}
202
203
// Capture consistent versions from graph.
204
// Each capture_* method of the tracker forwards, unchanged, to the VersionGraph method
// of the same name on `_version_graph`.

// Captures a chain of consecutive versions covering `spec_version`.
Status TimestampedVersionTracker::capture_consistent_versions(
        const Version& spec_version, std::vector<Version>* version_path) const {
    return _version_graph.capture_consistent_versions(spec_version, version_path);
}

// Captures versions for `spec_version`, only taking edges accepted by `validator`.
Status TimestampedVersionTracker::capture_consistent_versions_with_validator(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    return _version_graph.capture_consistent_versions_with_validator(
            spec_version, version_path, validator);
}

// Captures versions for `spec_version`, preferring edges accepted by `validator`.
Status TimestampedVersionTracker::capture_consistent_versions_prefer_cache(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    return _version_graph.capture_consistent_versions_prefer_cache(
            spec_version, version_path, validator);
}

// Merge-on-write variant of capture_consistent_versions_with_validator.
Status TimestampedVersionTracker::capture_consistent_versions_with_validator_mow(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    return _version_graph.capture_consistent_versions_with_validator_mow(
            spec_version, version_path, validator);
}
229
230
void TimestampedVersionTracker::capture_expired_paths(
231
10
        int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
232
10
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
233
10
            _stale_version_path_map.begin();
234
235
38
    while (iter != _stale_version_path_map.end()) {
236
28
        int64_t max_create_time = iter->second->max_create_time();
237
28
        if (max_create_time <= stale_sweep_endtime) {
238
18
            int64_t path_version = iter->first;
239
18
            path_version_vec->push_back(path_version);
240
18
        }
241
28
        ++iter;
242
28
    }
243
10
}
244
245
26
// Returns the stale path registered under `path_id`, or nullptr (with a VLOG notice) if
// there is none. The map itself is not modified.
PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) {
    // A single lookup; count() followed by operator[] searched the map twice.
    const auto it = _stale_version_path_map.find(path_id);
    if (it == _stale_version_path_map.end()) {
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
        return nullptr;
    }
    return it->second;
}
253
254
26
// Removes the stale path registered under `path_id` and deletes each of its versions
// from the version graph.
//
// Returns the removed path, or nullptr (with a VLOG notice) if no such path exists.
// Errors from delete_version_from_graph() are deliberately ignored.
PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) {
    const auto it = _stale_version_path_map.find(path_id);
    if (it == _stale_version_path_map.end()) {
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
        return nullptr;
    }

    // Logged before the map is modified.
    VLOG_NOTICE << get_current_path_map_str();

    // Take the value out and erase through the iterator we already hold, rather than
    // re-searching the map for the fetch and again for the erase.
    PathVersionListSharedPtr ptr = std::move(it->second);
    _stale_version_path_map.erase(it);

    for (auto& version : ptr->timestamped_versions()) {
        static_cast<void>(_version_graph.delete_version_from_graph(version->version()));
    }
    return ptr;
}
270
271
28
std::string TimestampedVersionTracker::get_current_path_map_str() {
272
28
    std::stringstream tracker_info;
273
28
    tracker_info << "current expired next_path_id " << _next_path_id << std::endl;
274
275
28
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
276
28
            _stale_version_path_map.begin();
277
98
    while (iter != _stale_version_path_map.end()) {
278
70
        tracker_info << "current expired path_version " << iter->first;
279
70
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
280
70
                iter->second->timestamped_versions();
281
70
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
282
70
                timestamped_versions.begin();
283
70
        int64_t max_create_time = -1;
284
190
        while (version_path_iter != timestamped_versions.end()) {
285
120
            if (max_create_time < (*version_path_iter)->get_create_time()) {
286
70
                max_create_time = (*version_path_iter)->get_create_time();
287
70
            }
288
120
            tracker_info << " -> [";
289
120
            tracker_info << (*version_path_iter)->version().first;
290
120
            tracker_info << ",";
291
120
            tracker_info << (*version_path_iter)->version().second;
292
120
            tracker_info << "]";
293
294
120
            ++version_path_iter;
295
120
        }
296
297
70
        tracker_info << std::endl;
298
70
        ++iter;
299
70
    }
300
28
    return tracker_info.str();
301
28
}
302
303
2
double TimestampedVersionTracker::get_orphan_vertex_ratio() {
304
2
    return _version_graph.get_orphan_vertex_ratio();
305
2
}
306
307
10
std::string TimestampedVersionTracker::debug_string() const {
308
10
    return _version_graph.debug_string();
309
10
}
310
311
1.39k
void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) {
312
    // Compare and refresh `_max_create_time`.
313
1.39k
    if (version->get_create_time() > _max_create_time) {
314
292
        _max_create_time = version->get_create_time();
315
292
    }
316
1.39k
    _timestamped_versions_container.push_back(version);
317
1.39k
}
318
319
614
std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() {
320
614
    return _timestamped_versions_container;
321
614
}
322
323
void VersionGraph::construct_version_graph(const RowsetMetaMapContainer& rs_metas,
324
0
                                           int64_t* max_version) {
325
0
    construct_version_graph(std::views::values(rs_metas), max_version);
326
0
}
327
328
void VersionGraph::reconstruct_version_graph(const RowsetMetaMapContainer& rs_metas,
329
0
                                             int64_t* max_version) {
330
0
    reconstruct_version_graph(std::views::values(rs_metas), max_version);
331
0
}
332
333
23.7k
void VersionGraph::add_version_to_graph(const Version& version) {
334
    // Add version.first as new vertex of version graph if not exist.
335
23.7k
    int64_t start_vertex_value = version.first;
336
23.7k
    int64_t end_vertex_value = version.second + 1;
337
338
    // Add vertex to graph.
339
23.7k
    _add_vertex_to_graph(start_vertex_value);
340
23.7k
    _add_vertex_to_graph(end_vertex_value);
341
342
23.7k
    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
343
23.7k
    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
344
345
    // We assume this version is new version, so we just add two edges
346
    // into version graph. add one edge from `start_version` to `end_version`
347
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
348
23.7k
    auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
349
23.8k
    while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
350
22.9k
        if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) {
351
22.8k
            break;
352
22.8k
        }
353
74
        end_vertex_it++;
354
74
    }
355
23.7k
    _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index);
356
357
    // We add reverse edge(from end_version to start_version) to graph
358
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
359
23.7k
    auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
360
24.2k
    while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
361
464
        if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) {
362
34
            break;
363
34
        }
364
430
        start_vertex_it++;
365
430
    }
366
23.7k
    _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index);
367
23.7k
}
368
369
64
// Removes one start->end edge and one end->start edge for `version`. When duplicates
// exist, only the first occurrence is removed.
//
// Fails with HEADER_DELETE_VERSION if either endpoint vertex is unknown. Vertices are
// never removed here, even when they are left without edges.
Status VersionGraph::delete_version_from_graph(const Version& version) {
    const int64_t start_vertex_value = version.first;
    const int64_t end_vertex_value = version.second + 1;

    const auto start_it = _vertex_index_map.find(start_vertex_value);
    const auto end_it = _vertex_index_map.find(end_vertex_value);
    if (start_it == _vertex_index_map.end() || end_it == _vertex_index_map.end()) {
        return Status::Error<HEADER_DELETE_VERSION>(
                "vertex for version does not exists. version={}-{}", version.first, version.second);
    }

    // Erases the first occurrence of `to` from `from`'s edge list, if any.
    auto erase_first_edge = [this](int64_t from, int64_t to) {
        auto& edges = _version_graph[from].edges;
        if (auto pos = std::find(edges.begin(), edges.end(), to); pos != edges.end()) {
            edges.erase(pos);
        }
    };
    erase_first_edge(start_it->second, end_it->second);
    erase_first_edge(end_it->second, start_it->second);

    // Empty vertices stay in `_version_graph`; the graph is rebuilt during trash sweep.
    return Status::OK();
}
405
406
54.3k
void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) {
407
    // Vertex with vertex_value already exists.
408
54.3k
    if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) {
409
23.1k
        VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value;
410
23.1k
        return;
411
23.1k
    }
412
413
31.1k
    _version_graph.emplace_back(Vertex(vertex_value));
414
31.1k
    _vertex_index_map[vertex_value] = _version_graph.size() - 1;
415
31.1k
}
416
417
// Finds a chain of consecutive versions covering exactly `spec_version`.
//
// The search starts at vertex `spec_version.first`. At each step it takes the first (i.e.
// widest, since edges are sorted descending) incremental edge that does not go past
// `spec_version.second`. Chosen versions are appended to `version_path`, which may be
// nullptr when only reachability matters; on failure, entries appended before the dead
// end remain.
//
// Returns INVALID_ARGUMENT for a reversed range, and InternalError when the start vertex
// is missing or the chain cannot be completed.
Status VersionGraph::capture_consistent_versions(const Version& spec_version,
                                                 std::vector<Version>* version_path) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    // Vertices are indexed by value in `_vertex_index_map` when created by
    // _add_vertex_to_graph, so the start vertex is looked up directly instead of scanning
    // `_version_graph` linearly. This assumes every vertex is created through that helper.
    const auto start_it = _vertex_index_map.find(spec_version.first);
    if (start_it == _vertex_index_map.end()) {
        return Status::InternalError<false>(
                "failed to find path in version_graph. spec_version: {}-{}", spec_version.first,
                spec_version.second);
    }
    int64_t cur_idx = start_it->second;

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        int64_t next_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            // Only consider incremental versions.
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
                break;
            }

            if (_version_graph[it].value > end_value) {
                continue;
            }

            // Edges are sorted by version in descending order, so this is the largest
            // version that does not exceed `end_value`.
            next_idx = it;
            break;
        }

        if (next_idx > -1) {
            if (version_path != nullptr) {
                version_path->emplace_back(_version_graph[cur_idx].value,
                                           _version_graph[next_idx].value - 1);
            }
            cur_idx = next_idx;
        } else {
            return Status::InternalError<false>(
                    "fail to find path in version_graph. spec_version: {}-{}", spec_version.first,
                    spec_version.second);
        }
    }

    if (VLOG_TRACE_IS_ON && version_path != nullptr) {
        std::stringstream shortest_path_for_debug;
        for (const auto& version : *version_path) {
            shortest_path_for_debug << version << ' ';
        }
        VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version
                   << ", path=" << shortest_path_for_debug.str();
    }

    return Status::OK();
}
482
483
// Walks from vertex `spec_version.first` toward `spec_version.second + 1`, appending the
// chosen versions to `version_path`.
//
// At each step it takes the widest incremental edge for which `validator(start, end)`
// returns true (presumably "rowset is cached", going by the name; TODO confirm). If no
// edge is accepted, it falls back to the widest incremental edge.
//
// NOTE(review): unlike capture_consistent_versions(), edges ending past
// `spec_version.second` are not skipped, so the path may extend beyond the requested
// range. Also, if a vertex has no incremental edge at all, the partial path is returned
// with OK. Confirm that callers expect both behaviours.
Status VersionGraph::capture_consistent_versions_prefer_cache(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    // Direct lookup in the value->index map rather than a linear scan over
    // `_version_graph`. This assumes every vertex is created through _add_vertex_to_graph.
    const auto start_it = _vertex_index_map.find(spec_version.first);
    if (start_it == _vertex_index_map.end()) {
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
                                            spec_version.to_string());
    }
    int64_t cur_idx = start_it->second;

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        int64_t next_idx = -1;
        int64_t first_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            // Only consider incremental versions.
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
                break;
            }
            // Remember the widest incremental edge as the fallback.
            if (first_idx == -1) {
                first_idx = it;
            }

            if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) {
                continue;
            }

            next_idx = it;
            break;
        }

        if (next_idx > -1) {
            version_path.emplace_back(_version_graph[cur_idx].value,
                                      _version_graph[next_idx].value - 1);

            cur_idx = next_idx;
        } else if (first_idx != -1) {
            // If no edge is accepted by the validator, use the first edge if possible.
            version_path.emplace_back(_version_graph[cur_idx].value,
                                      _version_graph[first_idx].value - 1);
            cur_idx = first_idx;
        } else {
            return Status::OK();
        }
    }
    return Status::OK();
}
542
543
// Walks from vertex `spec_version.first` toward `spec_version.second + 1`.
//
// At each step it appends to `version_path` the widest incremental edge accepted by
// `validator(start, end)`. If no edge at the current vertex is accepted, it stops and
// returns OK with whatever prefix has been captured.
//
// NOTE(review): unlike capture_consistent_versions(), edges ending past
// `spec_version.second` are not skipped, so an accepted wide edge can extend the path
// beyond the requested range. Confirm callers rely on this (e.g. always passing the max
// version).
Status VersionGraph::capture_consistent_versions_with_validator(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    // Direct lookup in the value->index map rather than a linear scan over
    // `_version_graph`. This assumes every vertex is created through _add_vertex_to_graph.
    const auto start_it = _vertex_index_map.find(spec_version.first);
    if (start_it == _vertex_index_map.end()) {
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
                                            spec_version.to_string());
    }
    int64_t cur_idx = start_it->second;

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        int64_t next_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            // Only consider incremental versions.
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
                break;
            }

            if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) {
                continue;
            }

            next_idx = it;
            break;
        }

        if (next_idx > -1) {
            version_path.emplace_back(_version_graph[cur_idx].value,
                                      _version_graph[next_idx].value - 1);

            cur_idx = next_idx;
        } else {
            // No acceptable edge from here: return the prefix captured so far.
            return Status::OK();
        }
    }
    return Status::OK();
}
593
594
// Merge-on-write variant of capture_consistent_versions_with_validator().
//
// Starting at vertex `spec_version.first`, it appends to `version_path` the widest
// incremental edge accepted by `validator(start, end)`. When an edge [cur, v-1] is
// rejected:
//   - if it spans a single version (v == cur + 1), the walk stops at this vertex;
//   - otherwise `end_value` is lowered to v, so the outer loop ends once the path reaches
//     or passes that boundary. Narrower edges from the same vertex are still tried.
// Returns OK with the captured prefix when no acceptable edge remains.
//
// NOTE(review): lowering `end_value` only affects loop termination. A later accepted
// edge is not checked against it, so the path can still end past v-1. Confirm intended.
Status VersionGraph::capture_consistent_versions_with_validator_mow(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    // Direct lookup in the value->index map rather than a linear scan over
    // `_version_graph`. This assumes every vertex is created through _add_vertex_to_graph.
    const auto start_it = _vertex_index_map.find(spec_version.first);
    if (start_it == _vertex_index_map.end()) {
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
                                            spec_version.to_string());
    }
    int64_t cur_idx = start_it->second;

    // Not const: narrowed below when a multi-version edge is rejected.
    int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        int64_t next_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            // Only consider incremental versions.
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
                break;
            }

            if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) {
                // A rejected single-version edge cannot be split further: stop here.
                if (_version_graph[cur_idx].value + 1 == _version_graph[it].value) {
                    break;
                }
                end_value = std::min(_version_graph[it].value, end_value);
                continue;
            }

            next_idx = it;
            break;
        }

        if (next_idx > -1) {
            version_path.emplace_back(_version_graph[cur_idx].value,
                                      _version_graph[next_idx].value - 1);

            cur_idx = next_idx;
        } else {
            return Status::OK();
        }
    }
    return Status::OK();
}
648
649
4
double VersionGraph::get_orphan_vertex_ratio() {
650
4
    int64_t vertex_num = _version_graph.size();
651
4
    int64_t orphan_vertex_num = 0;
652
30
    for (auto& iter : _version_graph) {
653
30
        if (iter.edges.empty()) {
654
12
            ++orphan_vertex_num;
655
12
        }
656
30
    }
657
4
    return static_cast<double>(orphan_vertex_num) / static_cast<double>(vertex_num);
658
4
}
659
660
10
// Renders the graph as "VersionGraph: [{value: v, edges: [w1, w2, ]}, ...]".
// Vertices appear in storage order, and for each vertex only neighbours with a strictly
// larger value (forward edges) are listed.
std::string VersionGraph::debug_string() const {
    std::stringstream ss;
    ss << "VersionGraph: [";
    for (const auto& vertex : _version_graph) {
        ss << "{value: " << vertex.value << ", edges: [";
        for (const auto& neighbour : vertex.edges) {
            const auto& target = _version_graph[neighbour];
            if (target.value <= vertex.value) {
                continue;
            }
            ss << target.value << ", ";
        }
        ss << "]}, ";
    }
    ss << "]";
    return ss.str();
}
675
676
#include "common/compile_check_end.h"
677
} // namespace doris