/root/doris/be/src/olap/version_graph.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "olap/version_graph.h" | 
| 19 |  |  | 
| 20 |  | #include <cctz/time_zone.h> | 
| 21 |  | #include <stddef.h> | 
| 22 |  |  | 
| 23 |  | #include <algorithm> | 
| 24 |  | // IWYU pragma: no_include <bits/chrono.h> | 
| 25 |  | #include <chrono> // IWYU pragma: keep | 
| 26 |  | #include <list> | 
| 27 |  | #include <memory> | 
| 28 |  | #include <optional> | 
| 29 |  | #include <ostream> | 
| 30 |  | #include <ranges> | 
| 31 |  | #include <utility> | 
| 32 |  |  | 
| 33 |  | #include "common/logging.h" | 
| 34 |  |  | 
| 35 |  | namespace doris { | 
| 36 |  | #include "common/compile_check_begin.h" | 
| 37 |  |  | 
| 38 |  | using namespace ErrorCode; | 
| 39 |  |  | 
| 40 |  | void TimestampedVersionTracker::construct_versioned_tracker( | 
| 41 | 0 |         const RowsetMetaMapContainer& rs_metas) { | 
| 42 | 0 |     construct_versioned_tracker(std::views::values(rs_metas)); | 
| 43 | 0 | } | 
| 44 |  |  | 
| 45 |  | void TimestampedVersionTracker::construct_versioned_tracker( | 
| 46 | 666 |         const RowsetMetaMapContainer& rs_metas, const RowsetMetaMapContainer& stale_metas) { | 
| 47 | 666 |     construct_versioned_tracker(std::views::values(rs_metas), std::views::values(stale_metas)); | 
| 48 | 666 | } | 
| 49 |  |  | 
| 50 |  | bool TimestampedVersionTracker::_find_path_from_stale_map( | 
| 51 |  |         const std::unordered_map<int64_t, std::unordered_map<int64_t, RowsetMetaSharedPtr>>& | 
| 52 |  |                 stale_map, | 
| 53 |  |         int64_t first_version, int64_t second_version, | 
| 54 | 16 |         std::vector<RowsetMetaSharedPtr>* stale_path) { | 
| 55 | 16 |     auto first_iter = stale_map.find(first_version); | 
| 56 |  |     // If `first_version` not in `stale_map`, there is no path. | 
| 57 | 16 |     if (first_iter == stale_map.end()) { | 
| 58 | 9 |         return false; | 
| 59 | 9 |     } | 
| 60 | 7 |     auto& second_version_map = first_iter->second; | 
| 61 | 7 |     auto second_iter = second_version_map.find(second_version); | 
| 62 |  |     // If second_version in `stale_map`, find a path. | 
| 63 | 7 |     if (second_iter != second_version_map.end()) { | 
| 64 | 3 |         auto row_meta = second_iter->second; | 
| 65 |  |         // Add rowset to path. | 
| 66 | 3 |         stale_path->push_back(row_meta); | 
| 67 | 3 |         return true; | 
| 68 | 3 |     } | 
| 69 |  |  | 
| 70 |  |     // Traverse the first version map to backtracking  `_find_path_from_stale_map`. | 
| 71 | 4 |     auto map_iter = second_version_map.begin(); | 
| 72 | 5 |     while (map_iter != second_version_map.end()) { | 
| 73 |  |         // The version greater than `second_version`, we can't find path in `stale_map`. | 
| 74 | 4 |         if (map_iter->first > second_version) { | 
| 75 | 0 |             map_iter++; | 
| 76 | 0 |             continue; | 
| 77 | 0 |         } | 
| 78 |  |         // Backtracking `_find_path_from_stale_map` find from `map_iter->first + 1` to `second_version`. | 
| 79 | 4 |         stale_path->push_back(map_iter->second); | 
| 80 | 4 |         bool r = _find_path_from_stale_map(stale_map, map_iter->first + 1, second_version, | 
| 81 | 4 |                                            stale_path); | 
| 82 | 4 |         if (r) { | 
| 83 | 3 |             return true; | 
| 84 | 3 |         } | 
| 85 |  |         // There is no path in current version, pop and continue. | 
| 86 | 1 |         stale_path->pop_back(); | 
| 87 | 1 |         map_iter++; | 
| 88 | 1 |     } | 
| 89 |  |  | 
| 90 | 1 |     return false; | 
| 91 | 4 | } | 
| 92 |  |  | 
| 93 | 35 | void TimestampedVersionTracker::get_stale_version_path_json_doc(rapidjson::Document& path_arr) { | 
| 94 | 35 |     auto path_arr_iter = _stale_version_path_map.begin(); | 
| 95 |  |  | 
| 96 |  |     // Do loop version path. | 
| 97 | 133 |     while (path_arr_iter != _stale_version_path_map.end()) { | 
| 98 | 98 |         auto path_id = path_arr_iter->first; | 
| 99 | 98 |         auto path_version_path = path_arr_iter->second; | 
| 100 |  |  | 
| 101 | 98 |         rapidjson::Document item; | 
| 102 | 98 |         item.SetObject(); | 
| 103 |  |         // Add `path_id` to item. | 
| 104 | 98 |         auto path_id_str = std::to_string(path_id); | 
| 105 | 98 |         rapidjson::Value path_id_value; | 
| 106 | 98 |         path_id_value.SetString(path_id_str.c_str(), | 
| 107 | 98 |                                 static_cast<rapidjson::SizeType>(path_id_str.length()), | 
| 108 | 98 |                                 path_arr.GetAllocator()); | 
| 109 | 98 |         item.AddMember("path id", path_id_value, path_arr.GetAllocator()); | 
| 110 |  |  | 
| 111 |  |         // Add max create time to item. | 
| 112 | 98 |         auto time_zone = cctz::local_time_zone(); | 
| 113 |  |  | 
| 114 | 98 |         auto tp = std::chrono::system_clock::from_time_t(path_version_path->max_create_time()); | 
| 115 | 98 |         auto create_time_str = cctz::format("%Y-%m-%d %H:%M:%S %z", tp, time_zone); | 
| 116 |  |  | 
| 117 | 98 |         rapidjson::Value create_time_value; | 
| 118 | 98 |         create_time_value.SetString(create_time_str.c_str(), | 
| 119 | 98 |                                     static_cast<rapidjson::SizeType>(create_time_str.length()), | 
| 120 | 98 |                                     path_arr.GetAllocator()); | 
| 121 | 98 |         item.AddMember("last create time", create_time_value, path_arr.GetAllocator()); | 
| 122 |  |  | 
| 123 |  |         // Add path list to item. | 
| 124 | 98 |         std::stringstream path_list_stream; | 
| 125 | 98 |         path_list_stream << path_id_str; | 
| 126 | 98 |         auto path_list_ptr = path_version_path->timestamped_versions(); | 
| 127 | 98 |         auto path_list_iter = path_list_ptr.begin(); | 
| 128 | 645 |         while (path_list_iter != path_list_ptr.end()) { | 
| 129 | 547 |             path_list_stream << " -> "; | 
| 130 | 547 |             path_list_stream << "["; | 
| 131 | 547 |             path_list_stream << (*path_list_iter)->version().first; | 
| 132 | 547 |             path_list_stream << "-"; | 
| 133 | 547 |             path_list_stream << (*path_list_iter)->version().second; | 
| 134 | 547 |             path_list_stream << "]"; | 
| 135 | 547 |             path_list_iter++; | 
| 136 | 547 |         } | 
| 137 | 98 |         std::string path_list = path_list_stream.str(); | 
| 138 | 98 |         rapidjson::Value path_list_value; | 
| 139 | 98 |         path_list_value.SetString(path_list.c_str(), | 
| 140 | 98 |                                   static_cast<rapidjson::SizeType>(path_list.length()), | 
| 141 | 98 |                                   path_arr.GetAllocator()); | 
| 142 | 98 |         item.AddMember("path list", path_list_value, path_arr.GetAllocator()); | 
| 143 |  |  | 
| 144 |  |         // Add item to `path_arr`. | 
| 145 | 98 |         path_arr.PushBack(item, path_arr.GetAllocator()); | 
| 146 |  |  | 
| 147 | 98 |         path_arr_iter++; | 
| 148 | 98 |     } | 
| 149 | 35 | } | 
| 150 |  |  | 
| 151 |  | void TimestampedVersionTracker::recover_versioned_tracker( | 
| 152 | 1 |         const std::map<int64_t, PathVersionListSharedPtr>& stale_version_path_map) { | 
| 153 | 1 |     auto _path_map_iter = stale_version_path_map.begin(); | 
| 154 |  |     // Recover `stale_version_path_map`. | 
| 155 | 1 |     while (_path_map_iter != stale_version_path_map.end()) { | 
| 156 |  |         // Add `PathVersionListSharedPtr` to map. | 
| 157 | 0 |         _stale_version_path_map[_path_map_iter->first] = _path_map_iter->second; | 
| 158 |  | 
 | 
| 159 | 0 |         std::vector<TimestampedVersionSharedPtr>& timestamped_versions = | 
| 160 | 0 |                 _path_map_iter->second->timestamped_versions(); | 
| 161 | 0 |         std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = | 
| 162 | 0 |                 timestamped_versions.begin(); | 
| 163 | 0 |         while (version_path_iter != timestamped_versions.end()) { | 
| 164 |  |             // Add version to `_version_graph`. | 
| 165 | 0 |             _version_graph.add_version_to_graph((*version_path_iter)->version()); | 
| 166 | 0 |             ++version_path_iter; | 
| 167 | 0 |         } | 
| 168 | 0 |         ++_path_map_iter; | 
| 169 | 0 |     } | 
| 170 | 1 |     LOG(INFO) << "recover_versioned_tracker current map info " << get_current_path_map_str(); | 
| 171 | 1 | } | 
| 172 |  |  | 
| 173 | 11.7k | void TimestampedVersionTracker::add_version(const Version& version) { | 
| 174 | 11.7k |     _version_graph.add_version_to_graph(version); | 
| 175 | 11.7k | } | 
| 176 |  |  | 
| 177 |  | void TimestampedVersionTracker::add_stale_path_version( | 
| 178 | 137 |         const std::vector<RowsetMetaSharedPtr>& stale_rs_metas) { | 
| 179 | 137 |     if (stale_rs_metas.empty()) { | 
| 180 | 1 |         VLOG_NOTICE << "there is no version in the stale_rs_metas."; | 
| 181 | 1 |         return; | 
| 182 | 1 |     } | 
| 183 |  |  | 
| 184 | 136 |     PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer()); | 
| 185 | 637 |     for (auto rs : stale_rs_metas) { | 
| 186 | 637 |         TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at())); | 
| 187 | 637 |         ptr->add_timestamped_version(vt_ptr); | 
| 188 | 637 |     } | 
| 189 |  |  | 
| 190 | 136 |     std::vector<TimestampedVersionSharedPtr>& timestamped_versions = ptr->timestamped_versions(); | 
| 191 |  |  | 
| 192 | 136 |     struct TimestampedVersionPtrCompare { | 
| 193 | 136 |         bool operator()(const TimestampedVersionSharedPtr ptr1, | 
| 194 | 1.11k |                         const TimestampedVersionSharedPtr ptr2) { | 
| 195 | 1.11k |             return ptr1->version().first < ptr2->version().first; | 
| 196 | 1.11k |         } | 
| 197 | 136 |     }; | 
| 198 | 136 |     sort(timestamped_versions.begin(), timestamped_versions.end(), TimestampedVersionPtrCompare()); | 
| 199 | 136 |     _stale_version_path_map[_next_path_id] = ptr; | 
| 200 | 136 |     _next_path_id++; | 
| 201 | 136 | } | 
| 202 |  |  | 
| 203 |  | // Capture consistent versions from graph. | 
| 204 |  | Status TimestampedVersionTracker::capture_consistent_versions( | 
| 205 | 26 |         const Version& spec_version, std::vector<Version>* version_path) const { | 
| 206 | 26 |     return _version_graph.capture_consistent_versions(spec_version, version_path); | 
| 207 | 26 | } | 
| 208 |  |  | 
| 209 |  | Status TimestampedVersionTracker::capture_consistent_versions_with_validator( | 
| 210 |  |         const Version& spec_version, std::vector<Version>& version_path, | 
| 211 | 9 |         const std::function<bool(int64_t, int64_t)>& validator) const { | 
| 212 | 9 |     return _version_graph.capture_consistent_versions_with_validator(spec_version, version_path, | 
| 213 | 9 |                                                                      validator); | 
| 214 | 9 | } | 
| 215 |  |  | 
| 216 |  | Status TimestampedVersionTracker::capture_consistent_versions_prefer_cache( | 
| 217 |  |         const Version& spec_version, std::vector<Version>& version_path, | 
| 218 | 13 |         const std::function<bool(int64_t, int64_t)>& validator) const { | 
| 219 | 13 |     return _version_graph.capture_consistent_versions_prefer_cache(spec_version, version_path, | 
| 220 | 13 |                                                                    validator); | 
| 221 | 13 | } | 
| 222 |  |  | 
| 223 |  | Status TimestampedVersionTracker::capture_consistent_versions_with_validator_mow( | 
| 224 |  |         const Version& spec_version, std::vector<Version>& version_path, | 
| 225 | 11 |         const std::function<bool(int64_t, int64_t)>& validator) const { | 
| 226 | 11 |     return _version_graph.capture_consistent_versions_with_validator_mow(spec_version, version_path, | 
| 227 | 11 |                                                                          validator); | 
| 228 | 11 | } | 
| 229 |  |  | 
| 230 |  | void TimestampedVersionTracker::capture_expired_paths( | 
| 231 | 5 |         int64_t stale_sweep_endtime, std::vector<int64_t>* path_version_vec) const { | 
| 232 | 5 |     std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter = | 
| 233 | 5 |             _stale_version_path_map.begin(); | 
| 234 |  |  | 
| 235 | 19 |     while (iter != _stale_version_path_map.end()) { | 
| 236 | 14 |         int64_t max_create_time = iter->second->max_create_time(); | 
| 237 | 14 |         if (max_create_time <= stale_sweep_endtime) { | 
| 238 | 9 |             int64_t path_version = iter->first; | 
| 239 | 9 |             path_version_vec->push_back(path_version); | 
| 240 | 9 |         } | 
| 241 | 14 |         ++iter; | 
| 242 | 14 |     } | 
| 243 | 5 | } | 
| 244 |  |  | 
| 245 | 13 | PathVersionListSharedPtr TimestampedVersionTracker::fetch_path_version_by_id(int64_t path_id) { | 
| 246 | 13 |     if (_stale_version_path_map.count(path_id) == 0) { | 
| 247 | 0 |         VLOG_NOTICE << "path version " << path_id << " does not exist!"; | 
| 248 | 0 |         return nullptr; | 
| 249 | 0 |     } | 
| 250 |  |  | 
| 251 | 13 |     return _stale_version_path_map[path_id]; | 
| 252 | 13 | } | 
| 253 |  |  | 
| 254 | 13 | PathVersionListSharedPtr TimestampedVersionTracker::fetch_and_delete_path_by_id(int64_t path_id) { | 
| 255 | 13 |     if (_stale_version_path_map.count(path_id) == 0) { | 
| 256 | 0 |         VLOG_NOTICE << "path version " << path_id << " does not exist!"; | 
| 257 | 0 |         return nullptr; | 
| 258 | 0 |     } | 
| 259 |  |  | 
| 260 | 13 |     VLOG_NOTICE << get_current_path_map_str(); | 
| 261 | 13 |     PathVersionListSharedPtr ptr = fetch_path_version_by_id(path_id); | 
| 262 |  |  | 
| 263 | 13 |     _stale_version_path_map.erase(path_id); | 
| 264 |  |  | 
| 265 | 23 |     for (auto& version : ptr->timestamped_versions()) { | 
| 266 | 23 |         static_cast<void>(_version_graph.delete_version_from_graph(version->version())); | 
| 267 | 23 |     } | 
| 268 | 13 |     return ptr; | 
| 269 | 13 | } | 
| 270 |  |  | 
| 271 | 1 | std::string TimestampedVersionTracker::get_current_path_map_str() { | 
| 272 | 1 |     std::stringstream tracker_info; | 
| 273 | 1 |     tracker_info << "current expired next_path_id " << _next_path_id << std::endl; | 
| 274 |  |  | 
| 275 | 1 |     std::map<int64_t, PathVersionListSharedPtr>::const_iterator iter = | 
| 276 | 1 |             _stale_version_path_map.begin(); | 
| 277 | 1 |     while (iter != _stale_version_path_map.end()) { | 
| 278 | 0 |         tracker_info << "current expired path_version " << iter->first; | 
| 279 | 0 |         std::vector<TimestampedVersionSharedPtr>& timestamped_versions = | 
| 280 | 0 |                 iter->second->timestamped_versions(); | 
| 281 | 0 |         std::vector<TimestampedVersionSharedPtr>::iterator version_path_iter = | 
| 282 | 0 |                 timestamped_versions.begin(); | 
| 283 | 0 |         int64_t max_create_time = -1; | 
| 284 | 0 |         while (version_path_iter != timestamped_versions.end()) { | 
| 285 | 0 |             if (max_create_time < (*version_path_iter)->get_create_time()) { | 
| 286 | 0 |                 max_create_time = (*version_path_iter)->get_create_time(); | 
| 287 | 0 |             } | 
| 288 | 0 |             tracker_info << " -> ["; | 
| 289 | 0 |             tracker_info << (*version_path_iter)->version().first; | 
| 290 | 0 |             tracker_info << ","; | 
| 291 | 0 |             tracker_info << (*version_path_iter)->version().second; | 
| 292 | 0 |             tracker_info << "]"; | 
| 293 |  | 
 | 
| 294 | 0 |             ++version_path_iter; | 
| 295 | 0 |         } | 
| 296 |  | 
 | 
| 297 | 0 |         tracker_info << std::endl; | 
| 298 | 0 |         ++iter; | 
| 299 | 0 |     } | 
| 300 | 1 |     return tracker_info.str(); | 
| 301 | 1 | } | 
| 302 |  |  | 
| 303 | 1 | double TimestampedVersionTracker::get_orphan_vertex_ratio() { | 
| 304 | 1 |     return _version_graph.get_orphan_vertex_ratio(); | 
| 305 | 1 | } | 
| 306 |  |  | 
| 307 | 5 | std::string TimestampedVersionTracker::debug_string() const { | 
| 308 | 5 |     return _version_graph.debug_string(); | 
| 309 | 5 | } | 
| 310 |  |  | 
| 311 | 637 | void TimestampedVersionPathContainer::add_timestamped_version(TimestampedVersionSharedPtr version) { | 
| 312 |  |     // Compare and refresh `_max_create_time`. | 
| 313 | 637 |     if (version->get_create_time() > _max_create_time) { | 
| 314 | 136 |         _max_create_time = version->get_create_time(); | 
| 315 | 136 |     } | 
| 316 | 637 |     _timestamped_versions_container.push_back(version); | 
| 317 | 637 | } | 
| 318 |  |  | 
| 319 | 262 | std::vector<TimestampedVersionSharedPtr>& TimestampedVersionPathContainer::timestamped_versions() { | 
| 320 | 262 |     return _timestamped_versions_container; | 
| 321 | 262 | } | 
| 322 |  |  | 
| 323 |  | void VersionGraph::construct_version_graph(const RowsetMetaMapContainer& rs_metas, | 
| 324 | 0 |                                            int64_t* max_version) { | 
| 325 | 0 |     construct_version_graph(std::views::values(rs_metas), max_version); | 
| 326 | 0 | } | 
| 327 |  |  | 
| 328 |  | void VersionGraph::reconstruct_version_graph(const RowsetMetaMapContainer& rs_metas, | 
| 329 | 0 |                                              int64_t* max_version) { | 
| 330 | 0 |     reconstruct_version_graph(std::views::values(rs_metas), max_version); | 
| 331 | 0 | } | 
| 332 |  |  | 
| 333 | 11.7k | void VersionGraph::add_version_to_graph(const Version& version) { | 
| 334 |  |     // Add version.first as new vertex of version graph if not exist. | 
| 335 | 11.7k |     int64_t start_vertex_value = version.first; | 
| 336 | 11.7k |     int64_t end_vertex_value = version.second + 1; | 
| 337 |  |  | 
| 338 |  |     // Add vertex to graph. | 
| 339 | 11.7k |     _add_vertex_to_graph(start_vertex_value); | 
| 340 | 11.7k |     _add_vertex_to_graph(end_vertex_value); | 
| 341 |  |  | 
| 342 | 11.7k |     int64_t start_vertex_index = _vertex_index_map[start_vertex_value]; | 
| 343 | 11.7k |     int64_t end_vertex_index = _vertex_index_map[end_vertex_value]; | 
| 344 |  |  | 
| 345 |  |     // We assume this version is new version, so we just add two edges | 
| 346 |  |     // into version graph. add one edge from `start_version` to `end_version` | 
| 347 |  |     // Make sure the vertex's edges are sorted by version in descending order when inserting. | 
| 348 | 11.7k |     auto end_vertex_it = _version_graph[start_vertex_index].edges.begin(); | 
| 349 | 11.7k |     while (end_vertex_it != _version_graph[start_vertex_index].edges.end()) { | 
| 350 | 11.3k |         if (_version_graph[*end_vertex_it].value < _version_graph[end_vertex_index].value) { | 
| 351 | 11.3k |             break; | 
| 352 | 11.3k |         } | 
| 353 | 37 |         end_vertex_it++; | 
| 354 | 37 |     } | 
| 355 | 11.7k |     _version_graph[start_vertex_index].edges.insert(end_vertex_it, end_vertex_index); | 
| 356 |  |  | 
| 357 |  |     // We add reverse edge(from end_version to start_version) to graph | 
| 358 |  |     // Make sure the vertex's edges are sorted by version in descending order when inserting. | 
| 359 | 11.7k |     auto start_vertex_it = _version_graph[end_vertex_index].edges.begin(); | 
| 360 | 11.9k |     while (start_vertex_it != _version_graph[end_vertex_index].edges.end()) { | 
| 361 | 216 |         if (_version_graph[*start_vertex_it].value < _version_graph[start_vertex_index].value) { | 
| 362 | 17 |             break; | 
| 363 | 17 |         } | 
| 364 | 199 |         start_vertex_it++; | 
| 365 | 199 |     } | 
| 366 | 11.7k |     _version_graph[end_vertex_index].edges.insert(start_vertex_it, start_vertex_index); | 
| 367 | 11.7k | } | 
| 368 |  |  | 
| 369 | 32 | Status VersionGraph::delete_version_from_graph(const Version& version) { | 
| 370 | 32 |     int64_t start_vertex_value = version.first; | 
| 371 | 32 |     int64_t end_vertex_value = version.second + 1; | 
| 372 |  |  | 
| 373 | 32 |     if (_vertex_index_map.find(start_vertex_value) == _vertex_index_map.end() || | 
| 374 | 32 |         _vertex_index_map.find(end_vertex_value) == _vertex_index_map.end()) { | 
| 375 | 0 |         return Status::Error<HEADER_DELETE_VERSION>( | 
| 376 | 0 |                 "vertex for version does not exists. version={}-{}", version.first, version.second); | 
| 377 | 0 |     } | 
| 378 |  |  | 
| 379 | 32 |     int64_t start_vertex_index = _vertex_index_map[start_vertex_value]; | 
| 380 | 32 |     int64_t end_vertex_index = _vertex_index_map[end_vertex_value]; | 
| 381 |  |     // Remove edge and its reverse edge. | 
| 382 |  |     // When there are same versions in edges, just remove the first version. | 
| 383 | 32 |     auto start_edges_iter = _version_graph[start_vertex_index].edges.begin(); | 
| 384 | 47 |     while (start_edges_iter != _version_graph[start_vertex_index].edges.end()) { | 
| 385 | 47 |         if (*start_edges_iter == end_vertex_index) { | 
| 386 | 32 |             _version_graph[start_vertex_index].edges.erase(start_edges_iter); | 
| 387 | 32 |             break; | 
| 388 | 32 |         } | 
| 389 | 15 |         start_edges_iter++; | 
| 390 | 15 |     } | 
| 391 |  |  | 
| 392 | 32 |     auto end_edges_iter = _version_graph[end_vertex_index].edges.begin(); | 
| 393 | 65 |     while (end_edges_iter != _version_graph[end_vertex_index].edges.end()) { | 
| 394 | 65 |         if (*end_edges_iter == start_vertex_index) { | 
| 395 | 32 |             _version_graph[end_vertex_index].edges.erase(end_edges_iter); | 
| 396 | 32 |             break; | 
| 397 | 32 |         } | 
| 398 | 33 |         end_edges_iter++; | 
| 399 | 33 |     } | 
| 400 |  |  | 
| 401 |  |     // Here we do not delete vertex in `_version_graph` even if its edges are empty. | 
| 402 |  |     // the `_version_graph` will be rebuilt when doing trash sweep. | 
| 403 | 32 |     return Status::OK(); | 
| 404 | 32 | } | 
| 405 |  |  | 
| 406 | 23.8k | void VersionGraph::_add_vertex_to_graph(int64_t vertex_value) { | 
| 407 |  |     // Vertex with vertex_value already exists. | 
| 408 | 23.8k |     if (_vertex_index_map.find(vertex_value) != _vertex_index_map.end()) { | 
| 409 | 11.4k |         VLOG_NOTICE << "vertex with vertex value already exists. value=" << vertex_value; | 
| 410 | 11.4k |         return; | 
| 411 | 11.4k |     } | 
| 412 |  |  | 
| 413 | 12.4k |     _version_graph.emplace_back(Vertex(vertex_value)); | 
| 414 | 12.4k |     _vertex_index_map[vertex_value] = _version_graph.size() - 1; | 
| 415 | 12.4k | } | 
| 416 |  |  | 
| 417 |  | Status VersionGraph::capture_consistent_versions(const Version& spec_version, | 
| 418 | 28 |                                                  std::vector<Version>* version_path) const { | 
| 419 | 28 |     if (spec_version.first > spec_version.second) { | 
| 420 | 0 |         return Status::Error<INVALID_ARGUMENT, false>( | 
| 421 | 0 |                 "invalid specified version. spec_version={}-{}", spec_version.first, | 
| 422 | 0 |                 spec_version.second); | 
| 423 | 0 |     } | 
| 424 |  |  | 
| 425 | 28 |     int64_t cur_idx = -1; | 
| 426 | 32 |     for (size_t i = 0; i < _version_graph.size(); i++) { | 
| 427 | 32 |         if (_version_graph[i].value == spec_version.first) { | 
| 428 | 28 |             cur_idx = i; | 
| 429 | 28 |             break; | 
| 430 | 28 |         } | 
| 431 | 32 |     } | 
| 432 |  |  | 
| 433 | 28 |     if (cur_idx < 0) { | 
| 434 | 0 |         return Status::InternalError<false>( | 
| 435 | 0 |                 "failed to find path in version_graph. spec_version: {}-{}", spec_version.first, | 
| 436 | 0 |                 spec_version.second); | 
| 437 | 0 |     } | 
| 438 |  |  | 
| 439 | 28 |     int64_t end_value = spec_version.second + 1; | 
| 440 | 107 |     while (_version_graph[cur_idx].value < end_value) { | 
| 441 | 80 |         int64_t next_idx = -1; | 
| 442 | 84 |         for (const auto& it : _version_graph[cur_idx].edges) { | 
| 443 |  |             // Only consider incremental versions. | 
| 444 | 84 |             if (_version_graph[it].value < _version_graph[cur_idx].value) { | 
| 445 | 1 |                 break; | 
| 446 | 1 |             } | 
| 447 |  |  | 
| 448 | 83 |             if (_version_graph[it].value > end_value) { | 
| 449 | 4 |                 continue; | 
| 450 | 4 |             } | 
| 451 |  |  | 
| 452 |  |             // Considering edges had been sorted by version in descending order, | 
| 453 |  |             // This version is the largest version that smaller than `end_version`. | 
| 454 | 79 |             next_idx = it; | 
| 455 | 79 |             break; | 
| 456 | 83 |         } | 
| 457 |  |  | 
| 458 | 80 |         if (next_idx > -1) { | 
| 459 | 79 |             if (version_path != nullptr) { | 
| 460 | 59 |                 version_path->emplace_back(_version_graph[cur_idx].value, | 
| 461 | 59 |                                            _version_graph[next_idx].value - 1); | 
| 462 | 59 |             } | 
| 463 | 79 |             cur_idx = next_idx; | 
| 464 | 79 |         } else { | 
| 465 | 1 |             return Status::InternalError<false>( | 
| 466 | 1 |                     "fail to find path in version_graph. spec_version: {}-{}", spec_version.first, | 
| 467 | 1 |                     spec_version.second); | 
| 468 | 1 |         } | 
| 469 | 80 |     } | 
| 470 |  |  | 
| 471 | 27 |     if (VLOG_TRACE_IS_ON && version_path != nullptr) { | 
| 472 | 0 |         std::stringstream shortest_path_for_debug; | 
| 473 | 0 |         for (const auto& version : *version_path) { | 
| 474 | 0 |             shortest_path_for_debug << version << ' '; | 
| 475 | 0 |         } | 
| 476 | 0 |         VLOG_TRACE << "success to find path for spec_version. spec_version=" << spec_version | 
| 477 | 0 |                    << ", path=" << shortest_path_for_debug.str(); | 
| 478 | 0 |     } | 
| 479 |  |  | 
| 480 | 27 |     return Status::OK(); | 
| 481 | 28 | } | 
| 482 |  |  | 
| 483 |  | Status VersionGraph::capture_consistent_versions_prefer_cache( | 
| 484 |  |         const Version& spec_version, std::vector<Version>& version_path, | 
| 485 | 13 |         const std::function<bool(int64_t, int64_t)>& validator) const { | 
| 486 | 13 |     if (spec_version.first > spec_version.second) { | 
| 487 | 0 |         return Status::Error<INVALID_ARGUMENT, false>( | 
| 488 | 0 |                 "invalid specified version. spec_version={}-{}", spec_version.first, | 
| 489 | 0 |                 spec_version.second); | 
| 490 | 0 |     } | 
| 491 |  |  | 
| 492 | 13 |     int64_t cur_idx = -1; | 
| 493 | 13 |     for (size_t i = 0; i < _version_graph.size(); i++) { | 
| 494 | 13 |         if (_version_graph[i].value == spec_version.first) { | 
| 495 | 13 |             cur_idx = i; | 
| 496 | 13 |             break; | 
| 497 | 13 |         } | 
| 498 | 13 |     } | 
| 499 |  |  | 
| 500 | 13 |     if (cur_idx < 0) { | 
| 501 | 0 |         return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", | 
| 502 | 0 |                                             spec_version.to_string()); | 
| 503 | 0 |     } | 
| 504 |  |  | 
| 505 | 13 |     int64_t end_value = spec_version.second + 1; | 
| 506 | 90 |     while (_version_graph[cur_idx].value < end_value) { | 
| 507 | 77 |         int64_t next_idx = -1; | 
| 508 | 77 |         int64_t first_idx = -1; | 
| 509 | 107 |         for (const auto& it : _version_graph[cur_idx].edges) { | 
| 510 |  |             // Only consider incremental versions. | 
| 511 | 107 |             if (_version_graph[it].value < _version_graph[cur_idx].value) { | 
| 512 | 15 |                 break; | 
| 513 | 15 |             } | 
| 514 | 92 |             if (first_idx == -1) { | 
| 515 | 77 |                 first_idx = it; | 
| 516 | 77 |             } | 
| 517 |  |  | 
| 518 | 92 |             if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { | 
| 519 | 30 |                 continue; | 
| 520 | 30 |             } | 
| 521 |  |  | 
| 522 | 62 |             next_idx = it; | 
| 523 | 62 |             break; | 
| 524 | 92 |         } | 
| 525 |  |  | 
| 526 | 77 |         if (next_idx > -1) { | 
| 527 | 62 |             version_path.emplace_back(_version_graph[cur_idx].value, | 
| 528 | 62 |                                       _version_graph[next_idx].value - 1); | 
| 529 |  |  | 
| 530 | 62 |             cur_idx = next_idx; | 
| 531 | 62 |         } else if (first_idx != -1) { | 
| 532 |  |             // if all edges are not in cache, use the first edge if possible | 
| 533 | 15 |             version_path.emplace_back(_version_graph[cur_idx].value, | 
| 534 | 15 |                                       _version_graph[first_idx].value - 1); | 
| 535 | 15 |             cur_idx = first_idx; | 
| 536 | 15 |         } else { | 
| 537 | 0 |             return Status::OK(); | 
| 538 | 0 |         } | 
| 539 | 77 |     } | 
| 540 | 13 |     return Status::OK(); | 
| 541 | 13 | } | 
| 542 |  |  | 
| 543 |  | Status VersionGraph::capture_consistent_versions_with_validator( | 
| 544 |  |         const Version& spec_version, std::vector<Version>& version_path, | 
| 545 | 9 |         const std::function<bool(int64_t, int64_t)>& validator) const { | 
| 546 | 9 |     if (spec_version.first > spec_version.second) { | 
| 547 | 0 |         return Status::Error<INVALID_ARGUMENT, false>( | 
| 548 | 0 |                 "invalid specified version. spec_version={}-{}", spec_version.first, | 
| 549 | 0 |                 spec_version.second); | 
| 550 | 0 |     } | 
| 551 |  |  | 
| 552 | 9 |     int64_t cur_idx = -1; | 
| 553 | 9 |     for (size_t i = 0; i < _version_graph.size(); i++) { | 
| 554 | 9 |         if (_version_graph[i].value == spec_version.first) { | 
| 555 | 9 |             cur_idx = i; | 
| 556 | 9 |             break; | 
| 557 | 9 |         } | 
| 558 | 9 |     } | 
| 559 |  |  | 
| 560 | 9 |     if (cur_idx < 0) { | 
| 561 | 0 |         return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", | 
| 562 | 0 |                                             spec_version.to_string()); | 
| 563 | 0 |     } | 
| 564 |  |  | 
| 565 | 9 |     int64_t end_value = spec_version.second + 1; | 
| 566 | 50 |     while (_version_graph[cur_idx].value < end_value) { | 
| 567 | 48 |         int64_t next_idx = -1; | 
| 568 | 60 |         for (const auto& it : _version_graph[cur_idx].edges) { | 
| 569 |  |             // Only consider incremental versions. | 
| 570 | 60 |             if (_version_graph[it].value < _version_graph[cur_idx].value) { | 
| 571 | 7 |                 break; | 
| 572 | 7 |             } | 
| 573 |  |  | 
| 574 | 53 |             if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { | 
| 575 | 12 |                 continue; | 
| 576 | 12 |             } | 
| 577 |  |  | 
| 578 | 41 |             next_idx = it; | 
| 579 | 41 |             break; | 
| 580 | 53 |         } | 
| 581 |  |  | 
| 582 | 48 |         if (next_idx > -1) { | 
| 583 | 41 |             version_path.emplace_back(_version_graph[cur_idx].value, | 
| 584 | 41 |                                       _version_graph[next_idx].value - 1); | 
| 585 |  |  | 
| 586 | 41 |             cur_idx = next_idx; | 
| 587 | 41 |         } else { | 
| 588 | 7 |             return Status::OK(); | 
| 589 | 7 |         } | 
| 590 | 48 |     } | 
| 591 | 2 |     return Status::OK(); | 
| 592 | 9 | } | 
| 593 |  |  | 
| 594 |  | Status VersionGraph::capture_consistent_versions_with_validator_mow( | 
| 595 |  |         const Version& spec_version, std::vector<Version>& version_path, | 
| 596 | 11 |         const std::function<bool(int64_t, int64_t)>& validator) const { | 
| 597 | 11 |     if (spec_version.first > spec_version.second) { | 
| 598 | 0 |         return Status::Error<INVALID_ARGUMENT, false>( | 
| 599 | 0 |                 "invalid specified version. spec_version={}-{}", spec_version.first, | 
| 600 | 0 |                 spec_version.second); | 
| 601 | 0 |     } | 
| 602 |  |  | 
| 603 | 11 |     int64_t cur_idx = -1; | 
| 604 | 11 |     for (size_t i = 0; i < _version_graph.size(); i++) { | 
| 605 | 11 |         if (_version_graph[i].value == spec_version.first) { | 
| 606 | 11 |             cur_idx = i; | 
| 607 | 11 |             break; | 
| 608 | 11 |         } | 
| 609 | 11 |     } | 
| 610 |  |  | 
| 611 | 11 |     if (cur_idx < 0) { | 
| 612 | 0 |         return Status::InternalError<false>("failed to find path in version_graph. spec_version={}", | 
| 613 | 0 |                                             spec_version.to_string()); | 
| 614 | 0 |     } | 
| 615 |  |  | 
| 616 | 11 |     int64_t end_value = spec_version.second + 1; | 
| 617 | 60 |     while (_version_graph[cur_idx].value < end_value) { | 
| 618 | 55 |         int64_t next_idx = -1; | 
| 619 | 65 |         for (const auto& it : _version_graph[cur_idx].edges) { | 
| 620 |  |             // Only consider incremental versions. | 
| 621 | 65 |             if (_version_graph[it].value < _version_graph[cur_idx].value) { | 
| 622 | 0 |                 break; | 
| 623 | 0 |             } | 
| 624 |  |  | 
| 625 | 65 |             if (!validator(_version_graph[cur_idx].value, _version_graph[it].value - 1)) { | 
| 626 | 16 |                 if (_version_graph[cur_idx].value + 1 == _version_graph[it].value) { | 
| 627 | 6 |                     break; | 
| 628 | 6 |                 } | 
| 629 | 10 |                 end_value = std::min(_version_graph[it].value, end_value); | 
| 630 | 10 |                 continue; | 
| 631 | 16 |             } | 
| 632 |  |  | 
| 633 | 49 |             next_idx = it; | 
| 634 | 49 |             break; | 
| 635 | 65 |         } | 
| 636 |  |  | 
| 637 | 55 |         if (next_idx > -1) { | 
| 638 | 49 |             version_path.emplace_back(_version_graph[cur_idx].value, | 
| 639 | 49 |                                       _version_graph[next_idx].value - 1); | 
| 640 |  |  | 
| 641 | 49 |             cur_idx = next_idx; | 
| 642 | 49 |         } else { | 
| 643 | 6 |             return Status::OK(); | 
| 644 | 6 |         } | 
| 645 | 55 |     } | 
| 646 | 5 |     return Status::OK(); | 
| 647 | 11 | } | 
| 648 |  |  | 
| 649 | 2 | double VersionGraph::get_orphan_vertex_ratio() { | 
| 650 | 2 |     int64_t vertex_num = _version_graph.size(); | 
| 651 | 2 |     int64_t orphan_vertex_num = 0; | 
| 652 | 15 |     for (auto& iter : _version_graph) { | 
| 653 | 15 |         if (iter.edges.empty()) { | 
| 654 | 6 |             ++orphan_vertex_num; | 
| 655 | 6 |         } | 
| 656 | 15 |     } | 
| 657 | 2 |     return static_cast<double>(orphan_vertex_num) / static_cast<double>(vertex_num); | 
| 658 | 2 | } | 
| 659 |  |  | 
| 660 | 5 | std::string VersionGraph::debug_string() const { | 
| 661 | 5 |     std::stringstream ss; | 
| 662 | 5 |     ss << "VersionGraph: ["; | 
| 663 | 100 |     for (size_t i = 0; i < _version_graph.size(); ++i) { | 
| 664 | 95 |         ss << "{value: " << _version_graph[i].value << ", edges: ["; | 
| 665 | 208 |         for (const auto& edge : _version_graph[i].edges) { | 
| 666 | 208 |             if (_version_graph[edge].value > _version_graph[i].value) { | 
| 667 | 104 |                 ss << _version_graph[edge].value << ", "; | 
| 668 | 104 |             } | 
| 669 | 208 |         } | 
| 670 | 95 |         ss << "]}, "; | 
| 671 | 95 |     } | 
| 672 | 5 |     ss << "]"; | 
| 673 | 5 |     return ss.str(); | 
| 674 | 5 | } | 
| 675 |  |  | 
| 676 |  | #include "common/compile_check_end.h" | 
| 677 |  | } // namespace doris |