Coverage Report

Created: 2025-07-25 20:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/version_graph.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/version_graph.h"
19
20
#include <cctz/time_zone.h>
21
#include <stddef.h>
22
23
#include <algorithm>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <list>
27
#include <memory>
28
#include <ostream>
29
#include <utility>
30
31
#include "common/logging.h"
32
33
namespace doris {
34
#include "common/compile_check_begin.h"
35
36
using namespace ErrorCode;
37
38
void TimestampedVersionTracker::_construct_versioned_tracker(
39
52
        const std::vector<RowsetMetaSharedPtr>& rs_metas) {
40
52
    int64_t max_version = 0;
41
42
    // construct the rowset graph
43
52
    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
44
52
}
45
46
void TimestampedVersionTracker::construct_versioned_tracker(
47
12
        const std::vector<RowsetMetaSharedPtr>& rs_metas) {
48
12
    if (rs_metas.empty()) {
49
0
        VLOG_NOTICE << "there is no version in the header.";
50
0
        return;
51
0
    }
52
12
    _stale_version_path_map.clear();
53
12
    _next_path_id = 1;
54
12
    _construct_versioned_tracker(rs_metas);
55
12
}
56
57
void TimestampedVersionTracker::construct_versioned_tracker(
58
        const std::vector<RowsetMetaSharedPtr>& rs_metas,
59
562
        const std::vector<RowsetMetaSharedPtr>& stale_metas) {
60
562
    if (rs_metas.empty()) {
61
522
        VLOG_NOTICE << "there is no version in the header.";
62
522
        return;
63
522
    }
64
40
    _stale_version_path_map.clear();
65
40
    _next_path_id = 1;
66
40
    _construct_versioned_tracker(rs_metas);
67
68
    // Init `_stale_version_path_map`.
69
40
    _init_stale_version_path_map(rs_metas, stale_metas);
70
40
}
71
72
void TimestampedVersionTracker::_init_stale_version_path_map(
73
        const std::vector<RowsetMetaSharedPtr>& rs_metas,
74
40
        const std::vector<RowsetMetaSharedPtr>& stale_metas) {
75
40
    if (stale_metas.empty()) {
76
39
        return;
77
39
    }
78
79
    // Sort stale meta by version diff (second version - first version).
80
1
    std::list<RowsetMetaSharedPtr> sorted_stale_metas;
81
7
    for (auto& rs : stale_metas) {
82
7
        sorted_stale_metas.emplace_back(rs);
83
7
    }
84
85
    // 1. sort the existing rowsets by version in ascending order.
86
14
    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
87
        // Compare by version diff between `version.first` and `version.second`.
88
14
        int64_t a_diff = a->version().second - a->version().first;
89
14
        int64_t b_diff = b->version().second - b->version().first;
90
91
14
        int64_t diff = a_diff - b_diff;
92
14
        if (diff < 0) {
93
5
            return true;
94
9
        } else if (diff > 0) {
95
4
            return false;
96
4
        }
97
        // When the version diff is equal, compare the rowset`s create time
98
5
        return a->creation_time() < b->creation_time();
99
14
    });
100
101
    // first_version -> (second_version -> rowset_meta)
102
1
    std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;
103
104
    // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map.
105
    // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path
106
    // and add the path to `_stale_version_path_map`.
107
7
    for (auto& stale_meta : sorted_stale_metas) {
108
7
        std::vector<RowsetMetaSharedPtr> stale_path;
109
        // 2.1 find a path in `stale_map` can replace current `stale_meta` version.
110
7
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
111
7
                                           stale_meta->end_version(), &stale_path);
112
113
        // 2.2 add version to `version_graph`.
114
7
        Version stale_meta_version = stale_meta->version();
115
7
        add_version(stale_meta_version);
116
117
        // 2.3 find the path.
118
7
        if (r) {
119
            // Add the path to `_stale_version_path_map`.
120
1
            add_stale_path_version(stale_path);
121
            // Remove `stale_path` from `stale_map`.
122
2
            for (auto stale_item : stale_path) {
123
2
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
124
125
2
                if (stale_map[stale_item->start_version()].empty()) {
126
2
                    stale_map.erase(stale_item->start_version());
127
2
                }
128
2
            }
129
1
        }
130
131
        // 2.4 add `stale_meta` to `stale_map`.
132
7
        auto start_iter = stale_map.find(stale_meta->start_version());
133
7
        if (start_iter != stale_map.end()) {
134
0
            start_iter->second[stale_meta->end_version()] = stale_meta;
135
7
        } else {
136
7
            std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
137
7
            item[stale_meta->end_version()] = stale_meta;
138
7
            stale_map[stale_meta->start_version()] = std::move(item);
139
7
        }
140
7
    }
141
142
    // 3. generate stale path from `rs_metas`.
143
5
    for (auto& stale_meta : rs_metas) {
144
5
        std::vector<RowsetMetaSharedPtr> stale_path;
145
        // 3.1 find a path in stale_map can replace current `stale_meta` version.
146
5
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
147
5
                                           stale_meta->end_version(), &stale_path);
148
149
        // 3.2 find the path.
150
5
        if (r) {
151
            // Add the path to `_stale_version_path_map`.
152
2
            add_stale_path_version(stale_path);
153
            // Remove `stale_path` from `stale_map`.
154
4
            for (auto stale_item : stale_path) {
155
4
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
156
157
4
                if (stale_map[stale_item->start_version()].empty()) {
158
4
                    stale_map.erase(stale_item->start_version());
159
4
                }
160
4
            }
161
2
        }
162
5
    }
163
164
    // 4. process remain stale `rowset_meta` in `stale_map`.
165
1
    auto map_iter = stale_map.begin();
166
2
    while (map_iter != stale_map.end()) {
167
1
        auto second_iter = map_iter->second.begin();
168
2
        while (second_iter != map_iter->second.end()) {
169
            // Each remain stale `rowset_meta` generate a stale path.
170
1
            std::vector<RowsetMetaSharedPtr> stale_path;
171
1
            stale_path.push_back(second_iter->second);
172
1
            add_stale_path_version(stale_path);
173
174
1
            second_iter++;
175
1
        }
176
1
        map_iter++;
177
1
    }
178
1
}
179
180
bool TimestampedVersionTracker::_find_path_from_stale_map(
181
        const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>&
182
                stale_map,
183
        int64_t first_version, int64_t second_version,
184
16
        std::vector<RowsetMetaSharedPtr>* stale_path) {
185
16
    auto first_iter = stale_map.find(first_version);
186
    // If `first_version` not in `stale_map`, there is no path.
187
16
    if (first_iter == stale_map.end()) {
188
9
        return false;
189
9
    }
190
7
    auto& second_version_map = first_iter->second;
191
7
    auto second_iter = second_version_map.find(second_version);
192
    // If second_version in `stale_map`, find a path.
193
7
    if (second_iter != second_version_map.end()) {
194
3
        auto row_meta = second_iter->second;
195
        // Add rowset to path.
196
3
        stale_path->push_back(row_meta);
197
3
        return true;
198
3
    }
199
200
    // Traverse the first version map to backtracking  `_find_path_from_stale_map`.
201
4
    auto map_iter = second_version_map.begin();
202
5
    while (map_iter != second_version_map.end()) {
203
        // The version greater than `second_version`, we can't find path in `stale_map`.
204
4
        if (map_iter->first > second_version) {
205
0
            map_iter++;
206
0
            continue;
207
0
        }
208
        // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`.
209
4
        stale_path->push_back(map_iter->second);
210
4
        bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version,
211
4
                                           stale_path);
212
4
        if (r) {
213
3
            return true;
214
3
        }
215
        // There is no path in current version, pop and continue.
216
1
        stale_path->pop_back();
217
1
        map_iter++;
218
1
    }
219
220
1
    return false;
221
4
}
222
223
2
// Appends one JSON object per stale version path to `path_arr`. PushBack is
// called on it, so it must already be an array. Each object contains:
//   "path id"          - the path id, as a string;
//   "last create time" - the path's max create time, formatted in the local time zone;
//   "path list"        - "<id> -> [a-b] -> [c-d] ..." listing the path's versions.
// All strings are copied into `path_arr`'s allocator.
void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {
    auto& allocator = path_arr.GetAllocator();
    // The local time zone is loop-invariant; look it up once.
    const auto time_zone = cctz::local_time_zone();

    // Do loop version path.
    for (const auto& path_entry : _stale_version_path_map) {
        const auto path_id = path_entry.first;
        const auto& path_version_path = path_entry.second;

        // A plain Value is enough for an array element; a Document would create
        // its own (unused) allocator for every item.
        rapidjson::Value item(rapidjson::kObjectType);

        // Add `path_id` to item.
        const auto path_id_str = std::to_string(path_id);
        rapidjson::Value path_id_value;
        path_id_value.SetString(path_id_str.c_str(),
                                static_cast<rapidjson::SizeType>(path_id_str.length()), allocator);
        item.AddMember("path id", path_id_value, allocator);

        // Add max create time to item.
        const auto tp =
                std::chrono::system_clock::from_time_t(path_version_path->max_create_time());
        const auto create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone);
        rapidjson::Value create_time_value;
        create_time_value.SetString(create_time_str.c_str(),
                                    static_cast<rapidjson::SizeType>(create_time_str.length()),
                                    allocator);
        item.AddMember("last create time", create_time_value, allocator);

        // Add path list to item. Iterate by reference: copying the vector would
        // also bump the refcount of every element.
        std::stringstream path_list_stream;
        path_list_stream << path_id_str;
        for (const auto& timestamped_version : path_version_path->timestamped_versions()) {
            path_list_stream << " -> " << "[" << timestamped_version->version().first << "-"
                             << timestamped_version->version().second << "]";
        }
        const std::string path_list = path_list_stream.str();
        rapidjson::Value path_list_value;
        path_list_value.SetString(path_list.c_str(),
                                  static_cast<rapidjson::SizeType>(path_list.length()), allocator);
        item.AddMember("path list", path_list_value, allocator);

        // Add item to `path_arr` (moves `item`'s contents into the array).
        path_arr.PushBack(item, allocator);
    }
}
280
281
void TimestampedVersionTracker::recover_versioned_tracker(
282
1
        const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) {
283
1
    auto _path_map_iter = stale_version_path_map.begin();
284
    // Recover `stale_version_path_map`.
285
1
    while (_path_map_iter != stale_version_path_map.end()) {
286
        // Add `PathVersionListSharedPtr` to map.
287
0
        _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second;
288
289
0
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
290
0
                _path_map_iter->second->timestamped_versions();
291
0
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
292
0
                timestamped_versions.begin();
293
0
        while (version_path_iter != timestamped_versions.end()) {
294
            // Add version to `_version_graph`.
295
0
            _version_graph.add_version_to_graph((*version_path_iter)->version());
296
0
            ++version_path_iter;
297
0
        }
298
0
        ++_path_map_iter;
299
0
    }
300
1
    LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str();
301
1
}
302
303
10.9k
void TimestampedVersionTracker::add_version(const Version& version) {
304
10.9k
    _version_graph.add_version_to_graph(version);
305
10.9k
}
306
307
void TimestampedVersionTracker::add_stale_path_version(
308
42
        const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) {
309
42
    if (stale_rs_metas.empty()) {
310
1
        VLOG_NOTICE << "there is no version in the stale_rs_metas.";
311
1
        return;
312
1
    }
313
314
41
    PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer());
315
94
    for (auto rs : stale_rs_metas) {
316
94
        TimestampedVersionSharedPtr vt_ptr(
317
94
                new TimestampedVersion(rs->version(), rs->creation_time()));
318
94
        ptr->add_timestamped_version(vt_ptr);
319
94
    }
320
321
41
    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions();
322
323
41
    struct TimestampedVersionPtrCompare {
324
41
        bool operator()(const TimestampedVersionSharedPtr ptr1,
325
132
                        const TimestampedVersionSharedPtr ptr2) {
326
132
            return ptr1->version().first < ptr2->version().first;
327
132
        }
328
41
    };
329
41
    sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare());
330
41
    _stale_version_path_map[_next_path_id] = ptr;
331
41
    _next_path_id++;
332
41
}
333
334
// Capture consistent versions from graph.
335
// Delegates to the version graph. It finds the chain of versions that exactly
// covers `spec_version` and appends them to `version_path` when that is non-null.
Status TimestampedVersionTracker::capture_consistent_versions(
        const Version& spec_version, std::vector<Version>* version_path) const {
    const auto& graph = _version_graph;
    return graph.capture_consistent_versions(spec_version, version_path);
}
339
340
void TimestampedVersionTracker::capture_expired_paths(
341
3
        int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
342
3
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
343
3
            _stale_version_path_map.begin();
344
345
15
    while (iter != _stale_version_path_map.end()) {
346
12
        int64_t max_create_time = iter->second->max_create_time();
347
12
        if (max_create_time <= stale_sweep_endtime) {
348
8
            int64_t path_version = iter->first;
349
8
            path_version_vec->push_back(path_version);
350
8
        }
351
12
        ++iter;
352
12
    }
353
3
}
354
355
13
// Returns the stale path registered under `path_id`, or nullptr when there is none.
PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) {
    // A single lookup, instead of count() followed by operator[].
    auto iter = _stale_version_path_map.find(path_id);
    if (iter == _stale_version_path_map.end()) {
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
        return nullptr;
    }
    return iter->second;
}
363
364
13
// Removes the stale path registered under `path_id`, deletes each of its
// versions from `_version_graph`, and returns the removed path.
// Returns nullptr and changes nothing if `path_id` is unknown.
PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) {
    auto iter = _stale_version_path_map.find(path_id);
    if (iter == _stale_version_path_map.end()) {
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
        return nullptr;
    }

    VLOG_NOTICE << get_current_path_map_str();
    // Take the path out, then erase through the iterator we already have
    // (no repeated lookups by key).
    PathVersionListSharedPtr ptr = std::move(iter->second);
    _stale_version_path_map.erase(iter);

    for (auto& version : ptr->timestamped_versions()) {
        // The status is intentionally ignored. delete_version_from_graph only
        // fails when a vertex for the version is missing.
        static_cast<void>(_version_graph.delete_version_from_graph(version->version()));
    }
    return ptr;
}
380
381
1
std::string TimestampedVersionTracker::get_current_path_map_str() {
382
1
    std::stringstream tracker_info;
383
1
    tracker_info << "current expired next_path_id " << _next_path_id << std::endl;
384
385
1
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
386
1
            _stale_version_path_map.begin();
387
1
    while (iter != _stale_version_path_map.end()) {
388
0
        tracker_info << "current expired path_version " << iter->first;
389
0
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
390
0
                iter->second->timestamped_versions();
391
0
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
392
0
                timestamped_versions.begin();
393
0
        int64_t max_create_time = -1;
394
0
        while (version_path_iter != timestamped_versions.end()) {
395
0
            if (max_create_time < (*version_path_iter)->get_create_time()) {
396
0
                max_create_time = (*version_path_iter)->get_create_time();
397
0
            }
398
0
            tracker_info << " -> [";
399
0
            tracker_info << (*version_path_iter)->version().first;
400
0
            tracker_info << ",";
401
0
            tracker_info << (*version_path_iter)->version().second;
402
0
            tracker_info << "]";
403
404
0
            ++version_path_iter;
405
0
        }
406
407
0
        tracker_info << std::endl;
408
0
        ++iter;
409
0
    }
410
1
    return tracker_info.str();
411
1
}
412
413
1
double TimestampedVersionTracker::get_orphan_vertex_ratio() {
414
1
    return _version_graph.get_orphan_vertex_ratio();
415
1
}
416
417
94
// Appends `version` to this path and raises `_max_create_time` if `version`
// was created later than anything seen so far.
void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) {
    // Compare and refresh `_max_create_time`.
    if (version->get_create_time() > _max_create_time) {
        _max_create_time = version->get_create_time();
    }
    // `version` is already our own copy (by-value parameter); move it in
    // rather than copying the smart pointer a second time.
    _timestamped_versions_container.push_back(std::move(version));
}
424
425
73
std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() {
426
73
    return _timestamped_versions_container;
427
73
}
428
429
void VersionGraph::construct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
430
57
                                           int64_t* max_version) {
431
57
    if (rs_metas.empty()) {
432
0
        VLOG_NOTICE << "there is no version in the header.";
433
0
        return;
434
0
    }
435
436
    // Distill vertex values from versions in TabletMeta.
437
57
    std::vector<int64_t> vertex_values;
438
57
    vertex_values.reserve(2 * rs_metas.size());
439
440
389
    for (size_t i = 0; i < rs_metas.size(); ++i) {
441
332
        vertex_values.push_back(rs_metas[i]->start_version());
442
332
        vertex_values.push_back(rs_metas[i]->end_version() + 1);
443
332
        if (max_version != nullptr and *max_version < rs_metas[i]->end_version()) {
444
269
            *max_version = rs_metas[i]->end_version();
445
269
        }
446
332
    }
447
57
    std::sort(vertex_values.begin(), vertex_values.end());
448
449
    // Items in `vertex_values` are sorted, but not unique.
450
    // we choose unique items in `vertex_values` to create vertexes.
451
57
    int64_t last_vertex_value = -1;
452
721
    for (size_t i = 0; i < vertex_values.size(); ++i) {
453
664
        if (i != 0 && vertex_values[i] == last_vertex_value) {
454
291
            continue;
455
291
        }
456
457
        // Add vertex to graph.
458
373
        _add_vertex_to_graph(vertex_values[i]);
459
373
        last_vertex_value = vertex_values[i];
460
373
    }
461
    // Create edges for version graph according to TabletMeta's versions.
462
389
    for (size_t i = 0; i < rs_metas.size(); ++i) {
463
        // Versions in header are unique.
464
        // We ensure `_vertex_index_map` has its `start_version`.
465
332
        int64_t start_vertex_index = _vertex_index_map[rs_metas[i]->start_version()];
466
332
        int64_t end_vertex_index = _vertex_index_map[rs_metas[i]->end_version() + 1];
467
        // Add one edge from `start_version` to `end_version`.
468
332
        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
469
        // Add reverse edge from `end_version` to `start_version`.
470
332
        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
471
332
    }
472
473
    // Sort edges by version in descending order.
474
373
    for (auto& vertex : _version_graph) {
475
373
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
476
340
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
477
340
        });
478
373
    }
479
57
}
480
481
void VersionGraph::reconstruct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
482
53
                                             int64_t* max_version) {
483
53
    _version_graph.clear();
484
53
    _vertex_index_map.clear();
485
486
53
    construct_version_graph(rs_metas, max_version);
487
53
}
488
489
11.0k
void VersionGraph::add_version_to_graph(const Version& version) {
490
    // Add version.first as new vertex of version graph if not exist.
491
11.0k
    int64_t start_vertex_value = version.first;
492
11.0k
    int64_t end_vertex_value = version.second + 1;
493
494
    // Add vertex to graph.
495
11.0k
    _add_vertex_to_graph(start_vertex_value);
496
11.0k
    _add_vertex_to_graph(end_vertex_value);
497
498
11.0k
    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
499
11.0k
    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
500
501
    // We assume this version is new version, so we just add two edges
502
    // into version graph. add one edge from `start_version` to `end_version`
503
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
504
11.0k
    auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
505
11.0k
    while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
506
10.6k
        if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) {
507
10.6k
            break;
508
10.6k
        }
509
37
        end_vertex_it++;
510
37
    }
511
11.0k
    _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index);
512
513
    // We add reverse edge(from end_version to start_version) to graph
514
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
515
11.0k
    auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
516
11.0k
    while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
517
55
        if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) {
518
17
            break;
519
17
        }
520
38
        start_vertex_it++;
521
38
    }
522
11.0k
    _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index);
523
11.0k
}
524
525
32
// Removes one edge for `version` and its reverse edge. If the same version was
// added several times, only one copy is removed. Returns HEADER_DELETE_VERSION
// if either endpoint vertex is unknown.
Status VersionGraph::delete_version_from_graph(const Version& version) {
    const int64_t start_vertex_value = version.first;
    const int64_t end_vertex_value = version.second + 1;

    const auto start_entry = _vertex_index_map.find(start_vertex_value);
    const auto end_entry = _vertex_index_map.find(end_vertex_value);
    if (start_entry == _vertex_index_map.end() || end_entry == _vertex_index_map.end()) {
        return Status::Error<HEADER_DELETE_VERSION>(
                "vertex for version does not exists. version={}-{}", version.first, version.second);
    }
    const int64_t start_vertex_index = start_entry->second;
    const int64_t end_vertex_index = end_entry->second;

    // Erases the first occurrence of `target` from `edges`, if present.
    auto erase_first = [](auto& edges, int64_t target) {
        auto pos = std::find(edges.begin(), edges.end(), target);
        if (pos != edges.end()) {
            edges.erase(pos);
        }
    };
    erase_first(_version_graph[start_vertex_index].edges, end_vertex_index);
    erase_first(_version_graph[end_vertex_index].edges, start_vertex_index);

    // Here we do not delete vertex in `_version_graph` even if its edges are empty.
    // the `_version_graph` will be rebuilt when doing trash sweep.
    return Status::OK();
}
561
562
22.3k
void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) {
563
    // Vertex with vertex_value already exists.
564
22.3k
    if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) {
565
10.6k
        VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value;
566
10.6k
        return;
567
10.6k
    }
568
569
11.7k
    _version_graph.emplace_back(Vertex(vertex_value));
570
11.7k
    _vertex_index_map[vertex_value] = _version_graph.size() - 1;
571
11.7k
}
572
573
// Finds a chain of versions that exactly covers `spec_version`: a walk from
// vertex `spec_version.first` to vertex `spec_version.second + 1` that only
// moves to larger vertex values. At each vertex it greedily takes the first
// edge, in the descending-sorted edge list, whose target does not pass
// `spec_version.second + 1`. When `version_path` is non-null, each traversed
// version is appended to it.
//
// Returns INVALID_ARGUMENT if spec_version.first > spec_version.second, and an
// InternalError if the start vertex is missing or the walk gets stuck.
Status VersionGraph::capture_consistent_versions(const Version& spec_version,
                                                 std::vector<Version>* version_path) const {
    if (spec_version.first > spec_version.second) {
        return Status::Error<INVALID_ARGUMENT, false>(
                "invalid specified version. spec_version={}-{}", spec_version.first,
                spec_version.second);
    }

    // Locate the start vertex with one hash lookup instead of scanning every
    // vertex. Vertex values are unique: _add_vertex_to_graph never adds a value
    // twice, and it records each vertex's index in `_vertex_index_map`.
    const auto start_iter = _vertex_index_map.find(spec_version.first);
    if (start_iter == _vertex_index_map.end()) {
        return Status::InternalError<false>(
                "failed to find path in version_graph. spec_version: {}-{}", spec_version.first,
                spec_version.second);
    }
    int64_t cur_idx = start_iter->second;

    const int64_t end_value = spec_version.second + 1;
    while (_version_graph[cur_idx].value < end_value) {
        int64_t next_idx = -1;
        for (const auto& it : _version_graph[cur_idx].edges) {
            // Only consider incremental versions.
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
                break;
            }

            if (_version_graph[it].value > end_value) {
                continue;
            }

            // Considering edges had been sorted by version in descending order,
            // This version is the largest version that smaller than `end_version`.
            next_idx = it;
            break;
        }

        if (next_idx < 0) {
            return Status::InternalError<false>(
                    "fail to find path in version_graph. spec_version: {}-{}", spec_version.first,
                    spec_version.second);
        }
        if (version_path != nullptr) {
            version_path->emplace_back(_version_graph[cur_idx].value,
                                       _version_graph[next_idx].value - 1);
        }
        cur_idx = next_idx;
    }

    if (VLOG_TRACE_IS_ON && version_path != nullptr) {
        std::stringstream shortest_path_for_debug;
        for (const auto& version : *version_path) {
            shortest_path_for_debug << version << ' ';
        }
        VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version
                   << ", path=" << shortest_path_for_debug.str();
    }

    return Status::OK();
}
638
639
2
double VersionGraph::get_orphan_vertex_ratio() {
640
2
    int64_t vertex_num = _version_graph.size();
641
2
    int64_t orphan_vertex_num = 0;
642
15
    for (auto& iter : _version_graph) {
643
15
        if (iter.edges.empty()) {
644
6
            ++orphan_vertex_num;
645
6
        }
646
15
    }
647
2
    return static_cast<double>(orphan_vertex_num) / static_cast<double>(vertex_num);
648
2
}
649
650
#include "common/compile_check_end.h"
651
} // namespace doris