Coverage Report

Created: 2026-04-14 12:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/version_graph.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/version_graph.h"
19
20
#include <cctz/time_zone.h>
21
#include <stddef.h>
22
23
#include <algorithm>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <list>
27
#include <memory>
28
#include <optional>
29
#include <ostream>
30
#include <ranges>
31
#include <utility>
32
33
#include "common/logging.h"
34
35
namespace doris {
36
37
using namespace ErrorCode;
38
39
void TimestampedVersionTracker::construct_versioned_tracker(
40
0
        const RowsetMetaMapContainer& rs_metas) {
41
0
    construct_versioned_tracker(std::views::values(rs_metas));
42
0
}
43
44
void TimestampedVersionTracker::construct_versioned_tracker(
45
459k
        const RowsetMetaMapContainer& rs_metas, const RowsetMetaMapContainer& stale_metas) {
46
459k
    construct_versioned_tracker(std::views::values(rs_metas), std::views::values(stale_metas));
47
459k
}
48
49
bool TimestampedVersionTracker::_find_path_from_stale_map(
50
        const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>&
51
                stale_map,
52
        int64_t first_version, int64_t second_version,
53
132k
        std::vector<RowsetMetaSharedPtr>* stale_path) {
54
132k
    auto first_iter = stale_map.find(first_version);
55
    // If `first_version` not in `stale_map`, there is no path.
56
132k
    if (first_iter == stale_map.end()) {
57
68.2k
        return false;
58
68.2k
    }
59
64.4k
    auto& second_version_map = first_iter->second;
60
64.4k
    auto second_iter = second_version_map.find(second_version);
61
    // If second_version in `stale_map`, find a path.
62
64.4k
    if (second_iter != second_version_map.end()) {
63
9.09k
        auto row_meta = second_iter->second;
64
        // Add rowset to path.
65
9.09k
        stale_path->push_back(row_meta);
66
9.09k
        return true;
67
9.09k
    }
68
69
    // Traverse the first version map to backtracking  `_find_path_from_stale_map`.
70
55.4k
    auto map_iter = second_version_map.begin();
71
55.4k
    while (map_iter != second_version_map.end()) {
72
        // The version greater than `second_version`, we can't find path in `stale_map`.
73
55.4k
        if (map_iter->first > second_version) {
74
0
            map_iter++;
75
0
            continue;
76
0
        }
77
        // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`.
78
55.4k
        stale_path->push_back(map_iter->second);
79
55.4k
        bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version,
80
55.4k
                                           stale_path);
81
55.4k
        if (r) {
82
55.3k
            return true;
83
55.3k
        }
84
        // There is no path in current version, pop and continue.
85
1
        stale_path->pop_back();
86
1
        map_iter++;
87
1
    }
88
89
1
    return false;
90
55.4k
}
91
92
2.67k
// Appends one JSON object per stale version path to `path_arr`, in ascending
// path-id order (`_stale_version_path_map` is a std::map). Each object has:
//   "path id"          - the path id rendered as a string
//   "last create time" - the path's max_create_time() formatted in the local
//                        time zone as "%Y-%m-%d %H:%M:%S %z"
//   "path list"        - "<path id> -> [a-b] -> [c-d] ..." for the path's versions
// All strings are allocated with `path_arr`'s allocator.
// NOTE(review): `path_arr` is assumed to already be a JSON array; PushBack is
// called on it directly and SetArray() is not called here.
void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {
    auto& allocator = path_arr.GetAllocator();
    // The local time zone is the same for every path; resolve it once.
    const auto time_zone = cctz::local_time_zone();

    for (const auto& [path_id, path_version_path] : _stale_version_path_map) {
        rapidjson::Document item;
        item.SetObject();

        // Add `path_id` to item.
        const std::string path_id_str = std::to_string(path_id);
        rapidjson::Value path_id_value;
        path_id_value.SetString(path_id_str.c_str(),
                                static_cast<rapidjson::SizeType>(path_id_str.length()), allocator);
        item.AddMember("path id", path_id_value, allocator);

        // Add max create time to item.
        const auto tp = std::chrono::system_clock::from_time_t(path_version_path->max_create_time());
        const std::string create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone);
        rapidjson::Value create_time_value;
        create_time_value.SetString(create_time_str.c_str(),
                                    static_cast<rapidjson::SizeType>(create_time_str.length()),
                                    allocator);
        item.AddMember("last create time", create_time_value, allocator);

        // Add path list to item. timestamped_versions() returns a reference to
        // the path's vector, so iterate it in place instead of copying it (and
        // every shared_ptr in it).
        std::stringstream path_list_stream;
        path_list_stream << path_id_str;
        for (const auto& version : path_version_path->timestamped_versions()) {
            path_list_stream << " -> [" << version->version().first << "-"
                             << version->version().second << "]";
        }
        const std::string path_list = path_list_stream.str();
        rapidjson::Value path_list_value;
        path_list_value.SetString(path_list.c_str(),
                                  static_cast<rapidjson::SizeType>(path_list.length()), allocator);
        item.AddMember("path list", path_list_value, allocator);

        // Add item to `path_arr`.
        path_arr.PushBack(item, allocator);
    }
}
149
150
void TimestampedVersionTracker::recover_versioned_tracker(
151
1
        const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) {
152
1
    auto _path_map_iter = stale_version_path_map.begin();
153
    // Recover `stale_version_path_map`.
154
1
    while (_path_map_iter != stale_version_path_map.end()) {
155
        // Add `PathVersionListSharedPtr` to map.
156
0
        _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second;
157
158
0
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
159
0
                _path_map_iter->second->timestamped_versions();
160
0
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
161
0
                timestamped_versions.begin();
162
0
        while (version_path_iter != timestamped_versions.end()) {
163
            // Add version to `_version_graph`.
164
0
            _version_graph.add_version_to_graph((*version_path_iter)->version());
165
0
            ++version_path_iter;
166
0
        }
167
0
        ++_path_map_iter;
168
0
    }
169
1
    LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str();
170
1
}
171
172
439k
void TimestampedVersionTracker::add_version(const Version& version) {
173
439k
    _version_graph.add_version_to_graph(version);
174
439k
}
175
176
void TimestampedVersionTracker::add_stale_path_version(
177
17.0k
        const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) {
178
17.0k
    if (stale_rs_metas.empty()) {
179
1
        VLOG_NOTICE << "there is no version in the stale_rs_metas.";
180
1
        return;
181
1
    }
182
183
17.0k
    PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer());
184
129k
    for (auto rs : stale_rs_metas) {
185
129k
        TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at()));
186
129k
        ptr->add_timestamped_version(vt_ptr);
187
129k
    }
188
189
17.0k
    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions();
190
191
17.0k
    struct TimestampedVersionPtrCompare {
192
17.0k
        bool operator()(const TimestampedVersionSharedPtr ptr1,
193
269k
                        const TimestampedVersionSharedPtr ptr2) {
194
269k
            return ptr1->version().first < ptr2->version().first;
195
269k
        }
196
17.0k
    };
197
17.0k
    sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare());
198
17.0k
    _stale_version_path_map[_next_path_id] = ptr;
199
17.0k
    _next_path_id++;
200
17.0k
}
201
202
// Capture consistent versions from graph.
203
Status TimestampedVersionTracker::capture_consistent_versions(
204
1.49M
        const Version& spec_version, std::vector<Version>* version_path) const {
205
1.49M
    return _version_graph.capture_consistent_versions(spec_version, version_path);
206
1.49M
}
207
208
// Validator-aware capture; all arguments are forwarded unchanged to
// VersionGraph::capture_consistent_versions_with_validator.
Status TimestampedVersionTracker::capture_consistent_versions_with_validator(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    const auto& graph = _version_graph;
    return graph.capture_consistent_versions_with_validator(spec_version, version_path,
                                                            validator);
}
214
215
// Cache-preferring capture; all arguments are forwarded unchanged to
// VersionGraph::capture_consistent_versions_prefer_cache.
Status TimestampedVersionTracker::capture_consistent_versions_prefer_cache(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    const auto& graph = _version_graph;
    return graph.capture_consistent_versions_prefer_cache(spec_version, version_path,
                                                          validator);
}
221
222
// Merge-on-write variant of the validator-aware capture; all arguments are
// forwarded unchanged to VersionGraph::capture_consistent_versions_with_validator_mow.
Status TimestampedVersionTracker::capture_consistent_versions_with_validator_mow(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    const auto& graph = _version_graph;
    return graph.capture_consistent_versions_with_validator_mow(spec_version, version_path,
                                                                validator);
}
228
229
void TimestampedVersionTracker::capture_expired_paths(
230
343k
        int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
231
343k
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
232
343k
            _stale_version_path_map.begin();
233
234
366k
    while (iter != _stale_version_path_map.end()) {
235
23.4k
        int64_t max_create_time = iter->second->max_create_time();
236
23.4k
        if (max_create_time <= stale_sweep_endtime) {
237
11.1k
            int64_t path_version = iter->first;
238
11.1k
            path_version_vec->push_back(path_version);
239
11.1k
        }
240
23.4k
        ++iter;
241
23.4k
    }
242
343k
}
243
244
11.1k
// Returns the stale path stored under `path_id` without removing it, or
// nullptr (with a VLOG notice) if no such path is tracked.
PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) {
    // A single find() replaces count() followed by operator[].
    const auto iter = _stale_version_path_map.find(path_id);
    if (iter == _stale_version_path_map.end()) {
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
        return nullptr;
    }
    return iter->second;
}
252
253
11.1k
// Removes the stale path `path_id` from the tracker and deletes each of its
// versions from the version graph. Graph vertices are kept; only edges are
// removed. Returns the removed path, or nullptr if `path_id` is not tracked.
PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) {
    auto iter = _stale_version_path_map.find(path_id);
    if (iter == _stale_version_path_map.end()) {
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
        return nullptr;
    }

    VLOG_NOTICE << get_current_path_map_str();
    // Reuse the iterator instead of looking the key up again to fetch and erase it.
    PathVersionListSharedPtr ptr = std::move(iter->second);
    _stale_version_path_map.erase(iter);

    for (auto& version : ptr->timestamped_versions()) {
        // delete_version_from_graph() only fails when the version's vertices are
        // missing from the graph. Keep deleting the remaining versions, but make
        // the inconsistency visible instead of discarding the status.
        if (Status st = _version_graph.delete_version_from_graph(version->version()); !st.ok()) {
            LOG(WARNING) << "failed to delete version from version graph. path_id=" << path_id
                         << ", error=" << st.to_string();
        }
    }
    return ptr;
}
269
270
14
std::string TimestampedVersionTracker::get_current_path_map_str() {
271
14
    std::stringstream tracker_info;
272
14
    tracker_info << "current expired next_path_id " << _next_path_id << std::endl;
273
274
14
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
275
14
            _stale_version_path_map.begin();
276
49
    while (iter != _stale_version_path_map.end()) {
277
35
        tracker_info << "current expired path_version " << iter->first;
278
35
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
279
35
                iter->second->timestamped_versions();
280
35
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
281
35
                timestamped_versions.begin();
282
35
        int64_t max_create_time = -1;
283
95
        while (version_path_iter != timestamped_versions.end()) {
284
60
            if (max_create_time < (*version_path_iter)->get_create_time()) {
285
35
                max_create_time = (*version_path_iter)->get_create_time();
286
35
            }
287
60
            tracker_info << " -> [";
288
60
            tracker_info << (*version_path_iter)->version().first;
289
60
            tracker_info << ",";
290
60
            tracker_info << (*version_path_iter)->version().second;
291
60
            tracker_info << "]";
292
293
60
            ++version_path_iter;
294
60
        }
295
296
35
        tracker_info << std::endl;
297
35
        ++iter;
298
35
    }
299
14
    return tracker_info.str();
300
14
}
301
302
6.28k
double TimestampedVersionTracker::get_orphan_vertex_ratio() {
303
6.28k
    return _version_graph.get_orphan_vertex_ratio();
304
6.28k
}
305
306
5
std::string TimestampedVersionTracker::debug_string() const {
307
5
    return _version_graph.debug_string();
308
5
}
309
310
129k
// Appends `version` to this path. `_max_create_time` is raised when `version`
// has a larger create time than any version added so far.
void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) {
    // Compare and refresh `_max_create_time`.
    if (version->get_create_time() > _max_create_time) {
        _max_create_time = version->get_create_time();
    }
    // `version` is a by-value sink parameter: move it into the container
    // instead of bumping the reference count a second time.
    _timestamped_versions_container.push_back(std::move(version));
}
317
318
42.2k
std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() {
319
42.2k
    return _timestamped_versions_container;
320
42.2k
}
321
322
void VersionGraph::construct_version_graph(const RowsetMetaMapContainer& rs_metas,
323
0
                                           int64_t* max_version) {
324
0
    construct_version_graph(std::views::values(rs_metas), max_version);
325
0
}
326
327
void VersionGraph::reconstruct_version_graph(const RowsetMetaMapContainer& rs_metas,
328
0
                                             int64_t* max_version) {
329
0
    reconstruct_version_graph(std::views::values(rs_metas), max_version);
330
0
}
331
332
438k
void VersionGraph::add_version_to_graph(const Version& version) {
333
    // Add version.first as new vertex of version graph if not exist.
334
438k
    int64_t start_vertex_value = version.first;
335
438k
    int64_t end_vertex_value = version.second + 1;
336
337
    // Add vertex to graph.
338
438k
    _add_vertex_to_graph(start_vertex_value);
339
438k
    _add_vertex_to_graph(end_vertex_value);
340
341
438k
    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
342
438k
    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
343
344
    // We assume this version is new version, so we just add two edges
345
    // into version graph. add one edge from `start_version` to `end_version`
346
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
347
438k
    auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
348
446k
    while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
349
304k
        if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) {
350
296k
            break;
351
296k
        }
352
7.94k
        end_vertex_it++;
353
7.94k
    }
354
438k
    _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index);
355
356
    // We add reverse edge(from end_version to start_version) to graph
357
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
358
438k
    auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
359
479k
    while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
360
46.0k
        if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) {
361
4.77k
            break;
362
4.77k
        }
363
41.3k
        start_vertex_it++;
364
41.3k
    }
365
438k
    _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index);
366
438k
}
367
368
85.0k
// Removes the edge for `version` (vertex `first` <-> vertex `second + 1`) in
// both directions. If the same edge appears more than once, only its first
// occurrence in each list is removed. Vertices are never removed here, even
// when their edge list becomes empty; the graph is rebuilt during trash sweep.
// Returns HEADER_DELETE_VERSION if either vertex does not exist.
Status VersionGraph::delete_version_from_graph(const Version& version) {
    const int64_t start_vertex_value = version.first;
    const int64_t end_vertex_value = version.second + 1;

    // Keep the find() results instead of looking both keys up again with operator[].
    const auto start_it = _vertex_index_map.find(start_vertex_value);
    const auto end_it = _vertex_index_map.find(end_vertex_value);
    if (start_it == _vertex_index_map.end() || end_it == _vertex_index_map.end()) {
        return Status::Error<HEADER_DELETE_VERSION>(
                "vertex for version does not exists. version={}-{}", version.first, version.second);
    }

    const int64_t start_vertex_index = start_it->second;
    const int64_t end_vertex_index = end_it->second;

    // Erases the first occurrence of `neighbour_index` from `edges`, if any.
    const auto erase_first = [](auto& edges, int64_t neighbour_index) {
        const auto pos = std::find(edges.begin(), edges.end(), neighbour_index);
        if (pos != edges.end()) {
            edges.erase(pos);
        }
    };

    // Remove edge and its reverse edge.
    erase_first(_version_graph[start_vertex_index].edges, end_vertex_index);
    erase_first(_version_graph[end_vertex_index].edges, start_vertex_index);

    return Status::OK();
}
404
405
1.76M
void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) {
406
    // Vertex with vertex_value already exists.
407
1.76M
    if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) {
408
333k
        VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value;
409
333k
        return;
410
333k
    }
411
412
1.43M
    _version_graph.emplace_back(Vertex(vertex_value));
413
1.43M
    _vertex_index_map[vertex_value] = _version_graph.size() - 1;
414
1.43M
}
415
416
// Finds a chain of versions from spec_version.first to spec_version.second.
// At each vertex it takes the first forward edge that does not go past
// `spec_version.second + 1`. Edge lists are kept in descending order, so this
// is the longest such edge. If `version_path` is non-null, every hop is appended
// to it as [vertex value, next vertex value - 1].
// Errors: INVALID_ARGUMENT if spec_version.first > spec_version.second;
// InternalError if the start vertex is missing or the walk gets stuck.
Status VersionGraph::capture_consistent_versions(const Version& spec_version,
                                                 std::vector<Version>* version_path) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    const auto start = std::find_if(
            _version_graph.begin(), _version_graph.end(),
            [&](const auto& vertex) { return vertex.value == spec_version.first; });
    if (start == _version_graph.end()) {
        return Status::InternalError<false>(
                "failed to find path in version_graph. spec_version: {}-{}", spec_version.first,
                spec_version.second);
    }
    int64_t cur_idx = start - _version_graph.begin();

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        const int64_t cur_value = _version_graph[cur_idx].value;
        int64_t next_idx = -1;
        for (const auto& neighbour : _version_graph[cur_idx].edges) {
            const int64_t neighbour_value = _version_graph[neighbour].value;
            if (neighbour_value < cur_value) {
                // Only incremental versions matter; the rest of the list points backwards.
                break;
            }
            if (neighbour_value <= end_value) {
                // First in-range forward edge == largest version not past the end.
                next_idx = neighbour;
                break;
            }
            // This edge overshoots the requested end; try a shorter one.
        }

        if (next_idx < 0) {
            return Status::InternalError<false>(
                    "fail to find path in version_graph. spec_version: {}-{}", spec_version.first,
                    spec_version.second);
        }
        if (version_path != nullptr) {
            version_path->emplace_back(cur_value, _version_graph[next_idx].value - 1);
        }
        cur_idx = next_idx;
    }

    if (VLOG_TRACE_IS_ON && version_path != nullptr) {
        std::stringstream path_dump;
        for (const auto& version : *version_path) {
            path_dump << version << ' ';
        }
        VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version
                   << ", path=" << path_dump.str();
    }

    return Status::OK();
}
481
482
// Like capture_consistent_versions(), but prefers edges accepted by
// `validator`, which is called as validator(start_version, end_version).
// At each vertex it considers the forward edges whose end does not go past
// spec_version.second, largest first. It takes the first one the validator
// accepts; if the validator accepts none, it falls back to the largest
// in-range forward edge. Hops are appended to `version_path` as
// [start, end]. If a vertex has no usable forward edge, the walk stops and
// Status::OK() is returned with the partial path collected so far.
Status VersionGraph::capture_consistent_versions_prefer_cache(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    int64_t cur_idx = -1;
    for (size_t i = 0; i < _version_graph.size(); i++) {
        if (_version_graph[i].value == spec_version.first) {
            cur_idx = i;
            break;
        }
    }

    if (cur_idx < 0) {
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
                                            spec_version.to_string());
    }

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        const int64_t cur_value = _version_graph[cur_idx].value;
        int64_t next_idx = -1;
        int64_t first_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            const int64_t it_value = _version_graph[it].value;
            // Only consider incremental versions (edges are sorted descending).
            if (it_value < cur_value) {
                break;
            }
            // Never step past the requested end version; this is the same rule as
            // capture_consistent_versions(). Otherwise a rowset holding versions
            // newer than spec_version.second could be selected, either directly or
            // as the fallback below.
            if (it_value > end_value) {
                continue;
            }
            if (first_idx == -1) {
                first_idx = it;
            }
            if (!validator(cur_value, it_value - 1)) {
                continue;
            }
            next_idx = it;
            break;
        }

        if (next_idx == -1) {
            // If all edges are not in cache, use the first in-range edge if possible.
            next_idx = first_idx;
        }
        if (next_idx == -1) {
            return Status::OK();
        }
        version_path.emplace_back(cur_value, _version_graph[next_idx].value - 1);
        cur_idx = next_idx;
    }
    return Status::OK();
}
541
542
// Walks the graph from spec_version.first towards spec_version.second. At each
// vertex it takes the largest forward edge that does not go past
// spec_version.second and is accepted by validator(start_version, end_version).
// Accepted hops are appended to `version_path` as [start, end].
// When no edge at the current vertex qualifies, the walk stops and
// Status::OK() is returned. The path may then end before spec_version.second.
Status VersionGraph::capture_consistent_versions_with_validator(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    int64_t cur_idx = -1;
    for (size_t i = 0; i < _version_graph.size(); i++) {
        if (_version_graph[i].value == spec_version.first) {
            cur_idx = i;
            break;
        }
    }

    if (cur_idx < 0) {
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
                                            spec_version.to_string());
    }

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        const int64_t cur_value = _version_graph[cur_idx].value;
        int64_t next_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            const int64_t it_value = _version_graph[it].value;
            // Only consider incremental versions (edges are sorted descending).
            if (it_value < cur_value) {
                break;
            }
            // Never step past the requested end version; this is the same rule as
            // capture_consistent_versions(). Otherwise a rowset holding versions
            // newer than spec_version.second could be returned.
            if (it_value > end_value) {
                continue;
            }
            if (!validator(cur_value, it_value - 1)) {
                continue;
            }
            next_idx = it;
            break;
        }

        if (next_idx == -1) {
            return Status::OK();
        }
        version_path.emplace_back(cur_value, _version_graph[next_idx].value - 1);
        cur_idx = next_idx;
    }
    return Status::OK();
}
592
593
// Merge-on-write variant of capture_consistent_versions_with_validator().
// At each vertex the forward edges are tried largest first, and the first one
// accepted by validator(start_version, end_version) is taken. When an edge is
// rejected:
//   * if it is the single-step edge (to cur + 1), the walk stops here;
//   * otherwise the walk's target end is lowered to that edge's end vertex, so
//     the path will not extend past the rejected edge's end version.
// NOTE(review): the reason for lowering the end is not visible in this file;
// presumably it relates to MoW delete-bitmap consistency. Confirm before changing.
// Hops are appended to `version_path`. If a vertex has no accepted edge, the
// walk stops and Status::OK() is returned with the partial path.
Status VersionGraph::capture_consistent_versions_with_validator_mow(
        const Version& spec_version, std::vector<Version>& version_path,
        const std::function<bool(int64_t, int64_t)>& validator) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    const auto start = std::find_if(
            _version_graph.begin(), _version_graph.end(),
            [&](const auto& vertex) { return vertex.value == spec_version.first; });
    if (start == _version_graph.end()) {
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
                                            spec_version.to_string());
    }
    int64_t cur_idx = start - _version_graph.begin();

    // Not const: may be lowered inside the loop when an edge is rejected.
    int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        const int64_t cur_value = _version_graph[cur_idx].value;
        std::optional<int64_t> next_idx;
        for (const auto& neighbour : _version_graph[cur_idx].edges) {
            const int64_t neighbour_value = _version_graph[neighbour].value;
            if (neighbour_value < cur_value) {
                // Only incremental versions matter; the rest of the list points backwards.
                break;
            }
            if (validator(cur_value, neighbour_value - 1)) {
                next_idx = neighbour;
                break;
            }
            if (neighbour_value == cur_value + 1) {
                // Even the single-version edge was rejected: stop here.
                break;
            }
            end_value = std::min(neighbour_value, end_value);
        }

        if (!next_idx.has_value()) {
            return Status::OK();
        }
        version_path.emplace_back(cur_value, _version_graph[*next_idx].value - 1);
        cur_idx = *next_idx;
    }
    return Status::OK();
}
647
648
6.28k
double VersionGraph::get_orphan_vertex_ratio() {
649
6.28k
    int64_t vertex_num = _version_graph.size();
650
6.28k
    int64_t orphan_vertex_num = 0;
651
126k
    for (auto& iter : _version_graph) {
652
126k
        if (iter.edges.empty()) {
653
74.1k
            ++orphan_vertex_num;
654
74.1k
        }
655
126k
    }
656
6.28k
    return static_cast<double>(orphan_vertex_num) / static_cast<double>(vertex_num);
657
6.28k
}
658
659
5
// Renders the graph as "VersionGraph: [{value: V, edges: [N1, N2, ]}, ...]".
// Each vertex lists only neighbours whose value is greater than V, i.e. its
// forward edges.
std::string VersionGraph::debug_string() const {
    std::stringstream out;
    out << "VersionGraph: [";
    for (const auto& vertex : _version_graph) {
        out << "{value: " << vertex.value << ", edges: [";
        for (const auto& neighbour : vertex.edges) {
            const auto neighbour_value = _version_graph[neighbour].value;
            if (neighbour_value <= vertex.value) {
                continue;
            }
            out << neighbour_value << ", ";
        }
        out << "]}, ";
    }
    out << "]";
    return out.str();
}
674
675
} // namespace doris