Coverage Report

Created: 2025-10-16 20:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/version_graph.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <rapidjson/document.h>
21
#include <stdint.h>
22
23
#include <functional>
24
#include <map>
25
#include <memory>
26
#include <string>
27
#include <unordered_map>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "olap/olap_common.h"
32
#include "olap/rowset/rowset_meta.h"
33
34
namespace doris {
35
36
template <typename R>
37
concept RowsetMetaSPtrRange = std::same_as<std::ranges::range_value_t<R>, RowsetMetaSharedPtr> &&
38
                              std::ranges::sized_range<R> && std::ranges::forward_range<R>;
39
/// VersionGraph class which is implemented to build and maintain total versions of rowsets.
40
/// This class use adjacency-matrix represent rowsets version and links. A vertex is a version
41
/// and a link is the _version object of a rowset (from start version to end version + 1).
42
/// Use this class, when given a spec_version, we can get a version path which is the shortest path
43
/// in the graph.
44
class VersionGraph {
45
public:
46
    /// Use rs_metas to construct the graph including vertex and edges, and return the
47
    /// max_version in metas.
48
    void construct_version_graph(RowsetMetaSPtrRange auto&& rs_metas, int64_t* max_version);
49
    void construct_version_graph(const RowsetMetaMapContainer& rs_metas, int64_t* max_version);
50
    /// Reconstruct the graph, begin construction the vertex vec and edges list will be cleared.
51
    void reconstruct_version_graph(RowsetMetaSPtrRange auto&&, int64_t* max_version);
52
    void reconstruct_version_graph(const RowsetMetaMapContainer& rs_metas, int64_t* max_version);
53
    /// Add a version to this graph, graph will add the version and edge in version.
54
    void add_version_to_graph(const Version& version);
55
    /// Delete a version from graph. Notice that this del operation only remove this edges and
56
    /// remain the vertex.
57
    Status delete_version_from_graph(const Version& version);
58
    /// Given a spec_version, this method can find a version path which is the shortest path
59
    /// in the graph. The version paths are added to version_path as return info.
60
    Status capture_consistent_versions(const Version& spec_version,
61
                                       std::vector<Version>* version_path) const;
62
63
    Status capture_consistent_versions_prefer_cache(
64
            const Version& spec_version, std::vector<Version>& version_path,
65
            const std::function<bool(int64_t, int64_t)>& validator) const;
66
67
    // Given a start, this method can find a version path which satisfy the following conditions:
68
    // 1. all edges satisfy the conditions specified by `validator` in the graph.
69
    // 2. the destination version is as far as possible.
70
    // 3. the path is the shortest path.
71
    // The version paths are added to version_path as return info.
72
    // If this version not in main version, version_path can be included expired rowset.
73
    // NOTE: this method may return edges which is in stale path
74
    //
75
    // @param validator: Function that takes (start_version, end_version) representing a rowset
76
    //                   and returns true if the rowset should be included in the path, false to skip it
77
    Status capture_consistent_versions_with_validator(
78
            const Version& spec_version, std::vector<Version>& version_path,
79
            const std::function<bool(int64_t, int64_t)>& validator) const;
80
81
    // Capture consistent versions with validator for merge-on-write (MOW) tables.
82
    // Similar to capture_consistent_versions_with_validator but with special handling for MOW tables.
83
    // For MOW tables, newly generated delete bitmap marks will be on the rowsets which are in newest layout.
84
    // So we can only capture rowsets which are in newest data layout to ensure data correctness.
85
    //
86
    // @param validator: Function that takes (start_version, end_version) representing a rowset
87
    //                   and returns true if the rowset is warmed up, false if not warmed up
88
    Status capture_consistent_versions_with_validator_mow(
89
            const Version& spec_version, std::vector<Version>& version_path,
90
            const std::function<bool(int64_t, int64_t)>& validator) const;
91
92
    // See comment of TimestampedVersionTracker's get_orphan_vertex_ratio();
93
    double get_orphan_vertex_ratio();
94
95
    std::string debug_string() const;
96
97
private:
98
    /// Private method add a version to graph.
99
    void _add_vertex_to_graph(int64_t vertex_value);
100
101
    // OLAP version contains two parts, [start_version, end_version]. In order
102
    // to construct graph, the OLAP version has two corresponding vertex, one
103
    // vertex's value is version.start_version, the other is
104
    // version.end_version + 1.
105
    // Use adjacency list to describe version graph.
106
    // In order to speed up the version capture, vertex's edges are sorted by version in descending order.
107
    std::vector<Vertex> _version_graph;
108
109
    // vertex value --> vertex_index of _version_graph
110
    // It is easy to find vertex index according to vertex value.
111
    std::unordered_map<int64_t, int64_t> _vertex_index_map;
112
};
113
114
/// TimestampedVersion class which is implemented to maintain multi-version path of rowsets.
115
/// This compaction info of a rowset includes start version, end version and the create time.
116
class TimestampedVersion {
117
public:
118
    /// TimestampedVersion construction function. Use rowset version and create time to build a TimestampedVersion.
119
    TimestampedVersion(const Version& version, int64_t create_time)
120
638
            : _version(version.first, version.second), _create_time(create_time) {}
121
122
638
    ~TimestampedVersion() {}
123
124
    /// Return the rowset version of TimestampedVersion record.
125
3.40k
    Version version() const { return _version; }
126
    /// Return the rowset create_time of TimestampedVersion record.
127
774
    int64_t get_create_time() { return _create_time; }
128
129
    /// Compare two version trackers.
130
0
    bool operator!=(const TimestampedVersion& rhs) const { return _version != rhs._version; }
131
132
    /// Compare two version trackers.
133
0
    bool operator==(const TimestampedVersion& rhs) const { return _version == rhs._version; }
134
135
    /// Judge if a tracker contains the other.
136
0
    bool contains(const TimestampedVersion& other) const {
137
0
        return _version.contains(other._version);
138
0
    }
139
140
private:
141
    Version _version;
142
    int64_t _create_time;
143
};
144
145
using TimestampedVersionSharedPtr = std::shared_ptr<TimestampedVersion>;
146
147
/// TimestampedVersionPathContainer class is used to maintain a path timestamped version path
148
/// and record the max create time in a path version. Once a timestamped version is added, the max_create_time
149
/// will compare with the version timestamp and be refreshed.
150
class TimestampedVersionPathContainer {
151
public:
152
    /// TimestampedVersionPathContainer construction function, max_create_time is assigned to 0.
153
136
    TimestampedVersionPathContainer() : _max_create_time(0) {}
154
155
    /// Return the max create time in a path version.
156
112
    int64_t max_create_time() { return _max_create_time; }
157
158
    /// Add a timestamped version to timestamped_versions_container. Once a timestamped version is added,
159
    /// the max_create_time will compare with the version timestamp and be refreshed.
160
    void add_timestamped_version(TimestampedVersionSharedPtr version);
161
162
    /// Return the timestamped_versions_container as const type.
163
    std::vector<TimestampedVersionSharedPtr>& timestamped_versions();
164
165
private:
166
    std::vector<TimestampedVersionSharedPtr> _timestamped_versions_container;
167
    int64_t _max_create_time;
168
};
169
170
using PathVersionListSharedPtr = std::shared_ptr<TimestampedVersionPathContainer>;
171
172
/// TimestampedVersionTracker class is responsible to track all rowsets version links of a tablet.
173
/// This class not only records the graph of all versions, but also records the paths which will be removed
174
/// after the path is expired.
175
class TimestampedVersionTracker {
176
public:
177
    /// Construct rowsets version tracker by main path rowset meta.
178
    void construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas);
179
    void construct_versioned_tracker(const RowsetMetaMapContainer& rs_metas);
180
181
    /// Construct rowsets version tracker by main path rowset meta and stale rowset meta.
182
    void construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas,
183
                                     RowsetMetaSPtrRange auto&& stale_metas);
184
    void construct_versioned_tracker(const RowsetMetaMapContainer& rs_metas,
185
                                     const RowsetMetaMapContainer& stale_metas);
186
187
    /// Recover rowsets version tracker from stale version path map. When delete operation fails, the
188
    /// tracker can be recovered from deleted stale_version_path_map.
189
    void recover_versioned_tracker(
190
            const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map);
191
192
    /// Add a version to tracker, this version is a new version rowset, not merged rowset.
193
    void add_version(const Version& version);
194
195
5
    void delete_version(const Version& version) {
196
5
        static_cast<void>(_version_graph.delete_version_from_graph(version));
197
5
    }
198
199
    /// Add a version path with stale_rs_metas, this versions in version path
200
    /// are merged rowsets.  These rowsets are tracked and removed after they are expired.
201
    /// TabletManager sweep these rowsets using tracker by timing.
202
    void add_stale_path_version(const std::vector<RowsetMetaSharedPtr>& stale_rs_metas);
203
204
    /// Given a spec_version, this method can find a version path which is the shortest path
205
    /// in the graph. The version paths are added to version_path as return info.
206
    /// If this version not in main version, version_path can be included expired rowset.
207
    Status capture_consistent_versions(const Version& spec_version,
208
                                       std::vector<Version>* version_path) const;
209
210
    Status capture_consistent_versions_prefer_cache(
211
            const Version& spec_version, std::vector<Version>& version_path,
212
            const std::function<bool(int64_t, int64_t)>& validator) const;
213
214
    // Given a start, this method can find a version path which satisfy the following conditions:
215
    // 1. all edges satisfy the conditions specified by `validator` in the graph.
216
    // 2. the destination version is as far as possible.
217
    // 3. the path is the shortest path.
218
    // The version paths are added to version_path as return info.
219
    // If this version not in main version, version_path can be included expired rowset.
220
    // NOTE: this method may return edges which is in stale path
221
    //
222
    // @param validator: Function that takes (start_version, end_version) representing a rowset
223
    //                   and returns true if the rowset should be included in the path, false to skip it
224
    Status capture_consistent_versions_with_validator(
225
            const Version& spec_version, std::vector<Version>& version_path,
226
            const std::function<bool(int64_t, int64_t)>& validator) const;
227
228
    // Capture consistent versions with validator for merge-on-write (MOW) tables.
229
    // Similar to capture_consistent_versions_with_validator but with special handling for MOW tables.
230
    // For MOW tables, newly generated delete bitmap marks will be on the rowsets which are in newest layout.
231
    // So we can only capture rowsets which are in newest data layout to ensure data correctness.
232
    //
233
    // @param validator: Function that takes (start_version, end_version) representing a rowset
234
    //                   and returns true if the rowset is warmed up, false if not warmed up
235
    Status capture_consistent_versions_with_validator_mow(
236
            const Version& spec_version, std::vector<Version>& version_path,
237
            const std::function<bool(int64_t, int64_t)>& validator) const;
238
239
    /// Capture all expired path version.
240
    /// When the last rowset create time of a path greater than expired time  which can be expressed
241
    /// "now() - tablet_rowset_stale_sweep_time_sec" , this path will be remained.
242
    /// Otherwise, this path will be added to path_version.
243
    void capture_expired_paths(int64_t stale_sweep_endtime,
244
                               std::vector<int64_t>* path_version) const;
245
246
    /// Fetch all versions with a path_version.
247
    PathVersionListSharedPtr fetch_path_version_by_id(int64_t path_id);
248
249
    /// Fetch all versions with a path_version, at the same time remove this path from the tracker.
250
    /// Next time, fetch this path, it will return empty.
251
    PathVersionListSharedPtr fetch_and_delete_path_by_id(int64_t path_id);
252
253
    /// Print all expired version path in a tablet.
254
    std::string get_current_path_map_str();
255
256
    /// Get json document of _stale_version_path_map. Fill the path_id and version_path
257
    /// list in the document. The parameter path arr is used as return variable.
258
    void get_stale_version_path_json_doc(rapidjson::Document& path_arr);
259
260
    // Return proportion of orphan vertex in VersionGraph's _version_graph.
261
    // If a vertex is no longer the starting point of any edge, then this vertex is defined as orphan vertex
262
    double get_orphan_vertex_ratio();
263
264
    std::string debug_string() const;
265
266
private:
267
    /// Construct rowsets version tracker with main path rowset meta.
268
    void _construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas);
269
270
    /// init stale_version_path_map by main path rowset meta and stale rowset meta.
271
    void _init_stale_version_path_map(RowsetMetaSPtrRange auto&& rs_metas,
272
                                      RowsetMetaSPtrRange auto&& stale_metas);
273
274
    /// find a path in stale_map from first_version to second_version, stale_path is used as result.
275
    bool _find_path_from_stale_map(
276
            const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>&
277
                    stale_map,
278
            int64_t first_version, int64_t second_version,
279
            std::vector<RowsetMetaSharedPtr>* stale_path);
280
281
private:
282
    // This variable records the id of path version which will be dispatched to next path version,
283
    // it is not persisted.
284
    int64_t _next_path_id = 1;
285
286
    // path_version -> list of path version,
287
    // This variable is used to maintain the map from path version and it's all version.
288
    std::map<int64_t, PathVersionListSharedPtr> _stale_version_path_map;
289
290
    VersionGraph _version_graph;
291
};
292
293
void VersionGraph::reconstruct_version_graph(RowsetMetaSPtrRange auto&& rs_metas,
294
54
                                             int64_t* max_version) {
295
54
    _version_graph.clear();
296
54
    _vertex_index_map.clear();
297
298
54
    construct_version_graph(rs_metas, max_version);
299
54
}
_ZN5doris12VersionGraph25reconstruct_version_graphITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EEEEvOT_Pl
Line
Count
Source
294
14
                                             int64_t* max_version) {
295
14
    _version_graph.clear();
296
14
    _vertex_index_map.clear();
297
298
14
    construct_version_graph(rs_metas, max_version);
299
14
}
_ZN5doris12VersionGraph25reconstruct_version_graphITkNS_19RowsetMetaSPtrRangeERNSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_Pl
Line
Count
Source
294
40
                                             int64_t* max_version) {
295
40
    _version_graph.clear();
296
40
    _vertex_index_map.clear();
297
298
40
    construct_version_graph(rs_metas, max_version);
299
40
}
Unexecuted instantiation: _ZN5doris12VersionGraph25reconstruct_version_graphITkNS_19RowsetMetaSPtrRangeENSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_Pl
300
301
void VersionGraph::construct_version_graph(RowsetMetaSPtrRange auto&& rs_metas,
302
58
                                           int64_t* max_version) {
303
58
    if (std::ranges::empty(rs_metas)) {
304
0
        VLOG_NOTICE << "there is no version in the header.";
305
0
        return;
306
0
    }
307
308
    // Distill vertex values from versions in TabletMeta.
309
58
    std::vector<int64_t> vertex_values;
310
58
    vertex_values.reserve(2 * std::ranges::size(rs_metas));
311
312
333
    for (const auto& rs : rs_metas) {
313
333
        vertex_values.push_back(rs->start_version());
314
333
        vertex_values.push_back(rs->end_version() + 1);
315
333
        if (max_version != nullptr and *max_version < rs->end_version()) {
316
115
            *max_version = rs->end_version();
317
115
        }
318
333
    }
319
58
    std::sort(vertex_values.begin(), vertex_values.end());
320
321
    // Items in `vertex_values` are sorted, but not unique.
322
    // we choose unique items in `vertex_values` to create vertexes.
323
58
    int64_t last_vertex_value = -1;
324
724
    for (size_t i = 0; i < vertex_values.size(); ++i) {
325
666
        if (i != 0 && vertex_values[i] == last_vertex_value) {
326
291
            continue;
327
291
        }
328
329
        // Add vertex to graph.
330
375
        _add_vertex_to_graph(vertex_values[i]);
331
375
        last_vertex_value = vertex_values[i];
332
375
    }
333
    // Create edges for version graph according to TabletMeta's versions.
334
333
    for (const auto& rs : rs_metas) {
335
        // Versions in header are unique.
336
        // We ensure `_vertex_index_map` has its `start_version`.
337
333
        int64_t start_vertex_index = _vertex_index_map[rs->start_version()];
338
333
        int64_t end_vertex_index = _vertex_index_map[rs->end_version() + 1];
339
        // Add one edge from `start_version` to `end_version`.
340
333
        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
341
        // Add reverse edge from `end_version` to `start_version`.
342
333
        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
343
333
    }
344
345
    // Sort edges by version in descending order.
346
375
    for (auto& vertex : _version_graph) {
347
375
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
348
340
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
349
340
        });
_ZZN5doris12VersionGraph23construct_version_graphITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EEEEvOT_PlENKUlRKiSD_E_clESD_SD_
Line
Count
Source
347
175
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
348
175
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
349
175
        });
_ZZN5doris12VersionGraph23construct_version_graphITkNS_19RowsetMetaSPtrRangeERNSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_PlENKUlRKiSQ_E_clESQ_SQ_
Line
Count
Source
347
165
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
348
165
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
349
165
        });
Unexecuted instantiation: _ZZN5doris12VersionGraph23construct_version_graphITkNS_19RowsetMetaSPtrRangeENSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_PlENKUlRKiSP_E_clESP_SP_
350
375
    }
351
58
}
_ZN5doris12VersionGraph23construct_version_graphITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EEEEvOT_Pl
Line
Count
Source
302
18
                                           int64_t* max_version) {
303
18
    if (std::ranges::empty(rs_metas)) {
304
0
        VLOG_NOTICE << "there is no version in the header.";
305
0
        return;
306
0
    }
307
308
    // Distill vertex values from versions in TabletMeta.
309
18
    std::vector<int64_t> vertex_values;
310
18
    vertex_values.reserve(2 * std::ranges::size(rs_metas));
311
312
127
    for (const auto& rs : rs_metas) {
313
127
        vertex_values.push_back(rs->start_version());
314
127
        vertex_values.push_back(rs->end_version() + 1);
315
127
        if (max_version != nullptr and *max_version < rs->end_version()) {
316
72
            *max_version = rs->end_version();
317
72
        }
318
127
    }
319
18
    std::sort(vertex_values.begin(), vertex_values.end());
320
321
    // Items in `vertex_values` are sorted, but not unique.
322
    // we choose unique items in `vertex_values` to create vertexes.
323
18
    int64_t last_vertex_value = -1;
324
272
    for (size_t i = 0; i < vertex_values.size(); ++i) {
325
254
        if (i != 0 && vertex_values[i] == last_vertex_value) {
326
126
            continue;
327
126
        }
328
329
        // Add vertex to graph.
330
128
        _add_vertex_to_graph(vertex_values[i]);
331
128
        last_vertex_value = vertex_values[i];
332
128
    }
333
    // Create edges for version graph according to TabletMeta's versions.
334
127
    for (const auto& rs : rs_metas) {
335
        // Versions in header are unique.
336
        // We ensure `_vertex_index_map` has its `start_version`.
337
127
        int64_t start_vertex_index = _vertex_index_map[rs->start_version()];
338
127
        int64_t end_vertex_index = _vertex_index_map[rs->end_version() + 1];
339
        // Add one edge from `start_version` to `end_version`.
340
127
        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
341
        // Add reverse edge from `end_version` to `start_version`.
342
127
        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
343
127
    }
344
345
    // Sort edges by version in descending order.
346
128
    for (auto& vertex : _version_graph) {
347
128
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
348
128
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
349
128
        });
350
128
    }
351
18
}
_ZN5doris12VersionGraph23construct_version_graphITkNS_19RowsetMetaSPtrRangeERNSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_Pl
Line
Count
Source
302
40
                                           int64_t* max_version) {
303
40
    if (std::ranges::empty(rs_metas)) {
304
0
        VLOG_NOTICE << "there is no version in the header.";
305
0
        return;
306
0
    }
307
308
    // Distill vertex values from versions in TabletMeta.
309
40
    std::vector<int64_t> vertex_values;
310
40
    vertex_values.reserve(2 * std::ranges::size(rs_metas));
311
312
206
    for (const auto& rs : rs_metas) {
313
206
        vertex_values.push_back(rs->start_version());
314
206
        vertex_values.push_back(rs->end_version() + 1);
315
206
        if (max_version != nullptr and *max_version < rs->end_version()) {
316
43
            *max_version = rs->end_version();
317
43
        }
318
206
    }
319
40
    std::sort(vertex_values.begin(), vertex_values.end());
320
321
    // Items in `vertex_values` are sorted, but not unique.
322
    // we choose unique items in `vertex_values` to create vertexes.
323
40
    int64_t last_vertex_value = -1;
324
452
    for (size_t i = 0; i < vertex_values.size(); ++i) {
325
412
        if (i != 0 && vertex_values[i] == last_vertex_value) {
326
165
            continue;
327
165
        }
328
329
        // Add vertex to graph.
330
247
        _add_vertex_to_graph(vertex_values[i]);
331
247
        last_vertex_value = vertex_values[i];
332
247
    }
333
    // Create edges for version graph according to TabletMeta's versions.
334
206
    for (const auto& rs : rs_metas) {
335
        // Versions in header are unique.
336
        // We ensure `_vertex_index_map` has its `start_version`.
337
206
        int64_t start_vertex_index = _vertex_index_map[rs->start_version()];
338
206
        int64_t end_vertex_index = _vertex_index_map[rs->end_version() + 1];
339
        // Add one edge from `start_version` to `end_version`.
340
206
        _version_graph[start_vertex_index].edges.push_front(end_vertex_index);
341
        // Add reverse edge from `end_version` to `start_version`.
342
206
        _version_graph[end_vertex_index].edges.push_front(start_vertex_index);
343
206
    }
344
345
    // Sort edges by version in descending order.
346
247
    for (auto& vertex : _version_graph) {
347
247
        vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) {
348
247
            return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value;
349
247
        });
350
247
    }
351
40
}
Unexecuted instantiation: _ZN5doris12VersionGraph23construct_version_graphITkNS_19RowsetMetaSPtrRangeENSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_Pl
352
353
53
void TimestampedVersionTracker::_construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas) {
354
53
    int64_t max_version = 0;
355
356
    // construct the rowset graph
357
53
    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
358
53
}
_ZN5doris25TimestampedVersionTracker28_construct_versioned_trackerITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EEEEvOT_
Line
Count
Source
353
13
void TimestampedVersionTracker::_construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas) {
354
13
    int64_t max_version = 0;
355
356
    // construct the rowset graph
357
13
    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
358
13
}
_ZN5doris25TimestampedVersionTracker28_construct_versioned_trackerITkNS_19RowsetMetaSPtrRangeERNSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_
Line
Count
Source
353
40
void TimestampedVersionTracker::_construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas) {
354
40
    int64_t max_version = 0;
355
356
    // construct the rowset graph
357
40
    _version_graph.reconstruct_version_graph(rs_metas, &max_version);
358
40
}
359
360
12
void TimestampedVersionTracker::construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas) {
361
12
    if (std::ranges::empty(rs_metas)) {
362
0
        VLOG_NOTICE << "there is no version in the header.";
363
0
        return;
364
0
    }
365
12
    _stale_version_path_map.clear();
366
12
    _next_path_id = 1;
367
12
    _construct_versioned_tracker(rs_metas);
368
12
}
_ZN5doris25TimestampedVersionTracker27construct_versioned_trackerITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EEEEvOT_
Line
Count
Source
360
12
void TimestampedVersionTracker::construct_versioned_tracker(RowsetMetaSPtrRange auto&& rs_metas) {
361
12
    if (std::ranges::empty(rs_metas)) {
362
0
        VLOG_NOTICE << "there is no version in the header.";
363
0
        return;
364
0
    }
365
12
    _stale_version_path_map.clear();
366
12
    _next_path_id = 1;
367
12
    _construct_versioned_tracker(rs_metas);
368
12
}
Unexecuted instantiation: _ZN5doris25TimestampedVersionTracker27construct_versioned_trackerITkNS_19RowsetMetaSPtrRangeENSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEEEEvOT_
369
370
void TimestampedVersionTracker::construct_versioned_tracker(
371
667
        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& stale_metas) {
372
667
    if (std::ranges::empty(rs_metas)) {
373
626
        VLOG_NOTICE << "there is no version in the header.";
374
626
        return;
375
626
    }
376
41
    _stale_version_path_map.clear();
377
41
    _next_path_id = 1;
378
41
    _construct_versioned_tracker(rs_metas);
379
380
    // Init `_stale_version_path_map`.
381
41
    _init_stale_version_path_map(rs_metas, stale_metas);
382
41
}
_ZN5doris25TimestampedVersionTracker27construct_versioned_trackerITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EETkNS_19RowsetMetaSPtrRangeES8_EEvOT_OT0_
Line
Count
Source
371
1
        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& stale_metas) {
372
1
    if (std::ranges::empty(rs_metas)) {
373
0
        VLOG_NOTICE << "there is no version in the header.";
374
0
        return;
375
0
    }
376
1
    _stale_version_path_map.clear();
377
1
    _next_path_id = 1;
378
1
    _construct_versioned_tracker(rs_metas);
379
380
    // Init `_stale_version_path_map`.
381
1
    _init_stale_version_path_map(rs_metas, stale_metas);
382
1
}
_ZN5doris25TimestampedVersionTracker27construct_versioned_trackerITkNS_19RowsetMetaSPtrRangeENSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEETkNS_19RowsetMetaSPtrRangeESK_EEvOT_OT0_
Line
Count
Source
371
666
        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& stale_metas) {
372
666
    if (std::ranges::empty(rs_metas)) {
373
626
        VLOG_NOTICE << "there is no version in the header.";
374
626
        return;
375
626
    }
376
40
    _stale_version_path_map.clear();
377
40
    _next_path_id = 1;
378
40
    _construct_versioned_tracker(rs_metas);
379
380
    // Init `_stale_version_path_map`.
381
40
    _init_stale_version_path_map(rs_metas, stale_metas);
382
40
}
383
384
void TimestampedVersionTracker::_init_stale_version_path_map(
385
41
        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& stale_metas) {
386
41
    if (std::ranges::empty(stale_metas)) {
387
40
        return;
388
40
    }
389
390
    // Sort stale meta by version diff (second version - first version).
391
1
    std::list<RowsetMetaSharedPtr> sorted_stale_metas;
392
7
    for (const auto& rs : stale_metas) {
393
7
        sorted_stale_metas.emplace_back(rs);
394
7
    }
395
396
    // 1. sort the existing rowsets by version in ascending order.
397
14
    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
398
        // Compare by version diff between `version.first` and `version.second`.
399
14
        int64_t a_diff = a->version().second - a->version().first;
400
14
        int64_t b_diff = b->version().second - b->version().first;
401
402
14
        int64_t diff = a_diff - b_diff;
403
14
        if (diff < 0) {
404
5
            return true;
405
9
        } else if (diff > 0) {
406
4
            return false;
407
4
        }
408
        // When the version diff is equal, compare the rowset`s stale time
409
5
        return a->stale_at() < b->stale_at();
410
14
    });
_ZZN5doris25TimestampedVersionTracker28_init_stale_version_path_mapITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EETkNS_19RowsetMetaSPtrRangeES8_EEvOT_OT0_ENKUlRKS5_SE_E_clESE_SE_
Line
Count
Source
397
14
    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
398
        // Compare by version diff between `version.first` and `version.second`.
399
14
        int64_t a_diff = a->version().second - a->version().first;
400
14
        int64_t b_diff = b->version().second - b->version().first;
401
402
14
        int64_t diff = a_diff - b_diff;
403
14
        if (diff < 0) {
404
5
            return true;
405
9
        } else if (diff > 0) {
406
4
            return false;
407
4
        }
408
        // When the version diff is equal, compare the rowset`s stale time
409
5
        return a->stale_at() < b->stale_at();
410
14
    });
Unexecuted instantiation: _ZZN5doris25TimestampedVersionTracker28_init_stale_version_path_mapITkNS_19RowsetMetaSPtrRangeERNSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEETkNS_19RowsetMetaSPtrRangeESL_EEvOT_OT0_ENKUlRKS9_SR_E_clESR_SR_
411
412
    // first_version -> (second_version -> rowset_meta)
413
1
    std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;
414
415
    // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map.
416
    // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path
417
    // and add the path to `_stale_version_path_map`.
418
7
    for (auto& stale_meta : sorted_stale_metas) {
419
7
        std::vector<RowsetMetaSharedPtr> stale_path;
420
        // 2.1 find a path in `stale_map` can replace current `stale_meta` version.
421
7
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
422
7
                                           stale_meta->end_version(), &stale_path);
423
424
        // 2.2 add version to `version_graph`.
425
7
        Version stale_meta_version = stale_meta->version();
426
7
        add_version(stale_meta_version);
427
428
        // 2.3 find the path.
429
7
        if (r) {
430
            // Add the path to `_stale_version_path_map`.
431
1
            add_stale_path_version(stale_path);
432
            // Remove `stale_path` from `stale_map`.
433
2
            for (auto stale_item : stale_path) {
434
2
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
435
436
2
                if (stale_map[stale_item->start_version()].empty()) {
437
2
                    stale_map.erase(stale_item->start_version());
438
2
                }
439
2
            }
440
1
        }
441
442
        // 2.4 add `stale_meta` to `stale_map`.
443
7
        auto start_iter = stale_map.find(stale_meta->start_version());
444
7
        if (start_iter != stale_map.end()) {
445
0
            start_iter->second[stale_meta->end_version()] = stale_meta;
446
7
        } else {
447
7
            std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
448
7
            item[stale_meta->end_version()] = stale_meta;
449
7
            stale_map[stale_meta->start_version()] = std::move(item);
450
7
        }
451
7
    }
452
453
    // 3. generate stale path from `rs_metas`.
454
5
    for (const auto& stale_meta : rs_metas) {
455
5
        std::vector<RowsetMetaSharedPtr> stale_path;
456
        // 3.1 find a path in stale_map can replace current `stale_meta` version.
457
5
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
458
5
                                           stale_meta->end_version(), &stale_path);
459
460
        // 3.2 find the path.
461
5
        if (r) {
462
            // Add the path to `_stale_version_path_map`.
463
2
            add_stale_path_version(stale_path);
464
            // Remove `stale_path` from `stale_map`.
465
4
            for (auto stale_item : stale_path) {
466
4
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
467
468
4
                if (stale_map[stale_item->start_version()].empty()) {
469
4
                    stale_map.erase(stale_item->start_version());
470
4
                }
471
4
            }
472
2
        }
473
5
    }
474
475
    // 4. process remain stale `rowset_meta` in `stale_map`.
476
1
    auto map_iter = stale_map.begin();
477
2
    while (map_iter != stale_map.end()) {
478
1
        auto second_iter = map_iter->second.begin();
479
2
        while (second_iter != map_iter->second.end()) {
480
            // Each remain stale `rowset_meta` generate a stale path.
481
1
            std::vector<RowsetMetaSharedPtr> stale_path;
482
1
            stale_path.push_back(second_iter->second);
483
1
            add_stale_path_version(stale_path);
484
485
1
            second_iter++;
486
1
        }
487
1
        map_iter++;
488
1
    }
489
1
}
_ZN5doris25TimestampedVersionTracker28_init_stale_version_path_mapITkNS_19RowsetMetaSPtrRangeERSt6vectorISt10shared_ptrINS_10RowsetMetaEESaIS5_EETkNS_19RowsetMetaSPtrRangeES8_EEvOT_OT0_
Line
Count
Source
385
1
        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& stale_metas) {
386
1
    if (std::ranges::empty(stale_metas)) {
387
0
        return;
388
0
    }
389
390
    // Sort stale meta by version diff (second version - first version).
391
1
    std::list<RowsetMetaSharedPtr> sorted_stale_metas;
392
7
    for (const auto& rs : stale_metas) {
393
7
        sorted_stale_metas.emplace_back(rs);
394
7
    }
395
396
    // 1. sort the existing rowsets by version in ascending order.
397
1
    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
398
        // Compare by version diff between `version.first` and `version.second`.
399
1
        int64_t a_diff = a->version().second - a->version().first;
400
1
        int64_t b_diff = b->version().second - b->version().first;
401
402
1
        int64_t diff = a_diff - b_diff;
403
1
        if (diff < 0) {
404
1
            return true;
405
1
        } else if (diff > 0) {
406
1
            return false;
407
1
        }
408
        // When the version diff is equal, compare the rowset`s stale time
409
1
        return a->stale_at() < b->stale_at();
410
1
    });
411
412
    // first_version -> (second_version -> rowset_meta)
413
1
    std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;
414
415
    // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map.
416
    // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path
417
    // and add the path to `_stale_version_path_map`.
418
7
    for (auto& stale_meta : sorted_stale_metas) {
419
7
        std::vector<RowsetMetaSharedPtr> stale_path;
420
        // 2.1 find a path in `stale_map` can replace current `stale_meta` version.
421
7
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
422
7
                                           stale_meta->end_version(), &stale_path);
423
424
        // 2.2 add version to `version_graph`.
425
7
        Version stale_meta_version = stale_meta->version();
426
7
        add_version(stale_meta_version);
427
428
        // 2.3 find the path.
429
7
        if (r) {
430
            // Add the path to `_stale_version_path_map`.
431
1
            add_stale_path_version(stale_path);
432
            // Remove `stale_path` from `stale_map`.
433
2
            for (auto stale_item : stale_path) {
434
2
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
435
436
2
                if (stale_map[stale_item->start_version()].empty()) {
437
2
                    stale_map.erase(stale_item->start_version());
438
2
                }
439
2
            }
440
1
        }
441
442
        // 2.4 add `stale_meta` to `stale_map`.
443
7
        auto start_iter = stale_map.find(stale_meta->start_version());
444
7
        if (start_iter != stale_map.end()) {
445
0
            start_iter->second[stale_meta->end_version()] = stale_meta;
446
7
        } else {
447
7
            std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
448
7
            item[stale_meta->end_version()] = stale_meta;
449
7
            stale_map[stale_meta->start_version()] = std::move(item);
450
7
        }
451
7
    }
452
453
    // 3. generate stale path from `rs_metas`.
454
5
    for (const auto& stale_meta : rs_metas) {
455
5
        std::vector<RowsetMetaSharedPtr> stale_path;
456
        // 3.1 find a path in stale_map can replace current `stale_meta` version.
457
5
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
458
5
                                           stale_meta->end_version(), &stale_path);
459
460
        // 3.2 find the path.
461
5
        if (r) {
462
            // Add the path to `_stale_version_path_map`.
463
2
            add_stale_path_version(stale_path);
464
            // Remove `stale_path` from `stale_map`.
465
4
            for (auto stale_item : stale_path) {
466
4
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
467
468
4
                if (stale_map[stale_item->start_version()].empty()) {
469
4
                    stale_map.erase(stale_item->start_version());
470
4
                }
471
4
            }
472
2
        }
473
5
    }
474
475
    // 4. process remain stale `rowset_meta` in `stale_map`.
476
1
    auto map_iter = stale_map.begin();
477
2
    while (map_iter != stale_map.end()) {
478
1
        auto second_iter = map_iter->second.begin();
479
2
        while (second_iter != map_iter->second.end()) {
480
            // Each remain stale `rowset_meta` generate a stale path.
481
1
            std::vector<RowsetMetaSharedPtr> stale_path;
482
1
            stale_path.push_back(second_iter->second);
483
1
            add_stale_path_version(stale_path);
484
485
1
            second_iter++;
486
1
        }
487
1
        map_iter++;
488
1
    }
489
1
}
_ZN5doris25TimestampedVersionTracker28_init_stale_version_path_mapITkNS_19RowsetMetaSPtrRangeERNSt6ranges13elements_viewINS2_8ref_viewIKSt13unordered_mapINS_7VersionESt10shared_ptrINS_10RowsetMetaEENS_13HashOfVersionESt8equal_toIS6_ESaISt4pairIKS6_S9_EEEEELm1EEETkNS_19RowsetMetaSPtrRangeESL_EEvOT_OT0_
Line
Count
Source
385
40
        RowsetMetaSPtrRange auto&& rs_metas, RowsetMetaSPtrRange auto&& stale_metas) {
386
40
    if (std::ranges::empty(stale_metas)) {
387
40
        return;
388
40
    }
389
390
    // Sort stale meta by version diff (second version - first version).
391
0
    std::list<RowsetMetaSharedPtr> sorted_stale_metas;
392
0
    for (const auto& rs : stale_metas) {
393
0
        sorted_stale_metas.emplace_back(rs);
394
0
    }
395
396
    // 1. sort the existing rowsets by version in ascending order.
397
0
    sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
398
        // Compare by version diff between `version.first` and `version.second`.
399
0
        int64_t a_diff = a->version().second - a->version().first;
400
0
        int64_t b_diff = b->version().second - b->version().first;
401
402
0
        int64_t diff = a_diff - b_diff;
403
0
        if (diff < 0) {
404
0
            return true;
405
0
        } else if (diff > 0) {
406
0
            return false;
407
0
        }
408
        // When the version diff is equal, compare the rowset`s stale time
409
0
        return a->stale_at() < b->stale_at();
410
0
    });
411
412
    // first_version -> (second_version -> rowset_meta)
413
0
    std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map;
414
415
    // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map.
416
    // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path
417
    // and add the path to `_stale_version_path_map`.
418
0
    for (auto& stale_meta : sorted_stale_metas) {
419
0
        std::vector<RowsetMetaSharedPtr> stale_path;
420
        // 2.1 find a path in `stale_map` can replace current `stale_meta` version.
421
0
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
422
0
                                           stale_meta->end_version(), &stale_path);
423
424
        // 2.2 add version to `version_graph`.
425
0
        Version stale_meta_version = stale_meta->version();
426
0
        add_version(stale_meta_version);
427
428
        // 2.3 find the path.
429
0
        if (r) {
430
            // Add the path to `_stale_version_path_map`.
431
0
            add_stale_path_version(stale_path);
432
            // Remove `stale_path` from `stale_map`.
433
0
            for (auto stale_item : stale_path) {
434
0
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
435
436
0
                if (stale_map[stale_item->start_version()].empty()) {
437
0
                    stale_map.erase(stale_item->start_version());
438
0
                }
439
0
            }
440
0
        }
441
442
        // 2.4 add `stale_meta` to `stale_map`.
443
0
        auto start_iter = stale_map.find(stale_meta->start_version());
444
0
        if (start_iter != stale_map.end()) {
445
0
            start_iter->second[stale_meta->end_version()] = stale_meta;
446
0
        } else {
447
0
            std::unordered_map<int64_t, RowsetMetaSharedPtr> item;
448
0
            item[stale_meta->end_version()] = stale_meta;
449
0
            stale_map[stale_meta->start_version()] = std::move(item);
450
0
        }
451
0
    }
452
453
    // 3. generate stale path from `rs_metas`.
454
0
    for (const auto& stale_meta : rs_metas) {
455
0
        std::vector<RowsetMetaSharedPtr> stale_path;
456
        // 3.1 find a path in stale_map can replace current `stale_meta` version.
457
0
        bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(),
458
0
                                           stale_meta->end_version(), &stale_path);
459
460
        // 3.2 find the path.
461
0
        if (r) {
462
            // Add the path to `_stale_version_path_map`.
463
0
            add_stale_path_version(stale_path);
464
            // Remove `stale_path` from `stale_map`.
465
0
            for (auto stale_item : stale_path) {
466
0
                stale_map[stale_item->start_version()].erase(stale_item->end_version());
467
468
0
                if (stale_map[stale_item->start_version()].empty()) {
469
0
                    stale_map.erase(stale_item->start_version());
470
0
                }
471
0
            }
472
0
        }
473
0
    }
474
475
    // 4. process remain stale `rowset_meta` in `stale_map`.
476
0
    auto map_iter = stale_map.begin();
477
0
    while (map_iter != stale_map.end()) {
478
0
        auto second_iter = map_iter->second.begin();
479
0
        while (second_iter != map_iter->second.end()) {
480
            // Each remain stale `rowset_meta` generate a stale path.
481
0
            std::vector<RowsetMetaSharedPtr> stale_path;
482
0
            stale_path.push_back(second_iter->second);
483
0
            add_stale_path_version(stale_path);
484
485
0
            second_iter++;
486
0
        }
487
0
        map_iter++;
488
0
    }
489
0
}
490
491
} // namespace doris