Coverage Report

Created: 2025-11-20 04:52

/root/doris/be/src/olap/version_graph.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/version_graph.h"
19
20
#include <cctz/time_zone.h>
21
#include <stddef.h>
22
23
#include <algorithm>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <list>
27
#include <memory>
28
#include <optional>
29
#include <ostream>
30
#include <utility>
31
32
#include "common/logging.h"
33
34
namespace doris {
35
using namespace ErrorCode;
36
37
void TimestampedVersionTracker::_construct_versioned_tracker(
38
53
        const std::vector<RowsetMetaSharedPtr>& rs_metas) {
39
53
    int64_t max_version = 0;
40
41
    // construct the rowset graph
42
53
    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
43
53
}
44
45
void TimestampedVersionTracker::construct_versioned_tracker(
46
12
        const std::vector<RowsetMetaSharedPtr>& rs_metas) {
47
12
    if (rs_metas.empty()) {
  Branch (47:9): [True: 0, False: 12]
48
0
        VLOG_NOTICE << "there is no version in the header.";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
49
0
        return;
50
0
    }
51
12
    _stale_version_path_map.clear();
52
12
    _next_path_id = 1;
53
12
    _construct_versioned_tracker(rs_metas);
54
12
}
55
56
void TimestampedVersionTracker::construct_versioned_tracker(
57
        const std::vector<RowsetMetaSharedPtr>& rs_metas,
58
660
        const std::vector<RowsetMetaSharedPtr>& stale_metas) {
59
660
    if (rs_metas.empty()) {
  Branch (59:9): [True: 619, False: 41]
60
619
        VLOG_NOTICE << "there is no version in the header.";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
61
619
        return;
62
619
    }
63
41
    _stale_version_path_map.clear();
64
41
    _next_path_id = 1;
65
41
    _construct_versioned_tracker(rs_metas);
66
67
    // Init `_stale_version_path_map`.
68
41
    _init_stale_version_path_map(rs_metas, stale_metas);
69
41
}
70
71
void TimestampedVersionTracker::_init_stale_version_path_map(
72
        const std::vector<RowsetMetaSharedPtr>& rs_metas,
73
41
        const std::vector<RowsetMetaSharedPtr>& stale_metas) {
74
41
    if (stale_metas.empty()) {
  Branch (74:9): [True: 40, False: 1]
75
40
        return;
76
40
    }
77
78
    // Sort stale meta by version diff (second version - first version).
79
1
    std::list<RowsetMetaSharedPtr> sorted_stale_metas;
80
7
    for (auto& rs : stale_metas) {
  Branch (80:19): [True: 7, False: 1]
81
7
        sorted_stale_metas.emplace_back(rs);
82
7
    }
83
84
    // 1. sort the existing rowsets by version in ascending order.
85
14
    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
86
        // Compare by version diff between `version.first` and `version.second`.
87
14
        int64_t a_diff = a->version().second - a->version().first;
88
14
        int64_t b_diff = b->version().second - b->version().first;
89
90
14
        int diff = a_diff - b_diff;
91
14
        if (diff < 0) {
  Branch (91:13): [True: 5, False: 9]
92
5
            return true;
93
9
        } else if (diff > 0) {
  Branch (93:20): [True: 4, False: 5]
94
4
            return false;
95
4
        }
96
        // When the version diff is equal, compare the rowset`s stale time
97
5
        return a->stale_at() < b->stale_at();
98
14
    });
99
100
    // first_version -> (second_version -> rowset_meta)
101
1
    std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;
102
103
    // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map.
104
    // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path
105
    // and add the path to `_stale_version_path_map`.
106
7
    for (auto& stale_meta : sorted_stale_metas) {
  Branch (106:27): [True: 7, False: 1]
107
7
        std::vector<RowsetMetaSharedPtr> stale_path;
108
        // 2.1 find a path in `stale_map` can replace current `stale_meta` version.
109
7
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
110
7
                                           stale_meta->end_version(), &stale_path);
111
112
        // 2.2 add version to `version_graph`.
113
7
        Version stale_meta_version = stale_meta->version();
114
7
        add_version(stale_meta_version);
115
116
        // 2.3 find the path.
117
7
        if (r) {
  Branch (117:13): [True: 1, False: 6]
118
            // Add the path to `_stale_version_path_map`.
119
1
            add_stale_path_version(stale_path);
120
            // Remove `stale_path` from `stale_map`.
121
2
            for (auto stale_item : stale_path) {
  Branch (121:34): [True: 2, False: 1]
122
2
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
123
124
2
                if (stale_map[stale_item->start_version()].empty()) {
  Branch (124:21): [True: 2, False: 0]
125
2
                    stale_map.erase(stale_item->start_version());
126
2
                }
127
2
            }
128
1
        }
129
130
        // 2.4 add `stale_meta` to `stale_map`.
131
7
        auto start_iter = stale_map.find(stale_meta->start_version());
132
7
        if (start_iter != stale_map.end()) {
  Branch (132:13): [True: 0, False: 7]
133
0
            start_iter->second[stale_meta->end_version()] = stale_meta;
134
7
        } else {
135
7
            std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
136
7
            item[stale_meta->end_version()] = stale_meta;
137
7
            stale_map[stale_meta->start_version()] = std::move(item);
138
7
        }
139
7
    }
140
141
    // 3. generate stale path from `rs_metas`.
142
5
    for (auto& stale_meta : rs_metas) {
  Branch (142:27): [True: 5, False: 1]
143
5
        std::vector<RowsetMetaSharedPtr> stale_path;
144
        // 3.1 find a path in stale_map can replace current `stale_meta` version.
145
5
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
146
5
                                           stale_meta->end_version(), &stale_path);
147
148
        // 3.2 find the path.
149
5
        if (r) {
  Branch (149:13): [True: 2, False: 3]
150
            // Add the path to `_stale_version_path_map`.
151
2
            add_stale_path_version(stale_path);
152
            // Remove `stale_path` from `stale_map`.
153
4
            for (auto stale_item : stale_path) {
  Branch (153:34): [True: 4, False: 2]
154
4
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
155
156
4
                if (stale_map[stale_item->start_version()].empty()) {
  Branch (156:21): [True: 4, False: 0]
157
4
                    stale_map.erase(stale_item->start_version());
158
4
                }
159
4
            }
160
2
        }
161
5
    }
162
163
    // 4. process remain stale `rowset_meta` in `stale_map`.
164
1
    auto map_iter = stale_map.begin();
165
2
    while (map_iter != stale_map.end()) {
  Branch (165:12): [True: 1, False: 1]
166
1
        auto second_iter = map_iter->second.begin();
167
2
        while (second_iter != map_iter->second.end()) {
  Branch (167:16): [True: 1, False: 1]
168
            // Each remain stale `rowset_meta` generate a stale path.
169
1
            std::vector<RowsetMetaSharedPtr> stale_path;
170
1
            stale_path.push_back(second_iter->second);
171
1
            add_stale_path_version(stale_path);
172
173
1
            second_iter++;
174
1
        }
175
1
        map_iter++;
176
1
    }
177
1
}
178
179
bool TimestampedVersionTracker::_find_path_from_stale_map(
180
        const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>&
181
                stale_map,
182
        int64_t first_version, int64_t second_version,
183
16
        std::vector<RowsetMetaSharedPtr>* stale_path) {
184
16
    auto first_iter = stale_map.find(first_version);
185
    // If `first_version` not in `stale_map`, there is no path.
186
16
    if (first_iter == stale_map.end()) {
  Branch (186:9): [True: 9, False: 7]
187
9
        return false;
188
9
    }
189
7
    auto& second_version_map = first_iter->second;
190
7
    auto second_iter = second_version_map.find(second_version);
191
    // If second_version in `stale_map`, find a path.
192
7
    if (second_iter != second_version_map.end()) {
  Branch (192:9): [True: 3, False: 4]
193
3
        auto row_meta = second_iter->second;
194
        // Add rowset to path.
195
3
        stale_path->push_back(row_meta);
196
3
        return true;
197
3
    }
198
199
    // Traverse the first version map to backtracking  `_find_path_from_stale_map`.
200
4
    auto map_iter = second_version_map.begin();
201
5
    while (map_iter != second_version_map.end()) {
  Branch (201:12): [True: 4, False: 1]
202
        // The version greater than `second_version`, we can't find path in `stale_map`.
203
4
        if (map_iter->first > second_version) {
  Branch (203:13): [True: 0, False: 4]
204
0
            map_iter++;
205
0
            continue;
206
0
        }
207
        // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`.
208
4
        stale_path->push_back(map_iter->second);
209
4
        bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version,
210
4
                                           stale_path);
211
4
        if (r) {
  Branch (211:13): [True: 3, False: 1]
212
3
            return true;
213
3
        }
214
        // There is no path in current version, pop and continue.
215
1
        stale_path->pop_back();
216
1
        map_iter++;
217
1
    }
218
219
1
    return false;
220
4
}
221
222
35
void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) {
223
35
    auto path_arr_iter = _stale_version_path_map.begin();
224
225
    // Do loop version path.
226
133
    while (path_arr_iter != _stale_version_path_map.end()) {
  Branch (226:12): [True: 98, False: 35]
227
98
        auto path_id = path_arr_iter->first;
228
98
        auto path_version_path = path_arr_iter->second;
229
230
98
        rapidjson::Document item;
231
98
        item.SetObject();
232
        // Add `path_id` to item.
233
98
        auto path_id_str = std::to_string(path_id);
234
98
        rapidjson::Value path_id_value;
235
98
        path_id_value.SetString(path_id_str.c_str(), path_id_str.length(), path_arr.GetAllocator());
236
98
        item.AddMember("path id", path_id_value, path_arr.GetAllocator());
237
238
        // Add max create time to item.
239
98
        auto time_zone = cctz::local_time_zone();
240
241
98
        auto tp = std::chrono::system_clock::from_time_t(path_version_path->max_create_time());
242
98
        auto create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone);
243
244
98
        rapidjson::Value create_time_value;
245
98
        create_time_value.SetString(create_time_str.c_str(), create_time_str.length(),
246
98
                                    path_arr.GetAllocator());
247
98
        item.AddMember("last create time", create_time_value, path_arr.GetAllocator());
248
249
        // Add path list to item.
250
98
        std::stringstream path_list_stream;
251
98
        path_list_stream << path_id_str;
252
98
        auto path_list_ptr = path_version_path->timestamped_versions();
253
98
        auto path_list_iter = path_list_ptr.begin();
254
645
        while (path_list_iter != path_list_ptr.end()) {
  Branch (254:16): [True: 547, False: 98]
255
547
            path_list_stream << " -> ";
256
547
            path_list_stream << "[";
257
547
            path_list_stream << (*path_list_iter)->version().first;
258
547
            path_list_stream << "-";
259
547
            path_list_stream << (*path_list_iter)->version().second;
260
547
            path_list_stream << "]";
261
547
            path_list_iter++;
262
547
        }
263
98
        std::string path_list = path_list_stream.str();
264
98
        rapidjson::Value path_list_value;
265
98
        path_list_value.SetString(path_list.c_str(), path_list.length(), path_arr.GetAllocator());
266
98
        item.AddMember("path list", path_list_value, path_arr.GetAllocator());
267
268
        // Add item to `path_arr`.
269
98
        path_arr.PushBack(item, path_arr.GetAllocator());
270
271
98
        path_arr_iter++;
272
98
    }
273
35
}
274
275
void TimestampedVersionTracker::recover_versioned_tracker(
276
1
        const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) {
277
1
    auto _path_map_iter = stale_version_path_map.begin();
278
    // Recover `stale_version_path_map`.
279
1
    while (_path_map_iter != stale_version_path_map.end()) {
  Branch (279:12): [True: 0, False: 1]
280
        // Add `PathVersionListSharedPtr` to map.
281
0
        _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second;
282
283
0
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
284
0
                _path_map_iter->second->timestamped_versions();
285
0
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
286
0
                timestamped_versions.begin();
287
0
        while (version_path_iter != timestamped_versions.end()) {
  Branch (287:16): [True: 0, False: 0]
288
            // Add version to `_version_graph`.
289
0
            _version_graph.add_version_to_graph((*version_path_iter)->version());
290
0
            ++version_path_iter;
291
0
        }
292
0
        ++_path_map_iter;
293
0
    }
294
1
    LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str();
295
1
}
296
297
11.7k
void TimestampedVersionTracker::add_version(const Version& version) {
298
11.7k
    _version_graph.add_version_to_graph(version);
299
11.7k
}
300
301
void TimestampedVersionTracker::add_stale_path_version(
302
136
        const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) {
303
136
    if (stale_rs_metas.empty()) {
  Branch (303:9): [True: 1, False: 135]
304
1
        VLOG_NOTICE << "there is no version in the stale_rs_metas.";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
305
1
        return;
306
1
    }
307
308
135
    PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer());
309
617
    for (auto rs : stale_rs_metas) {
  Branch (309:18): [True: 617, False: 135]
310
617
        TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at()));
311
617
        ptr->add_timestamped_version(vt_ptr);
312
617
    }
313
314
135
    std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions();
315
316
135
    struct TimestampedVersionPtrCompare {
317
135
        bool operator()(const TimestampedVersionSharedPtr ptr1,
318
1.05k
                        const TimestampedVersionSharedPtr ptr2) {
319
1.05k
            return ptr1->version().first < ptr2->version().first;
320
1.05k
        }
321
135
    };
322
135
    sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare());
323
135
    _stale_version_path_map[_next_path_id] = ptr;
324
135
    _next_path_id++;
325
135
}
326
327
// Capture consistent versions from graph.
328
Status TimestampedVersionTracker::capture_consistent_versions(
329
26
        const Version& spec_version, std::vector<Version>* version_path) const {
330
26
    return _version_graph.capture_consistent_versions(spec_version, version_path);
331
26
}
332
333
Status TimestampedVersionTracker::capture_consistent_versions_with_validator(
334
        const Version& spec_version, std::vector<Version>& version_path,
335
9
        const std::function<bool(int64_t, int64_t)>& validator) const {
336
9
    return _version_graph.capture_consistent_versions_with_validator(spec_version, version_path,
337
9
                                                                     validator);
338
9
}
339
340
Status TimestampedVersionTracker::capture_consistent_versions_prefer_cache(
341
        const Version& spec_version, std::vector<Version>& version_path,
342
13
        const std::function<bool(int64_t, int64_t)>& validator) const {
343
13
    return _version_graph.capture_consistent_versions_prefer_cache(spec_version, version_path,
344
13
                                                                   validator);
345
13
}
346
347
Status TimestampedVersionTracker::capture_consistent_versions_with_validator_mow(
348
        const Version& spec_version, std::vector<Version>& version_path,
349
11
        const std::function<bool(int64_t, int64_t)>& validator) const {
350
11
    return _version_graph.capture_consistent_versions_with_validator_mow(spec_version, version_path,
351
11
                                                                         validator);
352
11
}
353
354
void TimestampedVersionTracker::capture_expired_paths(
355
5
        int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const {
356
5
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
357
5
            _stale_version_path_map.begin();
358
359
19
    while (iter != _stale_version_path_map.end()) {
  Branch (359:12): [True: 14, False: 5]
360
14
        int64_t max_create_time = iter->second->max_create_time();
361
14
        if (max_create_time <= stale_sweep_endtime) {
  Branch (361:13): [True: 9, False: 5]
362
9
            int64_t path_version = iter->first;
363
9
            path_version_vec->push_back(path_version);
364
9
        }
365
14
        ++iter;
366
14
    }
367
5
}
368
369
13
PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) {
370
13
    if (_stale_version_path_map.count(path_id) == 0) {
  Branch (370:9): [True: 0, False: 13]
371
0
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
372
0
        return nullptr;
373
0
    }
374
375
13
    return _stale_version_path_map[path_id];
376
13
}
377
378
13
PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) {
379
13
    if (_stale_version_path_map.count(path_id) == 0) {
  Branch (379:9): [True: 0, False: 13]
380
0
        VLOG_NOTICE << "path version " << path_id << " does not exist!";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
381
0
        return nullptr;
382
0
    }
383
384
13
    VLOG_NOTICE << get_current_path_map_str();
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
385
13
    PathVersionListSharedPtr ptr = fetch_path_version_by_id(path_id);
386
387
13
    _stale_version_path_map.erase(path_id);
388
389
23
    for (auto& version : ptr->timestamped_versions()) {
  Branch (389:24): [True: 23, False: 13]
390
23
        static_cast<void>(_version_graph.delete_version_from_graph(version->version()));
391
23
    }
392
13
    return ptr;
393
13
}
394
395
1
std::string TimestampedVersionTracker::get_current_path_map_str() {
396
1
    std::stringstream tracker_info;
397
1
    tracker_info << "current expired next_path_id " << _next_path_id << std::endl;
398
399
1
    std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter =
400
1
            _stale_version_path_map.begin();
401
1
    while (iter != _stale_version_path_map.end()) {
  Branch (401:12): [True: 0, False: 1]
402
0
        tracker_info << "current expired path_version " << iter->first;
403
0
        std::vector<TimestampedVersionSharedPtr>& timestamped_versions =
404
0
                iter->second->timestamped_versions();
405
0
        std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter =
406
0
                timestamped_versions.begin();
407
0
        int64_t max_create_time = -1;
408
0
        while (version_path_iter != timestamped_versions.end()) {
  Branch (408:16): [True: 0, False: 0]
409
0
            if (max_create_time < (*version_path_iter)->get_create_time()) {
  Branch (409:17): [True: 0, False: 0]
410
0
                max_create_time = (*version_path_iter)->get_create_time();
411
0
            }
412
0
            tracker_info << " -> [";
413
0
            tracker_info << (*version_path_iter)->version().first;
414
0
            tracker_info << ",";
415
0
            tracker_info << (*version_path_iter)->version().second;
416
0
            tracker_info << "]";
417
418
0
            ++version_path_iter;
419
0
        }
420
421
0
        tracker_info << std::endl;
422
0
        ++iter;
423
0
    }
424
1
    return tracker_info.str();
425
1
}
426
427
1
double TimestampedVersionTracker::get_orphan_vertex_ratio() {
428
1
    return _version_graph.get_orphan_vertex_ratio();
429
1
}
430
431
5
std::string TimestampedVersionTracker::debug_string() const {
432
5
    return _version_graph.debug_string();
433
5
}
434
435
617
void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) {
436
    // Compare and refresh `_max_create_time`.
437
617
    if (version->get_create_time() > _max_create_time) {
  Branch (437:9): [True: 135, False: 482]
438
135
        _max_create_time = version->get_create_time();
439
135
    }
440
617
    _timestamped_versions_container.push_back(version);
441
617
}
442
443
261
std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() {
444
261
    return _timestamped_versions_container;
445
261
}
446
447
void VersionGraph::construct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
448
58
                                           int64_t* max_version) {
449
58
    if (rs_metas.empty()) {
  Branch (449:9): [True: 0, False: 58]
450
0
        VLOG_NOTICE << "there is no version in the header.";
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
451
0
        return;
452
0
    }
453
454
    // Distill vertex values from versions in TabletMeta.
455
58
    std::vector<int64_t> vertex_values;
456
58
    vertex_values.reserve(2 * rs_metas.size());
457
458
391
    for (size_t i = 0; i < rs_metas.size(); ++i) {
  Branch (458:24): [True: 333, False: 58]
459
333
        vertex_values.push_back(rs_metas[i]->start_version());
460
333
        vertex_values.push_back(rs_metas[i]->end_version() + 1);
461
333
        if (max_version != nullptr and *max_version < rs_metas[i]->end_version()) {
  Branch (461:13): [True: 333, False: 0]
  Branch (461:40): [True: 270, False: 63]
462
270
            *max_version = rs_metas[i]->end_version();
463
270
        }
464
333
    }
465
58
    std::sort(vertex_values.begin(), vertex_values.end());
466
467
    // Items in `vertex_values` are sorted, but not unique.
468
    // we choose unique items in `vertex_values` to create vertexes.
469
58
    int64_t last_vertex_value = -1;
470
724
    for (size_t i = 0; i < vertex_values.size(); ++i) {
  Branch (470:24): [True: 666, False: 58]
471
666
        if (i != 0 && vertex_values[i] == last_vertex_value) {
  Branch (471:13): [True: 608, False: 58]
  Branch (471:23): [True: 291, False: 317]
472
291
            continue;
473
291
        }
474
475
        // Add vertex to graph.
476
375
        _add_vertex_to_graph(vertex_values[i]);
477
375
        last_vertex_value = vertex_values[i];
478
375
    }
479
    // Create edges for version graph according to TabletMeta's versions.
480
391
    for (size_t i = 0; i < rs_metas.size(); ++i) {
  Branch (480:24): [True: 333, False: 58]
481
        // Versions in header are unique.
482
        // We ensure `_vertex_index_map` has its `start_version`.
483
333
        int64_t start_vertex_index = _vertex_index_map[rs_metas[i]->start_version()];
484
333
        int64_t end_vertex_index = _vertex_index_map[rs_metas[i]->end_version() + 1];
485
        // Add one edge from `start_version` to `end_version`.
486
333
        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
487
        // Add reverse edge from `end_version` to `start_version`.
488
333
        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
489
333
    }
490
491
    // Sort edges by version in descending order.
492
375
    for (auto& vertex : _version_graph) {
  Branch (492:23): [True: 375, False: 58]
493
375
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
494
340
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
495
340
        });
496
375
    }
497
58
}
498
499
void VersionGraph::reconstruct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas,
500
54
                                             int64_t* max_version) {
501
54
    _version_graph.clear();
502
54
    _vertex_index_map.clear();
503
504
54
    construct_version_graph(rs_metas, max_version);
505
54
}
506
507
11.7k
void VersionGraph::add_version_to_graph(const Version& version) {
508
    // Add version.first as new vertex of version graph if not exist.
509
11.7k
    int64_t start_vertex_value = version.first;
510
11.7k
    int64_t end_vertex_value = version.second + 1;
511
512
    // Add vertex to graph.
513
11.7k
    _add_vertex_to_graph(start_vertex_value);
514
11.7k
    _add_vertex_to_graph(end_vertex_value);
515
516
11.7k
    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
517
11.7k
    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
518
519
    // We assume this version is new version, so we just add two edges
520
    // into version graph. add one edge from `start_version` to `end_version`
521
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
522
11.7k
    auto end_vertex_it = _version_graph[start_vertex_index].edges.begin();
523
11.7k
    while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) {
  Branch (523:12): [True: 11.3k, False: 431]
524
11.3k
        if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) {
  Branch (524:13): [True: 11.3k, False: 37]
525
11.3k
            break;
526
11.3k
        }
527
37
        end_vertex_it++;
528
37
    }
529
11.7k
    _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index);
530
531
    // We add reverse edge(from end_version to start_version) to graph
532
    // Make sure the vertex's edges are sorted by version in descending order when inserting.
533
11.7k
    auto start_vertex_it = _version_graph[end_vertex_index].edges.begin();
534
11.9k
    while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) {
  Branch (534:12): [True: 215, False: 11.7k]
535
215
        if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) {
  Branch (535:13): [True: 17, False: 198]
536
17
            break;
537
17
        }
538
198
        start_vertex_it++;
539
198
    }
540
11.7k
    _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index);
541
11.7k
}
542
543
32
Status VersionGraph::delete_version_from_graph(const Version& version) {
544
32
    int64_t start_vertex_value = version.first;
545
32
    int64_t end_vertex_value = version.second + 1;
546
547
32
    if (_vertex_index_map.find(start_vertex_value) == _vertex_index_map.end() ||
  Branch (547:9): [True: 0, False: 32]
  Branch (547:9): [True: 0, False: 32]
548
32
        _vertex_index_map.find(end_vertex_value) == _vertex_index_map.end()) {
  Branch (548:9): [True: 0, False: 32]
549
0
        return Status::Error<HEADER_DELETE_VERSION>(
550
0
                "vertex for version does not exists. version={}-{}", version.first, version.second);
551
0
    }
552
553
32
    int64_t start_vertex_index = _vertex_index_map[start_vertex_value];
554
32
    int64_t end_vertex_index = _vertex_index_map[end_vertex_value];
555
    // Remove edge and its reverse edge.
556
    // When there are same versions in edges, just remove the first version.
557
32
    auto start_edges_iter = _version_graph[start_vertex_index].edges.begin();
558
47
    while (start_edges_iter != _version_graph[start_vertex_index].edges.end()) {
  Branch (558:12): [True: 47, False: 0]
559
47
        if (*start_edges_iter == end_vertex_index) {
  Branch (559:13): [True: 32, False: 15]
560
32
            _version_graph[start_vertex_index].edges.erase(start_edges_iter);
561
32
            break;
562
32
        }
563
15
        start_edges_iter++;
564
15
    }
565
566
32
    auto end_edges_iter = _version_graph[end_vertex_index].edges.begin();
567
65
    while (end_edges_iter != _version_graph[end_vertex_index].edges.end()) {
  Branch (567:12): [True: 65, False: 0]
568
65
        if (*end_edges_iter == start_vertex_index) {
  Branch (568:13): [True: 32, False: 33]
569
32
            _version_graph[end_vertex_index].edges.erase(end_edges_iter);
570
32
            break;
571
32
        }
572
33
        end_edges_iter++;
573
33
    }
574
575
    // Here we do not delete vertex in `_version_graph` even if its edges are empty.
576
    // the `_version_graph` will be rebuilt when doing trash sweep.
577
32
    return Status::OK();
578
32
}
579
580
23.8k
void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) {
581
    // Vertex with vertex_value already exists.
582
23.8k
    if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) {
  Branch (582:9): [True: 11.4k, False: 12.4k]
583
11.4k
        VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value;
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
584
11.4k
        return;
585
11.4k
    }
586
587
12.4k
    _version_graph.emplace_back(Vertex(vertex_value));
588
12.4k
    _vertex_index_map[vertex_value] = _version_graph.size() - 1;
589
12.4k
}
590
591
Status VersionGraph::capture_consistent_versions(const Version& spec_version,
592
28
                                                 std::vector<Version>* version_path) const {
593
28
    if (spec_version.first > spec_version.second) {
  Branch (593:9): [True: 0, False: 28]
594
0
        return Status::Error<INVALID_ARGUMENT, false>(
595
0
                "invalid specified version. spec_version={}-{}", spec_version.first,
596
0
                spec_version.second);
597
0
    }
598
599
28
    int64_t cur_idx = -1;
600
32
    for (size_t i = 0; i < _version_graph.size(); i++) {
  Branch (600:24): [True: 32, False: 0]
601
32
        if (_version_graph[i].value == spec_version.first) {
  Branch (601:13): [True: 28, False: 4]
602
28
            cur_idx = i;
603
28
            break;
604
28
        }
605
32
    }
606
607
28
    if (cur_idx < 0) {
  Branch (607:9): [True: 0, False: 28]
608
0
        return Status::InternalError<false>(
609
0
                "failed to find path in version_graph. spec_version: {}-{}", spec_version.first,
610
0
                spec_version.second);
611
0
    }
612
613
28
    int64_t end_value = spec_version.second + 1;
614
107
    while (_version_graph[cur_idx].value < end_value) {
  Branch (614:12): [True: 80, False: 27]
615
80
        int64_t next_idx = -1;
616
84
        for (const auto& it : _version_graph[cur_idx].edges) {
  Branch (616:29): [True: 84, False: 0]
617
            // Only consider incremental versions.
618
84
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
  Branch (618:17): [True: 1, False: 83]
619
1
                break;
620
1
            }
621
622
83
            if (_version_graph[it].value > end_value) {
  Branch (622:17): [True: 4, False: 79]
623
4
                continue;
624
4
            }
625
626
            // Considering edges had been sorted by version in descending order,
627
            // This version is the largest version that smaller than `end_version`.
628
79
            next_idx = it;
629
79
            break;
630
83
        }
631
632
80
        if (next_idx > -1) {
  Branch (632:13): [True: 79, False: 1]
633
79
            if (version_path != nullptr) {
  Branch (633:17): [True: 79, False: 0]
634
79
                version_path->emplace_back(_version_graph[cur_idx].value,
635
79
                                           _version_graph[next_idx].value - 1);
636
79
            }
637
79
            cur_idx = next_idx;
638
79
        } else {
639
1
            return Status::InternalError<false>(
640
1
                    "fail to find path in version_graph. spec_version: {}-{}", spec_version.first,
641
1
                    spec_version.second);
642
1
        }
643
80
    }
644
645
27
    if (VLOG_TRACE_IS_ON && version_path != nullptr) {
Line
Count
Source
50
27
#define VLOG_TRACE_IS_ON VLOG_IS_ON(10)
  Branch (645:29): [True: 0, False: 0]
646
0
        std::stringstream shortest_path_for_debug;
647
0
        for (const auto& version : *version_path) {
  Branch (647:34): [True: 0, False: 0]
648
0
            shortest_path_for_debug << version << ' ';
649
0
        }
650
0
        VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version
Line
Count
Source
40
0
#define VLOG_TRACE VLOG(10)
651
0
                   << ", path=" << shortest_path_for_debug.str();
652
0
    }
653
654
27
    return Status::OK();
655
28
}
656
657
Status VersionGraph::capture_consistent_versions_prefer_cache(
658
        const Version& spec_version, std::vector<Version>& version_path,
659
13
        const std::function<bool(int64_t, int64_t)>& validator) const {
660
13
    if (spec_version.first > spec_version.second) {
  Branch (660:9): [True: 0, False: 13]
661
0
        return Status::Error<INVALID_ARGUMENT, false>(
662
0
                "invalid specified version. spec_version={}-{}", spec_version.first,
663
0
                spec_version.second);
664
0
    }
665
666
13
    int64_t cur_idx = -1;
667
13
    for (size_t i = 0; i < _version_graph.size(); i++) {
  Branch (667:24): [True: 13, False: 0]
668
13
        if (_version_graph[i].value == spec_version.first) {
  Branch (668:13): [True: 13, False: 0]
669
13
            cur_idx = i;
670
13
            break;
671
13
        }
672
13
    }
673
674
13
    if (cur_idx < 0) {
  Branch (674:9): [True: 0, False: 13]
675
0
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
676
0
                                            spec_version.to_string());
677
0
    }
678
679
13
    int64_t end_value = spec_version.second + 1;
680
90
    while (_version_graph[cur_idx].value < end_value) {
  Branch (680:12): [True: 77, False: 13]
681
77
        int64_t next_idx = -1;
682
77
        int64_t first_idx = -1;
683
107
        for (const auto& it : _version_graph[cur_idx].edges) {
  Branch (683:29): [True: 107, False: 0]
684
            // Only consider incremental versions.
685
107
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
  Branch (685:17): [True: 15, False: 92]
686
15
                break;
687
15
            }
688
92
            if (first_idx == -1) {
  Branch (688:17): [True: 77, False: 15]
689
77
                first_idx = it;
690
77
            }
691
692
92
            if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) {
  Branch (692:17): [True: 30, False: 62]
693
30
                continue;
694
30
            }
695
696
62
            next_idx = it;
697
62
            break;
698
92
        }
699
700
77
        if (next_idx > -1) {
  Branch (700:13): [True: 62, False: 15]
701
62
            version_path.emplace_back(_version_graph[cur_idx].value,
702
62
                                      _version_graph[next_idx].value - 1);
703
704
62
            cur_idx = next_idx;
705
62
        } else if (first_idx != -1) {
  Branch (705:20): [True: 15, False: 0]
706
            // if all edges are not in cache, use the first edge if possible
707
15
            version_path.emplace_back(_version_graph[cur_idx].value,
708
15
                                      _version_graph[first_idx].value - 1);
709
15
            cur_idx = first_idx;
710
15
        } else {
711
0
            return Status::OK();
712
0
        }
713
77
    }
714
13
    return Status::OK();
715
13
}
716
717
Status VersionGraph::capture_consistent_versions_with_validator(
718
        const Version& spec_version, std::vector<Version>& version_path,
719
9
        const std::function<bool(int64_t, int64_t)>& validator) const {
720
9
    if (spec_version.first > spec_version.second) {
  Branch (720:9): [True: 0, False: 9]
721
0
        return Status::Error<INVALID_ARGUMENT, false>(
722
0
                "invalid specified version. spec_version={}-{}", spec_version.first,
723
0
                spec_version.second);
724
0
    }
725
726
9
    int64_t cur_idx = -1;
727
9
    for (size_t i = 0; i < _version_graph.size(); i++) {
  Branch (727:24): [True: 9, False: 0]
728
9
        if (_version_graph[i].value == spec_version.first) {
  Branch (728:13): [True: 9, False: 0]
729
9
            cur_idx = i;
730
9
            break;
731
9
        }
732
9
    }
733
734
9
    if (cur_idx < 0) {
  Branch (734:9): [True: 0, False: 9]
735
0
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
736
0
                                            spec_version.to_string());
737
0
    }
738
739
9
    int64_t end_value = spec_version.second + 1;
740
50
    while (_version_graph[cur_idx].value < end_value) {
  Branch (740:12): [True: 48, False: 2]
741
48
        int64_t next_idx = -1;
742
60
        for (const auto& it : _version_graph[cur_idx].edges) {
  Branch (742:29): [True: 60, False: 0]
743
            // Only consider incremental versions.
744
60
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
  Branch (744:17): [True: 7, False: 53]
745
7
                break;
746
7
            }
747
748
53
            if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) {
  Branch (748:17): [True: 12, False: 41]
749
12
                continue;
750
12
            }
751
752
41
            next_idx = it;
753
41
            break;
754
53
        }
755
756
48
        if (next_idx > -1) {
  Branch (756:13): [True: 41, False: 7]
757
41
            version_path.emplace_back(_version_graph[cur_idx].value,
758
41
                                      _version_graph[next_idx].value - 1);
759
760
41
            cur_idx = next_idx;
761
41
        } else {
762
7
            return Status::OK();
763
7
        }
764
48
    }
765
2
    return Status::OK();
766
9
}
767
768
Status VersionGraph::capture_consistent_versions_with_validator_mow(
769
        const Version& spec_version, std::vector<Version>& version_path,
770
11
        const std::function<bool(int64_t, int64_t)>& validator) const {
771
11
    if (spec_version.first > spec_version.second) {
  Branch (771:9): [True: 0, False: 11]
772
0
        return Status::Error<INVALID_ARGUMENT, false>(
773
0
                "invalid specified version. spec_version={}-{}", spec_version.first,
774
0
                spec_version.second);
775
0
    }
776
777
11
    int64_t cur_idx = -1;
778
11
    for (size_t i = 0; i < _version_graph.size(); i++) {
  Branch (778:24): [True: 11, False: 0]
779
11
        if (_version_graph[i].value == spec_version.first) {
  Branch (779:13): [True: 11, False: 0]
780
11
            cur_idx = i;
781
11
            break;
782
11
        }
783
11
    }
784
785
11
    if (cur_idx < 0) {
  Branch (785:9): [True: 0, False: 11]
786
0
        return Status::InternalError<false>("failed to find path in version_graph. spec_version={}",
787
0
                                            spec_version.to_string());
788
0
    }
789
790
11
    int64_t end_value = spec_version.second + 1;
791
60
    while (_version_graph[cur_idx].value < end_value) {
  Branch (791:12): [True: 55, False: 5]
792
55
        int64_t next_idx = -1;
793
65
        for (const auto& it : _version_graph[cur_idx].edges) {
  Branch (793:29): [True: 65, False: 0]
794
            // Only consider incremental versions.
795
65
            if (_version_graph[it].value < _version_graph[cur_idx].value) {
  Branch (795:17): [True: 0, False: 65]
796
0
                break;
797
0
            }
798
799
65
            if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) {
  Branch (799:17): [True: 16, False: 49]
800
16
                if (_version_graph[cur_idx].value + 1 == _version_graph[it].value) {
  Branch (800:21): [True: 6, False: 10]
801
6
                    break;
802
6
                }
803
10
                end_value = std::min(_version_graph[it].value, end_value);
804
10
                continue;
805
16
            }
806
807
49
            next_idx = it;
808
49
            break;
809
65
        }
810
811
55
        if (next_idx > -1) {
  Branch (811:13): [True: 49, False: 6]
812
49
            version_path.emplace_back(_version_graph[cur_idx].value,
813
49
                                      _version_graph[next_idx].value - 1);
814
815
49
            cur_idx = next_idx;
816
49
        } else {
817
6
            return Status::OK();
818
6
        }
819
55
    }
820
5
    return Status::OK();
821
11
}
822
823
2
double VersionGraph::get_orphan_vertex_ratio() {
824
2
    int64_t vertex_num = _version_graph.size();
825
2
    int64_t orphan_vertex_num = 0;
826
15
    for (auto& iter : _version_graph) {
  Branch (826:21): [True: 15, False: 2]
827
15
        if (iter.edges.empty()) {
  Branch (827:13): [True: 6, False: 9]
828
6
            ++orphan_vertex_num;
829
6
        }
830
15
    }
831
2
    return orphan_vertex_num / (double)vertex_num;
832
2
}
833
834
5
std::string VersionGraph::debug_string() const {
835
5
    std::stringstream ss;
836
5
    ss << "VersionGraph: [";
837
100
    for (size_t i = 0; i < _version_graph.size(); ++i) {
  Branch (837:24): [True: 95, False: 5]
838
95
        ss << "{value: " << _version_graph[i].value << ", edges: [";
839
208
        for (const auto& edge : _version_graph[i].edges) {
  Branch (839:31): [True: 208, False: 95]
840
208
            if (_version_graph[edge].value > _version_graph[i].value) {
  Branch (840:17): [True: 104, False: 104]
841
104
                ss << _version_graph[edge].value << ", ";
842
104
            }
843
208
        }
844
95
        ss << "]}, ";
845
95
    }
846
5
    ss << "]";
847
5
    return ss.str();
848
5
}
849
} // namespace doris