/root/doris/be/src/olap/version_graph.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/version_graph.h" |
19 | | |
20 | | #include <cctz/time_zone.h> |
21 | | #include <stddef.h> |
22 | | |
23 | | #include <algorithm> |
24 | | // IWYU pragma: no_include <bits/chrono.h> |
25 | | #include <chrono> // IWYU pragma: keep |
26 | | #include <list> |
27 | | #include <memory> |
28 | | #include <optional> |
29 | | #include <ostream> |
30 | | #include <utility> |
31 | | |
32 | | #include "common/logging.h" |
33 | | |
34 | | namespace doris { |
35 | | using namespace ErrorCode; |
36 | | |
37 | | void TimestampedVersionTracker::_construct_versioned_tracker( |
38 | 53 | const std::vector<RowsetMetaSharedPtr>& rs_metas) { |
39 | 53 | int64_t max_version = 0; |
40 | | |
41 | | // construct the rowset graph |
42 | 53 | _version_graph.reconstruct_version_graph(rs_metas, &max_version); |
43 | 53 | } |
44 | | |
45 | | void TimestampedVersionTracker::construct_versioned_tracker( |
46 | 12 | const std::vector<RowsetMetaSharedPtr>& rs_metas) { |
47 | 12 | if (rs_metas.empty()) { Branch (47:9): [True: 0, False: 12]
|
48 | 0 | VLOG_NOTICE << "there is no version in the header."; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
49 | 0 | return; |
50 | 0 | } |
51 | 12 | _stale_version_path_map.clear(); |
52 | 12 | _next_path_id = 1; |
53 | 12 | _construct_versioned_tracker(rs_metas); |
54 | 12 | } |
55 | | |
56 | | void TimestampedVersionTracker::construct_versioned_tracker( |
57 | | const std::vector<RowsetMetaSharedPtr>& rs_metas, |
58 | 660 | const std::vector<RowsetMetaSharedPtr>& stale_metas) { |
59 | 660 | if (rs_metas.empty()) { Branch (59:9): [True: 619, False: 41]
|
60 | 619 | VLOG_NOTICE << "there is no version in the header."; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
61 | 619 | return; |
62 | 619 | } |
63 | 41 | _stale_version_path_map.clear(); |
64 | 41 | _next_path_id = 1; |
65 | 41 | _construct_versioned_tracker(rs_metas); |
66 | | |
67 | | // Init `_stale_version_path_map`. |
68 | 41 | _init_stale_version_path_map(rs_metas, stale_metas); |
69 | 41 | } |
70 | | |
71 | | void TimestampedVersionTracker::_init_stale_version_path_map( |
72 | | const std::vector<RowsetMetaSharedPtr>& rs_metas, |
73 | 41 | const std::vector<RowsetMetaSharedPtr>& stale_metas) { |
74 | 41 | if (stale_metas.empty()) { Branch (74:9): [True: 40, False: 1]
|
75 | 40 | return; |
76 | 40 | } |
77 | | |
78 | | // Sort stale meta by version diff (second version - first version). |
79 | 1 | std::list<RowsetMetaSharedPtr> sorted_stale_metas; |
80 | 7 | for (auto& rs : stale_metas) { Branch (80:19): [True: 7, False: 1]
|
81 | 7 | sorted_stale_metas.emplace_back(rs); |
82 | 7 | } |
83 | | |
84 | | // 1. sort the existing rowsets by version in ascending order. |
85 | 14 | sorted_stale_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { |
86 | | // Compare by version diff between `version.first` and `version.second`. |
87 | 14 | int64_t a_diff = a->version().second - a->version().first; |
88 | 14 | int64_t b_diff = b->version().second - b->version().first; |
89 | | |
90 | 14 | int diff = a_diff - b_diff; |
91 | 14 | if (diff < 0) { Branch (91:13): [True: 5, False: 9]
|
92 | 5 | return true; |
93 | 9 | } else if (diff > 0) { Branch (93:20): [True: 4, False: 5]
|
94 | 4 | return false; |
95 | 4 | } |
96 | | // When the version diff is equal, compare the rowset`s stale time |
97 | 5 | return a->stale_at() < b->stale_at(); |
98 | 14 | }); |
99 | | |
100 | | // first_version -> (second_version -> rowset_meta) |
101 | 1 | std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>> stale_map; |
102 | | |
103 | | // 2. generate stale path from stale_metas. traverse sorted_stale_metas and each time add stale_meta to stale_map. |
104 | | // when a stale path in stale_map can replace stale_meta in sorted_stale_metas, stale_map remove rowset_metas of a stale path |
105 | | // and add the path to `_stale_version_path_map`. |
106 | 7 | for (auto& stale_meta : sorted_stale_metas) { Branch (106:27): [True: 7, False: 1]
|
107 | 7 | std::vector<RowsetMetaSharedPtr> stale_path; |
108 | | // 2.1 find a path in `stale_map` can replace current `stale_meta` version. |
109 | 7 | bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(), |
110 | 7 | stale_meta->end_version(), &stale_path); |
111 | | |
112 | | // 2.2 add version to `version_graph`. |
113 | 7 | Version stale_meta_version = stale_meta->version(); |
114 | 7 | add_version(stale_meta_version); |
115 | | |
116 | | // 2.3 find the path. |
117 | 7 | if (r) { Branch (117:13): [True: 1, False: 6]
|
118 | | // Add the path to `_stale_version_path_map`. |
119 | 1 | add_stale_path_version(stale_path); |
120 | | // Remove `stale_path` from `stale_map`. |
121 | 2 | for (auto stale_item : stale_path) { Branch (121:34): [True: 2, False: 1]
|
122 | 2 | stale_map[stale_item->start_version()].erase(stale_item->end_version()); |
123 | | |
124 | 2 | if (stale_map[stale_item->start_version()].empty()) { Branch (124:21): [True: 2, False: 0]
|
125 | 2 | stale_map.erase(stale_item->start_version()); |
126 | 2 | } |
127 | 2 | } |
128 | 1 | } |
129 | | |
130 | | // 2.4 add `stale_meta` to `stale_map`. |
131 | 7 | auto start_iter = stale_map.find(stale_meta->start_version()); |
132 | 7 | if (start_iter != stale_map.end()) { Branch (132:13): [True: 0, False: 7]
|
133 | 0 | start_iter->second[stale_meta->end_version()] = stale_meta; |
134 | 7 | } else { |
135 | 7 | std::unordered_map<int64_t, RowsetMetaSharedPtr> item; |
136 | 7 | item[stale_meta->end_version()] = stale_meta; |
137 | 7 | stale_map[stale_meta->start_version()] = std::move(item); |
138 | 7 | } |
139 | 7 | } |
140 | | |
141 | | // 3. generate stale path from `rs_metas`. |
142 | 5 | for (auto& stale_meta : rs_metas) { Branch (142:27): [True: 5, False: 1]
|
143 | 5 | std::vector<RowsetMetaSharedPtr> stale_path; |
144 | | // 3.1 find a path in stale_map can replace current `stale_meta` version. |
145 | 5 | bool r = _find_path_from_stale_map(stale_map, stale_meta->start_version(), |
146 | 5 | stale_meta->end_version(), &stale_path); |
147 | | |
148 | | // 3.2 find the path. |
149 | 5 | if (r) { Branch (149:13): [True: 2, False: 3]
|
150 | | // Add the path to `_stale_version_path_map`. |
151 | 2 | add_stale_path_version(stale_path); |
152 | | // Remove `stale_path` from `stale_map`. |
153 | 4 | for (auto stale_item : stale_path) { Branch (153:34): [True: 4, False: 2]
|
154 | 4 | stale_map[stale_item->start_version()].erase(stale_item->end_version()); |
155 | | |
156 | 4 | if (stale_map[stale_item->start_version()].empty()) { Branch (156:21): [True: 4, False: 0]
|
157 | 4 | stale_map.erase(stale_item->start_version()); |
158 | 4 | } |
159 | 4 | } |
160 | 2 | } |
161 | 5 | } |
162 | | |
163 | | // 4. process remain stale `rowset_meta` in `stale_map`. |
164 | 1 | auto map_iter = stale_map.begin(); |
165 | 2 | while (map_iter != stale_map.end()) { Branch (165:12): [True: 1, False: 1]
|
166 | 1 | auto second_iter = map_iter->second.begin(); |
167 | 2 | while (second_iter != map_iter->second.end()) { Branch (167:16): [True: 1, False: 1]
|
168 | | // Each remain stale `rowset_meta` generate a stale path. |
169 | 1 | std::vector<RowsetMetaSharedPtr> stale_path; |
170 | 1 | stale_path.push_back(second_iter->second); |
171 | 1 | add_stale_path_version(stale_path); |
172 | | |
173 | 1 | second_iter++; |
174 | 1 | } |
175 | 1 | map_iter++; |
176 | 1 | } |
177 | 1 | } |
178 | | |
179 | | bool TimestampedVersionTracker::_find_path_from_stale_map( |
180 | | const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>& |
181 | | stale_map, |
182 | | int64_t first_version, int64_t second_version, |
183 | 16 | std::vector<RowsetMetaSharedPtr>* stale_path) { |
184 | 16 | auto first_iter = stale_map.find(first_version); |
185 | | // If `first_version` not in `stale_map`, there is no path. |
186 | 16 | if (first_iter == stale_map.end()) { Branch (186:9): [True: 9, False: 7]
|
187 | 9 | return false; |
188 | 9 | } |
189 | 7 | auto& second_version_map = first_iter->second; |
190 | 7 | auto second_iter = second_version_map.find(second_version); |
191 | | // If second_version in `stale_map`, find a path. |
192 | 7 | if (second_iter != second_version_map.end()) { Branch (192:9): [True: 3, False: 4]
|
193 | 3 | auto row_meta = second_iter->second; |
194 | | // Add rowset to path. |
195 | 3 | stale_path->push_back(row_meta); |
196 | 3 | return true; |
197 | 3 | } |
198 | | |
199 | | // Traverse the first version map to backtracking `_find_path_from_stale_map`. |
200 | 4 | auto map_iter = second_version_map.begin(); |
201 | 5 | while (map_iter != second_version_map.end()) { Branch (201:12): [True: 4, False: 1]
|
202 | | // The version greater than `second_version`, we can't find path in `stale_map`. |
203 | 4 | if (map_iter->first > second_version) { Branch (203:13): [True: 0, False: 4]
|
204 | 0 | map_iter++; |
205 | 0 | continue; |
206 | 0 | } |
207 | | // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`. |
208 | 4 | stale_path->push_back(map_iter->second); |
209 | 4 | bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version, |
210 | 4 | stale_path); |
211 | 4 | if (r) { Branch (211:13): [True: 3, False: 1]
|
212 | 3 | return true; |
213 | 3 | } |
214 | | // There is no path in current version, pop and continue. |
215 | 1 | stale_path->pop_back(); |
216 | 1 | map_iter++; |
217 | 1 | } |
218 | | |
219 | 1 | return false; |
220 | 4 | } |
221 | | |
222 | 35 | void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) { |
223 | 35 | auto path_arr_iter = _stale_version_path_map.begin(); |
224 | | |
225 | | // Do loop version path. |
226 | 133 | while (path_arr_iter != _stale_version_path_map.end()) { Branch (226:12): [True: 98, False: 35]
|
227 | 98 | auto path_id = path_arr_iter->first; |
228 | 98 | auto path_version_path = path_arr_iter->second; |
229 | | |
230 | 98 | rapidjson::Document item; |
231 | 98 | item.SetObject(); |
232 | | // Add `path_id` to item. |
233 | 98 | auto path_id_str = std::to_string(path_id); |
234 | 98 | rapidjson::Value path_id_value; |
235 | 98 | path_id_value.SetString(path_id_str.c_str(), path_id_str.length(), path_arr.GetAllocator()); |
236 | 98 | item.AddMember("path id", path_id_value, path_arr.GetAllocator()); |
237 | | |
238 | | // Add max create time to item. |
239 | 98 | auto time_zone = cctz::local_time_zone(); |
240 | | |
241 | 98 | auto tp = std::chrono::system_clock::from_time_t(path_version_path->max_create_time()); |
242 | 98 | auto create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone); |
243 | | |
244 | 98 | rapidjson::Value create_time_value; |
245 | 98 | create_time_value.SetString(create_time_str.c_str(), create_time_str.length(), |
246 | 98 | path_arr.GetAllocator()); |
247 | 98 | item.AddMember("last create time", create_time_value, path_arr.GetAllocator()); |
248 | | |
249 | | // Add path list to item. |
250 | 98 | std::stringstream path_list_stream; |
251 | 98 | path_list_stream << path_id_str; |
252 | 98 | auto path_list_ptr = path_version_path->timestamped_versions(); |
253 | 98 | auto path_list_iter = path_list_ptr.begin(); |
254 | 645 | while (path_list_iter != path_list_ptr.end()) { Branch (254:16): [True: 547, False: 98]
|
255 | 547 | path_list_stream << " -> "; |
256 | 547 | path_list_stream << "["; |
257 | 547 | path_list_stream << (*path_list_iter)->version().first; |
258 | 547 | path_list_stream << "-"; |
259 | 547 | path_list_stream << (*path_list_iter)->version().second; |
260 | 547 | path_list_stream << "]"; |
261 | 547 | path_list_iter++; |
262 | 547 | } |
263 | 98 | std::string path_list = path_list_stream.str(); |
264 | 98 | rapidjson::Value path_list_value; |
265 | 98 | path_list_value.SetString(path_list.c_str(), path_list.length(), path_arr.GetAllocator()); |
266 | 98 | item.AddMember("path list", path_list_value, path_arr.GetAllocator()); |
267 | | |
268 | | // Add item to `path_arr`. |
269 | 98 | path_arr.PushBack(item, path_arr.GetAllocator()); |
270 | | |
271 | 98 | path_arr_iter++; |
272 | 98 | } |
273 | 35 | } |
274 | | |
275 | | void TimestampedVersionTracker::recover_versioned_tracker( |
276 | 1 | const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) { |
277 | 1 | auto _path_map_iter = stale_version_path_map.begin(); |
278 | | // Recover `stale_version_path_map`. |
279 | 1 | while (_path_map_iter != stale_version_path_map.end()) { Branch (279:12): [True: 0, False: 1]
|
280 | | // Add `PathVersionListSharedPtr` to map. |
281 | 0 | _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second; |
282 | |
|
283 | 0 | std::vector<TimestampedVersionSharedPtr>& timestamped_versions = |
284 | 0 | _path_map_iter->second->timestamped_versions(); |
285 | 0 | std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = |
286 | 0 | timestamped_versions.begin(); |
287 | 0 | while (version_path_iter != timestamped_versions.end()) { Branch (287:16): [True: 0, False: 0]
|
288 | | // Add version to `_version_graph`. |
289 | 0 | _version_graph.add_version_to_graph((*version_path_iter)->version()); |
290 | 0 | ++version_path_iter; |
291 | 0 | } |
292 | 0 | ++_path_map_iter; |
293 | 0 | } |
294 | 1 | LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str(); |
295 | 1 | } |
296 | | |
297 | 11.7k | void TimestampedVersionTracker::add_version(const Version& version) { |
298 | 11.7k | _version_graph.add_version_to_graph(version); |
299 | 11.7k | } |
300 | | |
301 | | void TimestampedVersionTracker::add_stale_path_version( |
302 | 136 | const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) { |
303 | 136 | if (stale_rs_metas.empty()) { Branch (303:9): [True: 1, False: 135]
|
304 | 1 | VLOG_NOTICE << "there is no version in the stale_rs_metas."; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
305 | 1 | return; |
306 | 1 | } |
307 | | |
308 | 135 | PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer()); |
309 | 617 | for (auto rs : stale_rs_metas) { Branch (309:18): [True: 617, False: 135]
|
310 | 617 | TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at())); |
311 | 617 | ptr->add_timestamped_version(vt_ptr); |
312 | 617 | } |
313 | | |
314 | 135 | std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions(); |
315 | | |
316 | 135 | struct TimestampedVersionPtrCompare { |
317 | 135 | bool operator()(const TimestampedVersionSharedPtr ptr1, |
318 | 1.05k | const TimestampedVersionSharedPtr ptr2) { |
319 | 1.05k | return ptr1->version().first < ptr2->version().first; |
320 | 1.05k | } |
321 | 135 | }; |
322 | 135 | sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare()); |
323 | 135 | _stale_version_path_map[_next_path_id] = ptr; |
324 | 135 | _next_path_id++; |
325 | 135 | } |
326 | | |
327 | | // Capture consistent versions from graph. |
328 | | Status TimestampedVersionTracker::capture_consistent_versions( |
329 | 26 | const Version& spec_version, std::vector<Version>* version_path) const { |
330 | 26 | return _version_graph.capture_consistent_versions(spec_version, version_path); |
331 | 26 | } |
332 | | |
333 | | Status TimestampedVersionTracker::capture_consistent_versions_with_validator( |
334 | | const Version& spec_version, std::vector<Version>& version_path, |
335 | 9 | const std::function<bool(int64_t, int64_t)>& validator) const { |
336 | 9 | return _version_graph.capture_consistent_versions_with_validator(spec_version, version_path, |
337 | 9 | validator); |
338 | 9 | } |
339 | | |
340 | | Status TimestampedVersionTracker::capture_consistent_versions_prefer_cache( |
341 | | const Version& spec_version, std::vector<Version>& version_path, |
342 | 13 | const std::function<bool(int64_t, int64_t)>& validator) const { |
343 | 13 | return _version_graph.capture_consistent_versions_prefer_cache(spec_version, version_path, |
344 | 13 | validator); |
345 | 13 | } |
346 | | |
347 | | Status TimestampedVersionTracker::capture_consistent_versions_with_validator_mow( |
348 | | const Version& spec_version, std::vector<Version>& version_path, |
349 | 11 | const std::function<bool(int64_t, int64_t)>& validator) const { |
350 | 11 | return _version_graph.capture_consistent_versions_with_validator_mow(spec_version, version_path, |
351 | 11 | validator); |
352 | 11 | } |
353 | | |
354 | | void TimestampedVersionTracker::capture_expired_paths( |
355 | 5 | int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const { |
356 | 5 | std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter = |
357 | 5 | _stale_version_path_map.begin(); |
358 | | |
359 | 19 | while (iter != _stale_version_path_map.end()) { Branch (359:12): [True: 14, False: 5]
|
360 | 14 | int64_t max_create_time = iter->second->max_create_time(); |
361 | 14 | if (max_create_time <= stale_sweep_endtime) { Branch (361:13): [True: 9, False: 5]
|
362 | 9 | int64_t path_version = iter->first; |
363 | 9 | path_version_vec->push_back(path_version); |
364 | 9 | } |
365 | 14 | ++iter; |
366 | 14 | } |
367 | 5 | } |
368 | | |
369 | 13 | PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) { |
370 | 13 | if (_stale_version_path_map.count(path_id) == 0) { Branch (370:9): [True: 0, False: 13]
|
371 | 0 | VLOG_NOTICE << "path version " << path_id << " does not exist!"; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
372 | 0 | return nullptr; |
373 | 0 | } |
374 | | |
375 | 13 | return _stale_version_path_map[path_id]; |
376 | 13 | } |
377 | | |
378 | 13 | PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) { |
379 | 13 | if (_stale_version_path_map.count(path_id) == 0) { Branch (379:9): [True: 0, False: 13]
|
380 | 0 | VLOG_NOTICE << "path version " << path_id << " does not exist!"; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
381 | 0 | return nullptr; |
382 | 0 | } |
383 | | |
384 | 13 | VLOG_NOTICE << get_current_path_map_str(); Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
385 | 13 | PathVersionListSharedPtr ptr = fetch_path_version_by_id(path_id); |
386 | | |
387 | 13 | _stale_version_path_map.erase(path_id); |
388 | | |
389 | 23 | for (auto& version : ptr->timestamped_versions()) { Branch (389:24): [True: 23, False: 13]
|
390 | 23 | static_cast<void>(_version_graph.delete_version_from_graph(version->version())); |
391 | 23 | } |
392 | 13 | return ptr; |
393 | 13 | } |
394 | | |
395 | 1 | std::string TimestampedVersionTracker::get_current_path_map_str() { |
396 | 1 | std::stringstream tracker_info; |
397 | 1 | tracker_info << "current expired next_path_id " << _next_path_id << std::endl; |
398 | | |
399 | 1 | std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter = |
400 | 1 | _stale_version_path_map.begin(); |
401 | 1 | while (iter != _stale_version_path_map.end()) { Branch (401:12): [True: 0, False: 1]
|
402 | 0 | tracker_info << "current expired path_version " << iter->first; |
403 | 0 | std::vector<TimestampedVersionSharedPtr>& timestamped_versions = |
404 | 0 | iter->second->timestamped_versions(); |
405 | 0 | std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = |
406 | 0 | timestamped_versions.begin(); |
407 | 0 | int64_t max_create_time = -1; |
408 | 0 | while (version_path_iter != timestamped_versions.end()) { Branch (408:16): [True: 0, False: 0]
|
409 | 0 | if (max_create_time < (*version_path_iter)->get_create_time()) { Branch (409:17): [True: 0, False: 0]
|
410 | 0 | max_create_time = (*version_path_iter)->get_create_time(); |
411 | 0 | } |
412 | 0 | tracker_info << " -> ["; |
413 | 0 | tracker_info << (*version_path_iter)->version().first; |
414 | 0 | tracker_info << ","; |
415 | 0 | tracker_info << (*version_path_iter)->version().second; |
416 | 0 | tracker_info << "]"; |
417 | |
|
418 | 0 | ++version_path_iter; |
419 | 0 | } |
420 | |
|
421 | 0 | tracker_info << std::endl; |
422 | 0 | ++iter; |
423 | 0 | } |
424 | 1 | return tracker_info.str(); |
425 | 1 | } |
426 | | |
427 | 1 | double TimestampedVersionTracker::get_orphan_vertex_ratio() { |
428 | 1 | return _version_graph.get_orphan_vertex_ratio(); |
429 | 1 | } |
430 | | |
431 | 5 | std::string TimestampedVersionTracker::debug_string() const { |
432 | 5 | return _version_graph.debug_string(); |
433 | 5 | } |
434 | | |
435 | 617 | void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) { |
436 | | // Compare and refresh `_max_create_time`. |
437 | 617 | if (version->get_create_time() > _max_create_time) { Branch (437:9): [True: 135, False: 482]
|
438 | 135 | _max_create_time = version->get_create_time(); |
439 | 135 | } |
440 | 617 | _timestamped_versions_container.push_back(version); |
441 | 617 | } |
442 | | |
443 | 261 | std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() { |
444 | 261 | return _timestamped_versions_container; |
445 | 261 | } |
446 | | |
447 | | void VersionGraph::construct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas, |
448 | 58 | int64_t* max_version) { |
449 | 58 | if (rs_metas.empty()) { Branch (449:9): [True: 0, False: 58]
|
450 | 0 | VLOG_NOTICE << "there is no version in the header."; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
451 | 0 | return; |
452 | 0 | } |
453 | | |
454 | | // Distill vertex values from versions in TabletMeta. |
455 | 58 | std::vector<int64_t> vertex_values; |
456 | 58 | vertex_values.reserve(2 * rs_metas.size()); |
457 | | |
458 | 391 | for (size_t i = 0; i < rs_metas.size(); ++i) { Branch (458:24): [True: 333, False: 58]
|
459 | 333 | vertex_values.push_back(rs_metas[i]->start_version()); |
460 | 333 | vertex_values.push_back(rs_metas[i]->end_version() + 1); |
461 | 333 | if (max_version != nullptr and *max_version < rs_metas[i]->end_version()) { Branch (461:13): [True: 333, False: 0]
Branch (461:40): [True: 270, False: 63]
|
462 | 270 | *max_version = rs_metas[i]->end_version(); |
463 | 270 | } |
464 | 333 | } |
465 | 58 | std::sort(vertex_values.begin(), vertex_values.end()); |
466 | | |
467 | | // Items in `vertex_values` are sorted, but not unique. |
468 | | // we choose unique items in `vertex_values` to create vertexes. |
469 | 58 | int64_t last_vertex_value = -1; |
470 | 724 | for (size_t i = 0; i < vertex_values.size(); ++i) { Branch (470:24): [True: 666, False: 58]
|
471 | 666 | if (i != 0 && vertex_values[i] == last_vertex_value) { Branch (471:13): [True: 608, False: 58]
Branch (471:23): [True: 291, False: 317]
|
472 | 291 | continue; |
473 | 291 | } |
474 | | |
475 | | // Add vertex to graph. |
476 | 375 | _add_vertex_to_graph(vertex_values[i]); |
477 | 375 | last_vertex_value = vertex_values[i]; |
478 | 375 | } |
479 | | // Create edges for version graph according to TabletMeta's versions. |
480 | 391 | for (size_t i = 0; i < rs_metas.size(); ++i) { Branch (480:24): [True: 333, False: 58]
|
481 | | // Versions in header are unique. |
482 | | // We ensure `_vertex_index_map` has its `start_version`. |
483 | 333 | int64_t start_vertex_index = _vertex_index_map[rs_metas[i]->start_version()]; |
484 | 333 | int64_t end_vertex_index = _vertex_index_map[rs_metas[i]->end_version() + 1]; |
485 | | // Add one edge from `start_version` to `end_version`. |
486 | 333 | _version_graph[start_vertex_index].edges.push_front(end_vertex_index); |
487 | | // Add reverse edge from `end_version` to `start_version`. |
488 | 333 | _version_graph[end_vertex_index].edges.push_front(start_vertex_index); |
489 | 333 | } |
490 | | |
491 | | // Sort edges by version in descending order. |
492 | 375 | for (auto& vertex : _version_graph) { Branch (492:23): [True: 375, False: 58]
|
493 | 375 | vertex.edges.sort([this](const int& vertex_idx_a, const int& vertex_idx_b) { |
494 | 340 | return _version_graph[vertex_idx_a].value > _version_graph[vertex_idx_b].value; |
495 | 340 | }); |
496 | 375 | } |
497 | 58 | } |
498 | | |
499 | | void VersionGraph::reconstruct_version_graph(const std::vector<RowsetMetaSharedPtr>& rs_metas, |
500 | 54 | int64_t* max_version) { |
501 | 54 | _version_graph.clear(); |
502 | 54 | _vertex_index_map.clear(); |
503 | | |
504 | 54 | construct_version_graph(rs_metas, max_version); |
505 | 54 | } |
506 | | |
507 | 11.7k | void VersionGraph::add_version_to_graph(const Version& version) { |
508 | | // Add version.first as new vertex of version graph if not exist. |
509 | 11.7k | int64_t start_vertex_value = version.first; |
510 | 11.7k | int64_t end_vertex_value = version.second + 1; |
511 | | |
512 | | // Add vertex to graph. |
513 | 11.7k | _add_vertex_to_graph(start_vertex_value); |
514 | 11.7k | _add_vertex_to_graph(end_vertex_value); |
515 | | |
516 | 11.7k | int64_t start_vertex_index = _vertex_index_map[start_vertex_value]; |
517 | 11.7k | int64_t end_vertex_index = _vertex_index_map[end_vertex_value]; |
518 | | |
519 | | // We assume this version is new version, so we just add two edges |
520 | | // into version graph. add one edge from `start_version` to `end_version` |
521 | | // Make sure the vertex's edges are sorted by version in descending order when inserting. |
522 | 11.7k | auto end_vertex_it = _version_graph[start_vertex_index].edges.begin(); |
523 | 11.7k | while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) { Branch (523:12): [True: 11.3k, False: 431]
|
524 | 11.3k | if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) { Branch (524:13): [True: 11.3k, False: 37]
|
525 | 11.3k | break; |
526 | 11.3k | } |
527 | 37 | end_vertex_it++; |
528 | 37 | } |
529 | 11.7k | _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index); |
530 | | |
531 | | // We add reverse edge(from end_version to start_version) to graph |
532 | | // Make sure the vertex's edges are sorted by version in descending order when inserting. |
533 | 11.7k | auto start_vertex_it = _version_graph[end_vertex_index].edges.begin(); |
534 | 11.9k | while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) { Branch (534:12): [True: 215, False: 11.7k]
|
535 | 215 | if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) { Branch (535:13): [True: 17, False: 198]
|
536 | 17 | break; |
537 | 17 | } |
538 | 198 | start_vertex_it++; |
539 | 198 | } |
540 | 11.7k | _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index); |
541 | 11.7k | } |
542 | | |
543 | 32 | Status VersionGraph::delete_version_from_graph(const Version& version) { |
544 | 32 | int64_t start_vertex_value = version.first; |
545 | 32 | int64_t end_vertex_value = version.second + 1; |
546 | | |
547 | 32 | if (_vertex_index_map.find(start_vertex_value) == _vertex_index_map.end() || Branch (547:9): [True: 0, False: 32]
Branch (547:9): [True: 0, False: 32]
|
548 | 32 | _vertex_index_map.find(end_vertex_value) == _vertex_index_map.end()) { Branch (548:9): [True: 0, False: 32]
|
549 | 0 | return Status::Error<HEADER_DELETE_VERSION>( |
550 | 0 | "vertex for version does not exists. version={}-{}", version.first, version.second); |
551 | 0 | } |
552 | | |
553 | 32 | int64_t start_vertex_index = _vertex_index_map[start_vertex_value]; |
554 | 32 | int64_t end_vertex_index = _vertex_index_map[end_vertex_value]; |
555 | | // Remove edge and its reverse edge. |
556 | | // When there are same versions in edges, just remove the first version. |
557 | 32 | auto start_edges_iter = _version_graph[start_vertex_index].edges.begin(); |
558 | 47 | while (start_edges_iter != _version_graph[start_vertex_index].edges.end()) { Branch (558:12): [True: 47, False: 0]
|
559 | 47 | if (*start_edges_iter == end_vertex_index) { Branch (559:13): [True: 32, False: 15]
|
560 | 32 | _version_graph[start_vertex_index].edges.erase(start_edges_iter); |
561 | 32 | break; |
562 | 32 | } |
563 | 15 | start_edges_iter++; |
564 | 15 | } |
565 | | |
566 | 32 | auto end_edges_iter = _version_graph[end_vertex_index].edges.begin(); |
567 | 65 | while (end_edges_iter != _version_graph[end_vertex_index].edges.end()) { Branch (567:12): [True: 65, False: 0]
|
568 | 65 | if (*end_edges_iter == start_vertex_index) { Branch (568:13): [True: 32, False: 33]
|
569 | 32 | _version_graph[end_vertex_index].edges.erase(end_edges_iter); |
570 | 32 | break; |
571 | 32 | } |
572 | 33 | end_edges_iter++; |
573 | 33 | } |
574 | | |
575 | | // Here we do not delete vertex in `_version_graph` even if its edges are empty. |
576 | | // the `_version_graph` will be rebuilt when doing trash sweep. |
577 | 32 | return Status::OK(); |
578 | 32 | } |
579 | | |
580 | 23.8k | void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) { |
581 | | // Vertex with vertex_value already exists. |
582 | 23.8k | if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) { Branch (582:9): [True: 11.4k, False: 12.4k]
|
583 | 11.4k | VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value; Line | Count | Source | 42 | 0 | #define VLOG_NOTICE VLOG(3) |
|
584 | 11.4k | return; |
585 | 11.4k | } |
586 | | |
587 | 12.4k | _version_graph.emplace_back(Vertex(vertex_value)); |
588 | 12.4k | _vertex_index_map[vertex_value] = _version_graph.size() - 1; |
589 | 12.4k | } |
590 | | |
591 | | Status VersionGraph::capture_consistent_versions(const Version& spec_version, |
592 | 28 | std::vector<Version>* version_path) const { |
593 | 28 | if (spec_version.first > spec_version.second) { Branch (593:9): [True: 0, False: 28]
|
594 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
595 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
596 | 0 | spec_version.second); |
597 | 0 | } |
598 | | |
599 | 28 | int64_t cur_idx = -1; |
600 | 32 | for (size_t i = 0; i < _version_graph.size(); i++) { Branch (600:24): [True: 32, False: 0]
|
601 | 32 | if (_version_graph[i].value == spec_version.first) { Branch (601:13): [True: 28, False: 4]
|
602 | 28 | cur_idx = i; |
603 | 28 | break; |
604 | 28 | } |
605 | 32 | } |
606 | | |
607 | 28 | if (cur_idx < 0) { Branch (607:9): [True: 0, False: 28]
|
608 | 0 | return Status::InternalError<false>( |
609 | 0 | "failed to find path in version_graph. spec_version: {}-{}", spec_version.first, |
610 | 0 | spec_version.second); |
611 | 0 | } |
612 | | |
613 | 28 | int64_t end_value = spec_version.second + 1; |
614 | 107 | while (_version_graph[cur_idx].value < end_value) { Branch (614:12): [True: 80, False: 27]
|
615 | 80 | int64_t next_idx = -1; |
616 | 84 | for (const auto& it : _version_graph[cur_idx].edges) { Branch (616:29): [True: 84, False: 0]
|
617 | | // Only consider incremental versions. |
618 | 84 | if (_version_graph[it].value < _version_graph[cur_idx].value) { Branch (618:17): [True: 1, False: 83]
|
619 | 1 | break; |
620 | 1 | } |
621 | | |
622 | 83 | if (_version_graph[it].value > end_value) { Branch (622:17): [True: 4, False: 79]
|
623 | 4 | continue; |
624 | 4 | } |
625 | | |
626 | | // Considering edges had been sorted by version in descending order, |
627 | | // This version is the largest version that smaller than `end_version`. |
628 | 79 | next_idx = it; |
629 | 79 | break; |
630 | 83 | } |
631 | | |
632 | 80 | if (next_idx > -1) { Branch (632:13): [True: 79, False: 1]
|
633 | 79 | if (version_path != nullptr) { Branch (633:17): [True: 79, False: 0]
|
634 | 79 | version_path->emplace_back(_version_graph[cur_idx].value, |
635 | 79 | _version_graph[next_idx].value - 1); |
636 | 79 | } |
637 | 79 | cur_idx = next_idx; |
638 | 79 | } else { |
639 | 1 | return Status::InternalError<false>( |
640 | 1 | "fail to find path in version_graph. spec_version: {}-{}", spec_version.first, |
641 | 1 | spec_version.second); |
642 | 1 | } |
643 | 80 | } |
644 | | |
645 | 27 | if (VLOG_TRACE_IS_ON && version_path != nullptr) {Line | Count | Source | 50 | 27 | #define VLOG_TRACE_IS_ON VLOG_IS_ON(10) |
Branch (645:29): [True: 0, False: 0]
|
646 | 0 | std::stringstream shortest_path_for_debug; |
647 | 0 | for (const auto& version : *version_path) { Branch (647:34): [True: 0, False: 0]
|
648 | 0 | shortest_path_for_debug << version << ' '; |
649 | 0 | } |
650 | 0 | VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version Line | Count | Source | 40 | 0 | #define VLOG_TRACE VLOG(10) |
|
651 | 0 | << ", path=" << shortest_path_for_debug.str(); |
652 | 0 | } |
653 | | |
654 | 27 | return Status::OK(); |
655 | 28 | } |
656 | | |
657 | | Status VersionGraph::capture_consistent_versions_prefer_cache( |
658 | | const Version& spec_version, std::vector<Version>& version_path, |
659 | 13 | const std::function<bool(int64_t, int64_t)>& validator) const { |
660 | 13 | if (spec_version.first > spec_version.second) { Branch (660:9): [True: 0, False: 13]
|
661 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
662 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
663 | 0 | spec_version.second); |
664 | 0 | } |
665 | | |
666 | 13 | int64_t cur_idx = -1; |
667 | 13 | for (size_t i = 0; i < _version_graph.size(); i++) { Branch (667:24): [True: 13, False: 0]
|
668 | 13 | if (_version_graph[i].value == spec_version.first) { Branch (668:13): [True: 13, False: 0]
|
669 | 13 | cur_idx = i; |
670 | 13 | break; |
671 | 13 | } |
672 | 13 | } |
673 | | |
674 | 13 | if (cur_idx < 0) { Branch (674:9): [True: 0, False: 13]
|
675 | 0 | return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", |
676 | 0 | spec_version.to_string()); |
677 | 0 | } |
678 | | |
679 | 13 | int64_t end_value = spec_version.second + 1; |
680 | 90 | while (_version_graph[cur_idx].value < end_value) { Branch (680:12): [True: 77, False: 13]
|
681 | 77 | int64_t next_idx = -1; |
682 | 77 | int64_t first_idx = -1; |
683 | 107 | for (const auto& it : _version_graph[cur_idx].edges) { Branch (683:29): [True: 107, False: 0]
|
684 | | // Only consider incremental versions. |
685 | 107 | if (_version_graph[it].value < _version_graph[cur_idx].value) { Branch (685:17): [True: 15, False: 92]
|
686 | 15 | break; |
687 | 15 | } |
688 | 92 | if (first_idx == -1) { Branch (688:17): [True: 77, False: 15]
|
689 | 77 | first_idx = it; |
690 | 77 | } |
691 | | |
692 | 92 | if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { Branch (692:17): [True: 30, False: 62]
|
693 | 30 | continue; |
694 | 30 | } |
695 | | |
696 | 62 | next_idx = it; |
697 | 62 | break; |
698 | 92 | } |
699 | | |
700 | 77 | if (next_idx > -1) { Branch (700:13): [True: 62, False: 15]
|
701 | 62 | version_path.emplace_back(_version_graph[cur_idx].value, |
702 | 62 | _version_graph[next_idx].value - 1); |
703 | | |
704 | 62 | cur_idx = next_idx; |
705 | 62 | } else if (first_idx != -1) { Branch (705:20): [True: 15, False: 0]
|
706 | | // if all edges are not in cache, use the first edge if possible |
707 | 15 | version_path.emplace_back(_version_graph[cur_idx].value, |
708 | 15 | _version_graph[first_idx].value - 1); |
709 | 15 | cur_idx = first_idx; |
710 | 15 | } else { |
711 | 0 | return Status::OK(); |
712 | 0 | } |
713 | 77 | } |
714 | 13 | return Status::OK(); |
715 | 13 | } |
716 | | |
717 | | Status VersionGraph::capture_consistent_versions_with_validator( |
718 | | const Version& spec_version, std::vector<Version>& version_path, |
719 | 9 | const std::function<bool(int64_t, int64_t)>& validator) const { |
720 | 9 | if (spec_version.first > spec_version.second) { Branch (720:9): [True: 0, False: 9]
|
721 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
722 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
723 | 0 | spec_version.second); |
724 | 0 | } |
725 | | |
726 | 9 | int64_t cur_idx = -1; |
727 | 9 | for (size_t i = 0; i < _version_graph.size(); i++) { Branch (727:24): [True: 9, False: 0]
|
728 | 9 | if (_version_graph[i].value == spec_version.first) { Branch (728:13): [True: 9, False: 0]
|
729 | 9 | cur_idx = i; |
730 | 9 | break; |
731 | 9 | } |
732 | 9 | } |
733 | | |
734 | 9 | if (cur_idx < 0) { Branch (734:9): [True: 0, False: 9]
|
735 | 0 | return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", |
736 | 0 | spec_version.to_string()); |
737 | 0 | } |
738 | | |
739 | 9 | int64_t end_value = spec_version.second + 1; |
740 | 50 | while (_version_graph[cur_idx].value < end_value) { Branch (740:12): [True: 48, False: 2]
|
741 | 48 | int64_t next_idx = -1; |
742 | 60 | for (const auto& it : _version_graph[cur_idx].edges) { Branch (742:29): [True: 60, False: 0]
|
743 | | // Only consider incremental versions. |
744 | 60 | if (_version_graph[it].value < _version_graph[cur_idx].value) { Branch (744:17): [True: 7, False: 53]
|
745 | 7 | break; |
746 | 7 | } |
747 | | |
748 | 53 | if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { Branch (748:17): [True: 12, False: 41]
|
749 | 12 | continue; |
750 | 12 | } |
751 | | |
752 | 41 | next_idx = it; |
753 | 41 | break; |
754 | 53 | } |
755 | | |
756 | 48 | if (next_idx > -1) { Branch (756:13): [True: 41, False: 7]
|
757 | 41 | version_path.emplace_back(_version_graph[cur_idx].value, |
758 | 41 | _version_graph[next_idx].value - 1); |
759 | | |
760 | 41 | cur_idx = next_idx; |
761 | 41 | } else { |
762 | 7 | return Status::OK(); |
763 | 7 | } |
764 | 48 | } |
765 | 2 | return Status::OK(); |
766 | 9 | } |
767 | | |
768 | | Status VersionGraph::capture_consistent_versions_with_validator_mow( |
769 | | const Version& spec_version, std::vector<Version>& version_path, |
770 | 11 | const std::function<bool(int64_t, int64_t)>& validator) const { |
771 | 11 | if (spec_version.first > spec_version.second) { Branch (771:9): [True: 0, False: 11]
|
772 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
773 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
774 | 0 | spec_version.second); |
775 | 0 | } |
776 | | |
777 | 11 | int64_t cur_idx = -1; |
778 | 11 | for (size_t i = 0; i < _version_graph.size(); i++) { Branch (778:24): [True: 11, False: 0]
|
779 | 11 | if (_version_graph[i].value == spec_version.first) { Branch (779:13): [True: 11, False: 0]
|
780 | 11 | cur_idx = i; |
781 | 11 | break; |
782 | 11 | } |
783 | 11 | } |
784 | | |
785 | 11 | if (cur_idx < 0) { Branch (785:9): [True: 0, False: 11]
|
786 | 0 | return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", |
787 | 0 | spec_version.to_string()); |
788 | 0 | } |
789 | | |
790 | 11 | int64_t end_value = spec_version.second + 1; |
791 | 60 | while (_version_graph[cur_idx].value < end_value) { Branch (791:12): [True: 55, False: 5]
|
792 | 55 | int64_t next_idx = -1; |
793 | 65 | for (const auto& it : _version_graph[cur_idx].edges) { Branch (793:29): [True: 65, False: 0]
|
794 | | // Only consider incremental versions. |
795 | 65 | if (_version_graph[it].value < _version_graph[cur_idx].value) { Branch (795:17): [True: 0, False: 65]
|
796 | 0 | break; |
797 | 0 | } |
798 | | |
799 | 65 | if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { Branch (799:17): [True: 16, False: 49]
|
800 | 16 | if (_version_graph[cur_idx].value + 1 == _version_graph[it].value) { Branch (800:21): [True: 6, False: 10]
|
801 | 6 | break; |
802 | 6 | } |
803 | 10 | end_value = std::min(_version_graph[it].value, end_value); |
804 | 10 | continue; |
805 | 16 | } |
806 | | |
807 | 49 | next_idx = it; |
808 | 49 | break; |
809 | 65 | } |
810 | | |
811 | 55 | if (next_idx > -1) { Branch (811:13): [True: 49, False: 6]
|
812 | 49 | version_path.emplace_back(_version_graph[cur_idx].value, |
813 | 49 | _version_graph[next_idx].value - 1); |
814 | | |
815 | 49 | cur_idx = next_idx; |
816 | 49 | } else { |
817 | 6 | return Status::OK(); |
818 | 6 | } |
819 | 55 | } |
820 | 5 | return Status::OK(); |
821 | 11 | } |
822 | | |
823 | 2 | double VersionGraph::get_orphan_vertex_ratio() { |
824 | 2 | int64_t vertex_num = _version_graph.size(); |
825 | 2 | int64_t orphan_vertex_num = 0; |
826 | 15 | for (auto& iter : _version_graph) { Branch (826:21): [True: 15, False: 2]
|
827 | 15 | if (iter.edges.empty()) { Branch (827:13): [True: 6, False: 9]
|
828 | 6 | ++orphan_vertex_num; |
829 | 6 | } |
830 | 15 | } |
831 | 2 | return orphan_vertex_num / (double)vertex_num; |
832 | 2 | } |
833 | | |
834 | 5 | std::string VersionGraph::debug_string() const { |
835 | 5 | std::stringstream ss; |
836 | 5 | ss << "VersionGraph: ["; |
837 | 100 | for (size_t i = 0; i < _version_graph.size(); ++i) { Branch (837:24): [True: 95, False: 5]
|
838 | 95 | ss << "{value: " << _version_graph[i].value << ", edges: ["; |
839 | 208 | for (const auto& edge : _version_graph[i].edges) { Branch (839:31): [True: 208, False: 95]
|
840 | 208 | if (_version_graph[edge].value > _version_graph[i].value) { Branch (840:17): [True: 104, False: 104]
|
841 | 104 | ss << _version_graph[edge].value << ", "; |
842 | 104 | } |
843 | 208 | } |
844 | 95 | ss << "]}, "; |
845 | 95 | } |
846 | 5 | ss << "]"; |
847 | 5 | return ss.str(); |
848 | 5 | } |
849 | | } // namespace doris |