be/src/storage/version_graph.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/version_graph.h" |
19 | | |
20 | | #include <cctz/time_zone.h> |
21 | | #include <stddef.h> |
22 | | |
23 | | #include <algorithm> |
24 | | // IWYU pragma: no_include <bits/chrono.h> |
25 | | #include <chrono> // IWYU pragma: keep |
26 | | #include <list> |
27 | | #include <memory> |
28 | | #include <optional> |
29 | | #include <ostream> |
30 | | #include <ranges> |
31 | | #include <utility> |
32 | | |
33 | | #include "common/logging.h" |
34 | | |
35 | | namespace doris { |
36 | | #include "common/compile_check_begin.h" |
37 | | |
38 | | using namespace ErrorCode; |
39 | | |
40 | | void TimestampedVersionTracker::construct_versioned_tracker( |
41 | 0 | const RowsetMetaMapContainer& rs_metas) { |
42 | 0 | construct_versioned_tracker(std::views::values(rs_metas)); |
43 | 0 | } |
44 | | |
45 | | void TimestampedVersionTracker::construct_versioned_tracker( |
46 | 466k | const RowsetMetaMapContainer& rs_metas, const RowsetMetaMapContainer& stale_metas) { |
47 | 466k | construct_versioned_tracker(std::views::values(rs_metas), std::views::values(stale_metas)); |
48 | 466k | } |
49 | | |
50 | | bool TimestampedVersionTracker::_find_path_from_stale_map( |
51 | | const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>& |
52 | | stale_map, |
53 | | int64_t first_version, int64_t second_version, |
54 | 72.3k | std::vector<RowsetMetaSharedPtr>* stale_path) { |
55 | 72.3k | auto first_iter = stale_map.find(first_version); |
56 | | // If `first_version` not in `stale_map`, there is no path. |
57 | 72.3k | if (first_iter == stale_map.end()) { |
58 | 34.7k | return false; |
59 | 34.7k | } |
60 | 37.5k | auto& second_version_map = first_iter->second; |
61 | 37.5k | auto second_iter = second_version_map.find(second_version); |
62 | | // If second_version in `stale_map`, find a path. |
63 | 37.5k | if (second_iter != second_version_map.end()) { |
64 | 4.22k | auto row_meta = second_iter->second; |
65 | | // Add rowset to path. |
66 | 4.22k | stale_path->push_back(row_meta); |
67 | 4.22k | return true; |
68 | 4.22k | } |
69 | | |
70 | | // Traverse the first version map to backtracking `_find_path_from_stale_map`. |
71 | 33.3k | auto map_iter = second_version_map.begin(); |
72 | 33.3k | while (map_iter != second_version_map.end()) { |
73 | | // The version greater than `second_version`, we can't find path in `stale_map`. |
74 | 33.3k | if (map_iter->first > second_version) { |
75 | 0 | map_iter++; |
76 | 0 | continue; |
77 | 0 | } |
78 | | // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`. |
79 | 33.3k | stale_path->push_back(map_iter->second); |
80 | 33.3k | bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version, |
81 | 33.3k | stale_path); |
82 | 33.3k | if (r) { |
83 | 33.3k | return true; |
84 | 33.3k | } |
85 | | // There is no path in current version, pop and continue. |
86 | 1 | stale_path->pop_back(); |
87 | 1 | map_iter++; |
88 | 1 | } |
89 | | |
90 | 1 | return false; |
91 | 33.3k | } |
92 | | |
93 | 2.52k | void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) { |
94 | 2.52k | auto path_arr_iter = _stale_version_path_map.begin(); |
95 | | |
96 | | // Do loop version path. |
97 | 5.23k | while (path_arr_iter != _stale_version_path_map.end()) { |
98 | 2.71k | auto path_id = path_arr_iter->first; |
99 | 2.71k | auto path_version_path = path_arr_iter->second; |
100 | | |
101 | 2.71k | rapidjson::Document item; |
102 | 2.71k | item.SetObject(); |
103 | | // Add `path_id` to item. |
104 | 2.71k | auto path_id_str = std::to_string(path_id); |
105 | 2.71k | rapidjson::Value path_id_value; |
106 | 2.71k | path_id_value.SetString(path_id_str.c_str(), |
107 | 2.71k | static_cast<rapidjson::SizeType>(path_id_str.length()), |
108 | 2.71k | path_arr.GetAllocator()); |
109 | 2.71k | item.AddMember("path id", path_id_value, path_arr.GetAllocator()); |
110 | | |
111 | | // Add max create time to item. |
112 | 2.71k | auto time_zone = cctz::local_time_zone(); |
113 | | |
114 | 2.71k | auto tp = std::chrono::system_clock::from_time_t(path_version_path->max_create_time()); |
115 | 2.71k | auto create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone); |
116 | | |
117 | 2.71k | rapidjson::Value create_time_value; |
118 | 2.71k | create_time_value.SetString(create_time_str.c_str(), |
119 | 2.71k | static_cast<rapidjson::SizeType>(create_time_str.length()), |
120 | 2.71k | path_arr.GetAllocator()); |
121 | 2.71k | item.AddMember("last create time", create_time_value, path_arr.GetAllocator()); |
122 | | |
123 | | // Add path list to item. |
124 | 2.71k | std::stringstream path_list_stream; |
125 | 2.71k | path_list_stream << path_id_str; |
126 | 2.71k | auto path_list_ptr = path_version_path->timestamped_versions(); |
127 | 2.71k | auto path_list_iter = path_list_ptr.begin(); |
128 | 14.6k | while (path_list_iter != path_list_ptr.end()) { |
129 | 11.8k | path_list_stream << " -> "; |
130 | 11.8k | path_list_stream << "["; |
131 | 11.8k | path_list_stream << (*path_list_iter)->version().first; |
132 | 11.8k | path_list_stream << "-"; |
133 | 11.8k | path_list_stream << (*path_list_iter)->version().second; |
134 | 11.8k | path_list_stream << "]"; |
135 | 11.8k | path_list_iter++; |
136 | 11.8k | } |
137 | 2.71k | std::string path_list = path_list_stream.str(); |
138 | 2.71k | rapidjson::Value path_list_value; |
139 | 2.71k | path_list_value.SetString(path_list.c_str(), |
140 | 2.71k | static_cast<rapidjson::SizeType>(path_list.length()), |
141 | 2.71k | path_arr.GetAllocator()); |
142 | 2.71k | item.AddMember("path list", path_list_value, path_arr.GetAllocator()); |
143 | | |
144 | | // Add item to `path_arr`. |
145 | 2.71k | path_arr.PushBack(item, path_arr.GetAllocator()); |
146 | | |
147 | 2.71k | path_arr_iter++; |
148 | 2.71k | } |
149 | 2.52k | } |
150 | | |
151 | | void TimestampedVersionTracker::recover_versioned_tracker( |
152 | 1 | const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) { |
153 | 1 | auto _path_map_iter = stale_version_path_map.begin(); |
154 | | // Recover `stale_version_path_map`. |
155 | 1 | while (_path_map_iter != stale_version_path_map.end()) { |
156 | | // Add `PathVersionListSharedPtr` to map. |
157 | 0 | _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second; |
158 | |
|
159 | 0 | std::vector<TimestampedVersionSharedPtr>& timestamped_versions = |
160 | 0 | _path_map_iter->second->timestamped_versions(); |
161 | 0 | std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = |
162 | 0 | timestamped_versions.begin(); |
163 | 0 | while (version_path_iter != timestamped_versions.end()) { |
164 | | // Add version to `_version_graph`. |
165 | 0 | _version_graph.add_version_to_graph((*version_path_iter)->version()); |
166 | 0 | ++version_path_iter; |
167 | 0 | } |
168 | 0 | ++_path_map_iter; |
169 | 0 | } |
170 | 1 | LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str(); |
171 | 1 | } |
172 | | |
173 | 402k | void TimestampedVersionTracker::add_version(const Version& version) { |
174 | 402k | _version_graph.add_version_to_graph(version); |
175 | 402k | } |
176 | | |
177 | | void TimestampedVersionTracker::add_stale_path_version( |
178 | 14.7k | const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) { |
179 | 14.7k | if (stale_rs_metas.empty()) { |
180 | 1 | VLOG_NOTICE << "there is no version in the stale_rs_metas."; |
181 | 1 | return; |
182 | 1 | } |
183 | | |
184 | 14.7k | PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer()); |
185 | 118k | for (auto rs : stale_rs_metas) { |
186 | 118k | TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at())); |
187 | 118k | ptr->add_timestamped_version(vt_ptr); |
188 | 118k | } |
189 | | |
190 | 14.7k | std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions(); |
191 | | |
192 | 14.7k | struct TimestampedVersionPtrCompare { |
193 | 14.7k | bool operator()(const TimestampedVersionSharedPtr ptr1, |
194 | 255k | const TimestampedVersionSharedPtr ptr2) { |
195 | 255k | return ptr1->version().first < ptr2->version().first; |
196 | 255k | } |
197 | 14.7k | }; |
198 | 14.7k | sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare()); |
199 | 14.7k | _stale_version_path_map[_next_path_id] = ptr; |
200 | 14.7k | _next_path_id++; |
201 | 14.7k | } |
202 | | |
203 | | // Capture consistent versions from graph. |
204 | | Status TimestampedVersionTracker::capture_consistent_versions( |
205 | 1.59M | const Version& spec_version, std::vector<Version>* version_path) const { |
206 | 1.59M | return _version_graph.capture_consistent_versions(spec_version, version_path); |
207 | 1.59M | } |
208 | | |
209 | | Status TimestampedVersionTracker::capture_consistent_versions_with_validator( |
210 | | const Version& spec_version, std::vector<Version>& version_path, |
211 | 11 | const std::function<bool(int64_t, int64_t)>& validator) const { |
212 | 11 | return _version_graph.capture_consistent_versions_with_validator(spec_version, version_path, |
213 | 11 | validator); |
214 | 11 | } |
215 | | |
216 | | Status TimestampedVersionTracker::capture_consistent_versions_prefer_cache( |
217 | | const Version& spec_version, std::vector<Version>& version_path, |
218 | 15 | const std::function<bool(int64_t, int64_t)>& validator) const { |
219 | 15 | return _version_graph.capture_consistent_versions_prefer_cache(spec_version, version_path, |
220 | 15 | validator); |
221 | 15 | } |
222 | | |
223 | | Status TimestampedVersionTracker::capture_consistent_versions_with_validator_mow( |
224 | | const Version& spec_version, std::vector<Version>& version_path, |
225 | 13 | const std::function<bool(int64_t, int64_t)>& validator) const { |
226 | 13 | return _version_graph.capture_consistent_versions_with_validator_mow(spec_version, version_path, |
227 | 13 | validator); |
228 | 13 | } |
229 | | |
230 | | void TimestampedVersionTracker::capture_expired_paths( |
231 | 679k | int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const { |
232 | 679k | std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter = |
233 | 679k | _stale_version_path_map.begin(); |
234 | | |
235 | 701k | while (iter != _stale_version_path_map.end()) { |
236 | 22.2k | int64_t max_create_time = iter->second->max_create_time(); |
237 | 22.2k | if (max_create_time <= stale_sweep_endtime) { |
238 | 10.2k | int64_t path_version = iter->first; |
239 | 10.2k | path_version_vec->push_back(path_version); |
240 | 10.2k | } |
241 | 22.2k | ++iter; |
242 | 22.2k | } |
243 | 679k | } |
244 | | |
245 | 10.2k | PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) { |
246 | 10.2k | if (_stale_version_path_map.count(path_id) == 0) { |
247 | 0 | VLOG_NOTICE << "path version " << path_id << " does not exist!"; |
248 | 0 | return nullptr; |
249 | 0 | } |
250 | | |
251 | 10.2k | return _stale_version_path_map[path_id]; |
252 | 10.2k | } |
253 | | |
254 | 10.2k | PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) { |
255 | 10.2k | if (_stale_version_path_map.count(path_id) == 0) { |
256 | 0 | VLOG_NOTICE << "path version " << path_id << " does not exist!"; |
257 | 0 | return nullptr; |
258 | 0 | } |
259 | | |
260 | 10.2k | VLOG_NOTICE << get_current_path_map_str(); |
261 | 10.2k | PathVersionListSharedPtr ptr = fetch_path_version_by_id(path_id); |
262 | | |
263 | 10.2k | _stale_version_path_map.erase(path_id); |
264 | | |
265 | 79.1k | for (auto& version : ptr->timestamped_versions()) { |
266 | 79.1k | static_cast<void>(_version_graph.delete_version_from_graph(version->version())); |
267 | 79.1k | } |
268 | 10.2k | return ptr; |
269 | 10.2k | } |
270 | | |
271 | 14 | std::string TimestampedVersionTracker::get_current_path_map_str() { |
272 | 14 | std::stringstream tracker_info; |
273 | 14 | tracker_info << "current expired next_path_id " << _next_path_id << std::endl; |
274 | | |
275 | 14 | std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter = |
276 | 14 | _stale_version_path_map.begin(); |
277 | 49 | while (iter != _stale_version_path_map.end()) { |
278 | 35 | tracker_info << "current expired path_version " << iter->first; |
279 | 35 | std::vector<TimestampedVersionSharedPtr>& timestamped_versions = |
280 | 35 | iter->second->timestamped_versions(); |
281 | 35 | std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = |
282 | 35 | timestamped_versions.begin(); |
283 | 35 | int64_t max_create_time = -1; |
284 | 95 | while (version_path_iter != timestamped_versions.end()) { |
285 | 60 | if (max_create_time < (*version_path_iter)->get_create_time()) { |
286 | 35 | max_create_time = (*version_path_iter)->get_create_time(); |
287 | 35 | } |
288 | 60 | tracker_info << " -> ["; |
289 | 60 | tracker_info << (*version_path_iter)->version().first; |
290 | 60 | tracker_info << ","; |
291 | 60 | tracker_info << (*version_path_iter)->version().second; |
292 | 60 | tracker_info << "]"; |
293 | | |
294 | 60 | ++version_path_iter; |
295 | 60 | } |
296 | | |
297 | 35 | tracker_info << std::endl; |
298 | 35 | ++iter; |
299 | 35 | } |
300 | 14 | return tracker_info.str(); |
301 | 14 | } |
302 | | |
303 | 5.10k | double TimestampedVersionTracker::get_orphan_vertex_ratio() { |
304 | 5.10k | return _version_graph.get_orphan_vertex_ratio(); |
305 | 5.10k | } |
306 | | |
307 | 5 | std::string TimestampedVersionTracker::debug_string() const { |
308 | 5 | return _version_graph.debug_string(); |
309 | 5 | } |
310 | | |
311 | 118k | void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) { |
312 | | // Compare and refresh `_max_create_time`. |
313 | 118k | if (version->get_create_time() > _max_create_time) { |
314 | 17.6k | _max_create_time = version->get_create_time(); |
315 | 17.6k | } |
316 | 118k | _timestamped_versions_container.push_back(version); |
317 | 118k | } |
318 | | |
319 | 37.9k | std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() { |
320 | 37.9k | return _timestamped_versions_container; |
321 | 37.9k | } |
322 | | |
323 | | void VersionGraph::construct_version_graph(const RowsetMetaMapContainer& rs_metas, |
324 | 0 | int64_t* max_version) { |
325 | 0 | construct_version_graph(std::views::values(rs_metas), max_version); |
326 | 0 | } |
327 | | |
328 | | void VersionGraph::reconstruct_version_graph(const RowsetMetaMapContainer& rs_metas, |
329 | 0 | int64_t* max_version) { |
330 | 0 | reconstruct_version_graph(std::views::values(rs_metas), max_version); |
331 | 0 | } |
332 | | |
333 | 401k | void VersionGraph::add_version_to_graph(const Version& version) { |
334 | | // Add version.first as new vertex of version graph if not exist. |
335 | 401k | int64_t start_vertex_value = version.first; |
336 | 401k | int64_t end_vertex_value = version.second + 1; |
337 | | |
338 | | // Add vertex to graph. |
339 | 401k | _add_vertex_to_graph(start_vertex_value); |
340 | 401k | _add_vertex_to_graph(end_vertex_value); |
341 | | |
342 | 401k | int64_t start_vertex_index = _vertex_index_map[start_vertex_value]; |
343 | 401k | int64_t end_vertex_index = _vertex_index_map[end_vertex_value]; |
344 | | |
345 | | // We assume this version is new version, so we just add two edges |
346 | | // into version graph. add one edge from `start_version` to `end_version` |
347 | | // Make sure the vertex's edges are sorted by version in descending order when inserting. |
348 | 401k | auto end_vertex_it = _version_graph[start_vertex_index].edges.begin(); |
349 | 405k | while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) { |
350 | 259k | if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) { |
351 | 255k | break; |
352 | 255k | } |
353 | 3.48k | end_vertex_it++; |
354 | 3.48k | } |
355 | 401k | _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index); |
356 | | |
357 | | // We add reverse edge(from end_version to start_version) to graph |
358 | | // Make sure the vertex's edges are sorted by version in descending order when inserting. |
359 | 401k | auto start_vertex_it = _version_graph[end_vertex_index].edges.begin(); |
360 | 441k | while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) { |
361 | 40.0k | if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) { |
362 | 485 | break; |
363 | 485 | } |
364 | 39.5k | start_vertex_it++; |
365 | 39.5k | } |
366 | 401k | _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index); |
367 | 401k | } |
368 | | |
369 | 79.1k | Status VersionGraph::delete_version_from_graph(const Version& version) { |
370 | 79.1k | int64_t start_vertex_value = version.first; |
371 | 79.1k | int64_t end_vertex_value = version.second + 1; |
372 | | |
373 | 79.1k | if (_vertex_index_map.find(start_vertex_value) == _vertex_index_map.end() || |
374 | 79.1k | _vertex_index_map.find(end_vertex_value) == _vertex_index_map.end()) { |
375 | 0 | return Status::Error<HEADER_DELETE_VERSION>( |
376 | 0 | "vertex for version does not exists. version={}-{}", version.first, version.second); |
377 | 0 | } |
378 | | |
379 | 79.1k | int64_t start_vertex_index = _vertex_index_map[start_vertex_value]; |
380 | 79.1k | int64_t end_vertex_index = _vertex_index_map[end_vertex_value]; |
381 | | // Remove edge and its reverse edge. |
382 | | // When there are same versions in edges, just remove the first version. |
383 | 79.1k | auto start_edges_iter = _version_graph[start_vertex_index].edges.begin(); |
384 | 153k | while (start_edges_iter != _version_graph[start_vertex_index].edges.end()) { |
385 | 153k | if (*start_edges_iter == end_vertex_index) { |
386 | 79.1k | _version_graph[start_vertex_index].edges.erase(start_edges_iter); |
387 | 79.1k | break; |
388 | 79.1k | } |
389 | 74.5k | start_edges_iter++; |
390 | 74.5k | } |
391 | | |
392 | 79.1k | auto end_edges_iter = _version_graph[end_vertex_index].edges.begin(); |
393 | 155k | while (end_edges_iter != _version_graph[end_vertex_index].edges.end()) { |
394 | 155k | if (*end_edges_iter == start_vertex_index) { |
395 | 79.1k | _version_graph[end_vertex_index].edges.erase(end_edges_iter); |
396 | 79.1k | break; |
397 | 79.1k | } |
398 | 76.0k | end_edges_iter++; |
399 | 76.0k | } |
400 | | |
401 | | // Here we do not delete vertex in `_version_graph` even if its edges are empty. |
402 | | // the `_version_graph` will be rebuilt when doing trash sweep. |
403 | 79.1k | return Status::OK(); |
404 | 79.1k | } |
405 | | |
406 | 1.73M | void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) { |
407 | | // Vertex with vertex_value already exists. |
408 | 1.73M | if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) { |
409 | 287k | VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value; |
410 | 287k | return; |
411 | 287k | } |
412 | | |
413 | 1.45M | _version_graph.emplace_back(Vertex(vertex_value)); |
414 | 1.45M | _vertex_index_map[vertex_value] = _version_graph.size() - 1; |
415 | 1.45M | } |
416 | | |
417 | | Status VersionGraph::capture_consistent_versions(const Version& spec_version, |
418 | 1.58M | std::vector<Version>* version_path) const { |
419 | 1.58M | if (spec_version.first > spec_version.second) { |
420 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
421 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
422 | 0 | spec_version.second); |
423 | 0 | } |
424 | | |
425 | 1.58M | int64_t cur_idx = -1; |
426 | 1.59M | for (size_t i = 0; i < _version_graph.size(); i++) { |
427 | 1.59M | if (_version_graph[i].value == spec_version.first) { |
428 | 1.59M | cur_idx = i; |
429 | 1.59M | break; |
430 | 1.59M | } |
431 | 1.59M | } |
432 | | |
433 | 1.58M | if (cur_idx < 0) { |
434 | 0 | return Status::InternalError<false>( |
435 | 0 | "failed to find path in version_graph. spec_version: {}-{}", spec_version.first, |
436 | 0 | spec_version.second); |
437 | 0 | } |
438 | | |
439 | 1.58M | int64_t end_value = spec_version.second + 1; |
440 | 7.75M | while (_version_graph[cur_idx].value < end_value) { |
441 | 6.15M | int64_t next_idx = -1; |
442 | 6.15M | for (const auto& it : _version_graph[cur_idx].edges) { |
443 | | // Only consider incremental versions. |
444 | 6.15M | if (_version_graph[it].value < _version_graph[cur_idx].value) { |
445 | 1 | break; |
446 | 1 | } |
447 | | |
448 | 6.15M | if (_version_graph[it].value > end_value) { |
449 | 8 | continue; |
450 | 8 | } |
451 | | |
452 | | // Considering edges had been sorted by version in descending order, |
453 | | // This version is the largest version that smaller than `end_version`. |
454 | 6.15M | next_idx = it; |
455 | 6.15M | break; |
456 | 6.15M | } |
457 | | |
458 | 6.16M | if (next_idx > -1) { |
459 | 6.16M | if (version_path != nullptr) { |
460 | 6.16M | version_path->emplace_back(_version_graph[cur_idx].value, |
461 | 6.16M | _version_graph[next_idx].value - 1); |
462 | 6.16M | } |
463 | 6.16M | cur_idx = next_idx; |
464 | 18.4E | } else { |
465 | 18.4E | return Status::InternalError<false>( |
466 | 18.4E | "fail to find path in version_graph. spec_version: {}-{}", spec_version.first, |
467 | 18.4E | spec_version.second); |
468 | 18.4E | } |
469 | 6.15M | } |
470 | | |
471 | 1.59M | if (VLOG_TRACE_IS_ON && version_path != nullptr) { |
472 | 0 | std::stringstream shortest_path_for_debug; |
473 | 0 | for (const auto& version : *version_path) { |
474 | 0 | shortest_path_for_debug << version << ' '; |
475 | 0 | } |
476 | 0 | VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version |
477 | 0 | << ", path=" << shortest_path_for_debug.str(); |
478 | 0 | } |
479 | | |
480 | 1.59M | return Status::OK(); |
481 | 1.58M | } |
482 | | |
483 | | Status VersionGraph::capture_consistent_versions_prefer_cache( |
484 | | const Version& spec_version, std::vector<Version>& version_path, |
485 | 15 | const std::function<bool(int64_t, int64_t)>& validator) const { |
486 | 15 | if (spec_version.first > spec_version.second) { |
487 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
488 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
489 | 0 | spec_version.second); |
490 | 0 | } |
491 | | |
492 | 15 | int64_t cur_idx = -1; |
493 | 15 | for (size_t i = 0; i < _version_graph.size(); i++) { |
494 | 15 | if (_version_graph[i].value == spec_version.first) { |
495 | 15 | cur_idx = i; |
496 | 15 | break; |
497 | 15 | } |
498 | 15 | } |
499 | | |
500 | 15 | if (cur_idx < 0) { |
501 | 0 | return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", |
502 | 0 | spec_version.to_string()); |
503 | 0 | } |
504 | | |
505 | 15 | int64_t end_value = spec_version.second + 1; |
506 | 134 | while (_version_graph[cur_idx].value < end_value) { |
507 | 119 | int64_t next_idx = -1; |
508 | 119 | int64_t first_idx = -1; |
509 | 189 | for (const auto& it : _version_graph[cur_idx].edges) { |
510 | | // Only consider incremental versions. |
511 | 189 | if (_version_graph[it].value < _version_graph[cur_idx].value) { |
512 | 55 | break; |
513 | 55 | } |
514 | 134 | if (first_idx == -1) { |
515 | 119 | first_idx = it; |
516 | 119 | } |
517 | | |
518 | 134 | if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { |
519 | 70 | continue; |
520 | 70 | } |
521 | | |
522 | 64 | next_idx = it; |
523 | 64 | break; |
524 | 134 | } |
525 | | |
526 | 119 | if (next_idx > -1) { |
527 | 64 | version_path.emplace_back(_version_graph[cur_idx].value, |
528 | 64 | _version_graph[next_idx].value - 1); |
529 | | |
530 | 64 | cur_idx = next_idx; |
531 | 64 | } else if (first_idx != -1) { |
532 | | // if all edges are not in cache, use the first edge if possible |
533 | 55 | version_path.emplace_back(_version_graph[cur_idx].value, |
534 | 55 | _version_graph[first_idx].value - 1); |
535 | 55 | cur_idx = first_idx; |
536 | 55 | } else { |
537 | 0 | return Status::OK(); |
538 | 0 | } |
539 | 119 | } |
540 | 15 | return Status::OK(); |
541 | 15 | } |
542 | | |
543 | | Status VersionGraph::capture_consistent_versions_with_validator( |
544 | | const Version& spec_version, std::vector<Version>& version_path, |
545 | 11 | const std::function<bool(int64_t, int64_t)>& validator) const { |
546 | 11 | if (spec_version.first > spec_version.second) { |
547 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
548 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
549 | 0 | spec_version.second); |
550 | 0 | } |
551 | | |
552 | 11 | int64_t cur_idx = -1; |
553 | 11 | for (size_t i = 0; i < _version_graph.size(); i++) { |
554 | 11 | if (_version_graph[i].value == spec_version.first) { |
555 | 11 | cur_idx = i; |
556 | 11 | break; |
557 | 11 | } |
558 | 11 | } |
559 | | |
560 | 11 | if (cur_idx < 0) { |
561 | 0 | return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", |
562 | 0 | spec_version.to_string()); |
563 | 0 | } |
564 | | |
565 | 11 | int64_t end_value = spec_version.second + 1; |
566 | 54 | while (_version_graph[cur_idx].value < end_value) { |
567 | 52 | int64_t next_idx = -1; |
568 | 66 | for (const auto& it : _version_graph[cur_idx].edges) { |
569 | | // Only consider incremental versions. |
570 | 66 | if (_version_graph[it].value < _version_graph[cur_idx].value) { |
571 | 9 | break; |
572 | 9 | } |
573 | | |
574 | 57 | if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { |
575 | 14 | continue; |
576 | 14 | } |
577 | | |
578 | 43 | next_idx = it; |
579 | 43 | break; |
580 | 57 | } |
581 | | |
582 | 52 | if (next_idx > -1) { |
583 | 43 | version_path.emplace_back(_version_graph[cur_idx].value, |
584 | 43 | _version_graph[next_idx].value - 1); |
585 | | |
586 | 43 | cur_idx = next_idx; |
587 | 43 | } else { |
588 | 9 | return Status::OK(); |
589 | 9 | } |
590 | 52 | } |
591 | 2 | return Status::OK(); |
592 | 11 | } |
593 | | |
594 | | Status VersionGraph::capture_consistent_versions_with_validator_mow( |
595 | | const Version& spec_version, std::vector<Version>& version_path, |
596 | 13 | const std::function<bool(int64_t, int64_t)>& validator) const { |
597 | 13 | if (spec_version.first > spec_version.second) { |
598 | 0 | return Status::Error<INVALID_ARGUMENT, false>( |
599 | 0 | "invalid specified version. spec_version={}-{}", spec_version.first, |
600 | 0 | spec_version.second); |
601 | 0 | } |
602 | | |
603 | 13 | int64_t cur_idx = -1; |
604 | 13 | for (size_t i = 0; i < _version_graph.size(); i++) { |
605 | 13 | if (_version_graph[i].value == spec_version.first) { |
606 | 13 | cur_idx = i; |
607 | 13 | break; |
608 | 13 | } |
609 | 13 | } |
610 | | |
611 | 13 | if (cur_idx < 0) { |
612 | 0 | return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", |
613 | 0 | spec_version.to_string()); |
614 | 0 | } |
615 | | |
616 | 13 | int64_t end_value = spec_version.second + 1; |
617 | 64 | while (_version_graph[cur_idx].value < end_value) { |
618 | 59 | int64_t next_idx = -1; |
619 | 69 | for (const auto& it : _version_graph[cur_idx].edges) { |
620 | | // Only consider incremental versions. |
621 | 69 | if (_version_graph[it].value < _version_graph[cur_idx].value) { |
622 | 0 | break; |
623 | 0 | } |
624 | | |
625 | 69 | if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { |
626 | 18 | if (_version_graph[cur_idx].value + 1 == _version_graph[it].value) { |
627 | 8 | break; |
628 | 8 | } |
629 | 10 | end_value = std::min(_version_graph[it].value, end_value); |
630 | 10 | continue; |
631 | 18 | } |
632 | | |
633 | 51 | next_idx = it; |
634 | 51 | break; |
635 | 69 | } |
636 | | |
637 | 59 | if (next_idx > -1) { |
638 | 51 | version_path.emplace_back(_version_graph[cur_idx].value, |
639 | 51 | _version_graph[next_idx].value - 1); |
640 | | |
641 | 51 | cur_idx = next_idx; |
642 | 51 | } else { |
643 | 8 | return Status::OK(); |
644 | 8 | } |
645 | 59 | } |
646 | 5 | return Status::OK(); |
647 | 13 | } |
648 | | |
649 | 5.10k | double VersionGraph::get_orphan_vertex_ratio() { |
650 | 5.10k | int64_t vertex_num = _version_graph.size(); |
651 | 5.10k | int64_t orphan_vertex_num = 0; |
652 | 118k | for (auto& iter : _version_graph) { |
653 | 118k | if (iter.edges.empty()) { |
654 | 68.9k | ++orphan_vertex_num; |
655 | 68.9k | } |
656 | 118k | } |
657 | 5.10k | return static_cast<double>(orphan_vertex_num) / static_cast<double>(vertex_num); |
658 | 5.10k | } |
659 | | |
660 | 5 | std::string VersionGraph::debug_string() const { |
661 | 5 | std::stringstream ss; |
662 | 5 | ss << "VersionGraph: ["; |
663 | 100 | for (size_t i = 0; i < _version_graph.size(); ++i) { |
664 | 95 | ss << "{value: " << _version_graph[i].value << ", edges: ["; |
665 | 208 | for (const auto& edge : _version_graph[i].edges) { |
666 | 208 | if (_version_graph[edge].value > _version_graph[i].value) { |
667 | 104 | ss << _version_graph[edge].value << ", "; |
668 | 104 | } |
669 | 208 | } |
670 | 95 | ss << "]}, "; |
671 | 95 | } |
672 | 5 | ss << "]"; |
673 | 5 | return ss.str(); |
674 | 5 | } |
675 | | |
676 | | #include "common/compile_check_end.h" |
677 | | } // namespace doris |