/root/doris/be/src/olap/base_tablet.cpp
| Line | Count | Source | 
| 1 |  | // Licensed to the Apache Software Foundation (ASF) under one | 
| 2 |  | // or more contributor license agreements.  See the NOTICE file | 
| 3 |  | // distributed with this work for additional information | 
| 4 |  | // regarding copyright ownership.  The ASF licenses this file | 
| 5 |  | // to you under the Apache License, Version 2.0 (the | 
| 6 |  | // "License"); you may not use this file except in compliance | 
| 7 |  | // with the License.  You may obtain a copy of the License at | 
| 8 |  | // | 
| 9 |  | //   http://www.apache.org/licenses/LICENSE-2.0 | 
| 10 |  | // | 
| 11 |  | // Unless required by applicable law or agreed to in writing, | 
| 12 |  | // software distributed under the License is distributed on an | 
| 13 |  | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
| 14 |  | // KIND, either express or implied.  See the License for the | 
| 15 |  | // specific language governing permissions and limitations | 
| 16 |  | // under the License. | 
| 17 |  |  | 
| 18 |  | #include "olap/base_tablet.h" | 
| 19 |  |  | 
| 20 |  | #include <bthread/mutex.h> | 
| 21 |  | #include <fmt/format.h> | 
| 22 |  | #include <rapidjson/prettywriter.h> | 
| 23 |  |  | 
| 24 |  | #include <algorithm> | 
| 25 |  | #include <cstdint> | 
| 26 |  | #include <iterator> | 
| 27 |  | #include <random> | 
| 28 |  | #include <shared_mutex> | 
| 29 |  |  | 
| 30 |  | #include "cloud/cloud_tablet.h" | 
| 31 |  | #include "cloud/config.h" | 
| 32 |  | #include "common/cast_set.h" | 
| 33 |  | #include "common/logging.h" | 
| 34 |  | #include "common/status.h" | 
| 35 |  | #include "olap/calc_delete_bitmap_executor.h" | 
| 36 |  | #include "olap/cumulative_compaction_time_series_policy.h" | 
| 37 |  | #include "olap/delete_bitmap_calculator.h" | 
| 38 |  | #include "olap/iterators.h" | 
| 39 |  | #include "olap/memtable.h" | 
| 40 |  | #include "olap/partial_update_info.h" | 
| 41 |  | #include "olap/primary_key_index.h" | 
| 42 |  | #include "olap/rowid_conversion.h" | 
| 43 |  | #include "olap/rowset/beta_rowset.h" | 
| 44 |  | #include "olap/rowset/rowset.h" | 
| 45 |  | #include "olap/rowset/rowset_fwd.h" | 
| 46 |  | #include "olap/rowset/rowset_reader.h" | 
| 47 |  | #include "olap/tablet_fwd.h" | 
| 48 |  | #include "olap/txn_manager.h" | 
| 49 |  | #include "service/point_query_executor.h" | 
| 50 |  | #include "util/bvar_helper.h" | 
| 51 |  | #include "util/crc32c.h" | 
| 52 |  | #include "util/debug_points.h" | 
| 53 |  | #include "util/doris_metrics.h" | 
| 54 |  | #include "util/key_util.h" | 
| 55 |  | #include "vec/common/assert_cast.h" | 
| 56 |  | #include "vec/common/schema_util.h" | 
| 57 |  | #include "vec/data_types/data_type_factory.hpp" | 
| 58 |  | #include "vec/jsonb/serialize.h" | 
| 59 |  |  | 
| 60 |  | namespace doris { | 
| 61 |  | #include "common/compile_check_begin.h" | 
| 62 |  |  | 
| 63 |  | using namespace ErrorCode; | 
| 64 |  |  | 
| 65 |  | namespace { | 
| 66 |  |  | 
| 67 |  | bvar::LatencyRecorder g_tablet_commit_phase_update_delete_bitmap_latency( | 
| 68 |  |         "doris_pk", "commit_phase_update_delete_bitmap"); | 
| 69 |  | bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk", "tablet_lookup_rowkey"); | 
| 70 |  | bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found"); | 
| 71 |  | bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second( | 
| 72 |  |         "doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60); | 
| 73 |  | bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", "update_delete_bitmap"); | 
| 74 |  |  | 
| 75 |  | static bvar::Adder<size_t> g_total_tablet_num("doris_total_tablet_num"); | 
| 76 |  |  | 
| 77 |  | Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t segid, | 
| 78 |  |                                     const TabletColumn& target_column, | 
| 79 |  |                                     SegmentCacheHandle* segment_cache_handle, | 
| 80 |  |                                     std::unique_ptr<segment_v2::ColumnIterator>* column_iterator, | 
| 81 | 0 |                                     OlapReaderStatistics* stats) { | 
| 82 | 0 |     RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, segment_cache_handle, true)); | 
| 83 |  |     // find segment | 
| 84 | 0 |     auto it = std::find_if( | 
| 85 | 0 |             segment_cache_handle->get_segments().begin(), | 
| 86 | 0 |             segment_cache_handle->get_segments().end(), | 
| 87 | 0 |             [&segid](const segment_v2::SegmentSharedPtr& seg) { return seg->id() == segid; }); | 
| 88 | 0 |     if (it == segment_cache_handle->get_segments().end()) { | 
| 89 | 0 |         return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", | 
| 90 | 0 |                                             rowset->rowset_id().to_string(), segid)); | 
| 91 | 0 |     } | 
| 92 | 0 |     segment_v2::SegmentSharedPtr segment = *it; | 
| 93 | 0 |     StorageReadOptions opts; | 
| 94 | 0 |     opts.stats = stats; | 
| 95 | 0 |     RETURN_IF_ERROR(segment->new_column_iterator(target_column, column_iterator, &opts)); | 
| 96 | 0 |     segment_v2::ColumnIteratorOptions opt { | 
| 97 | 0 |             .use_page_cache = !config::disable_storage_page_cache, | 
| 98 | 0 |             .file_reader = segment->file_reader().get(), | 
| 99 | 0 |             .stats = stats, | 
| 100 | 0 |             .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY, | 
| 101 | 0 |                                      .file_cache_stats = &stats->file_cache_stats}, | 
| 102 | 0 |     }; | 
| 103 | 0 |     RETURN_IF_ERROR((*column_iterator)->init(opt)); | 
| 104 | 0 |     return Status::OK(); | 
| 105 | 0 | } | 
| 106 |  |  | 
| 107 |  | } // namespace | 
| 108 |  |  | 
// Prototypes of the per-tablet query scan counters. They are only declared here (extern);
// their definitions live in another translation unit.
extern MetricPrototype METRIC_query_scan_bytes;
extern MetricPrototype METRIC_query_scan_rows;
extern MetricPrototype METRIC_query_scan_count;
// Prototypes of the per-tablet flush counters, defined in this file. All five counters are
// registered on the tablet's metric entity in the BaseTablet constructor.
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, MetricUnit::OPERATIONS);
| 114 |  |  | 
| 115 | 665 | BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta) : _tablet_meta(std::move(tablet_meta)) { | 
| 116 | 665 |     _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( | 
| 117 | 665 |             fmt::format("Tablet.{}", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}}, | 
| 118 | 665 |             MetricEntityType::kTablet); | 
| 119 | 665 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_bytes); | 
| 120 | 665 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_rows); | 
| 121 | 665 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_count); | 
| 122 | 665 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_bytes); | 
| 123 | 665 |     INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_finish_count); | 
| 124 |  |  | 
| 125 |  |     // construct _timestamped_versioned_tracker from rs and stale rs meta | 
| 126 | 665 |     _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), | 
| 127 | 665 |                                                              _tablet_meta->all_stale_rs_metas()); | 
| 128 |  |  | 
| 129 |  |     // if !_tablet_meta->all_rs_metas()[0]->tablet_schema(), | 
| 130 |  |     // that mean the tablet_meta is still no upgrade to doris 1.2 versions. | 
| 131 |  |     // Before doris 1.2 version, rowset metas don't have tablet schema. | 
| 132 |  |     // And when upgrade to doris 1.2 version, | 
| 133 |  |     // all rowset metas will be set the tablet schmea from tablet meta. | 
| 134 | 665 |     if (_tablet_meta->all_rs_metas().empty() || | 
| 135 | 665 |         !_tablet_meta->all_rs_metas().begin()->second->tablet_schema()) { | 
| 136 | 628 |         _max_version_schema = _tablet_meta->tablet_schema(); | 
| 137 | 628 |     } else { | 
| 138 | 37 |         std::vector<RowsetMetaSharedPtr> rowset_metas(_tablet_meta->all_rs_metas().size()); | 
| 139 | 37 |         std::transform(_tablet_meta->all_rs_metas().begin(), _tablet_meta->all_rs_metas().end(), | 
| 140 | 197 |                        rowset_metas.begin(), [](const auto& it) { return it.second; }); | 
| 141 | 37 |         _max_version_schema = tablet_schema_with_merged_max_schema_version(rowset_metas); | 
| 142 | 37 |     } | 
| 143 | 665 |     DCHECK(_max_version_schema); | 
| 144 | 665 |     g_total_tablet_num << 1; | 
| 145 | 665 | } | 
| 146 |  |  | 
| 147 | 665 | BaseTablet::~BaseTablet() { | 
| 148 | 665 |     DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); | 
| 149 | 665 |     g_total_tablet_num << -1; | 
| 150 | 665 | } | 
| 151 |  |  | 
| 152 |  | TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version( | 
| 153 | 79 |         const std::vector<RowsetMetaSharedPtr>& rowset_metas) { | 
| 154 | 79 |     RowsetMetaSharedPtr max_schema_version_rs = *std::max_element( | 
| 155 | 248 |             rowset_metas.begin(), rowset_metas.end(), [](const auto& a, const auto& b) -> bool { | 
| 156 | 248 |                 return !a->tablet_schema() | 
| 157 | 248 |                                ? true | 
| 158 | 248 |                                : (!b->tablet_schema() | 
| 159 | 248 |                                           ? false | 
| 160 | 248 |                                           : a->tablet_schema()->schema_version() < | 
| 161 | 248 |                                                     b->tablet_schema()->schema_version()); | 
| 162 | 248 |             }); | 
| 163 | 79 |     return max_schema_version_rs->tablet_schema(); | 
| 164 | 79 | } | 
| 165 |  |  | 
| 166 | 257 | Status BaseTablet::set_tablet_state(TabletState state) { | 
| 167 | 257 |     if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { | 
| 168 | 0 |         return Status::Error<META_INVALID_ARGUMENT>( | 
| 169 | 0 |                 "could not change tablet state from shutdown to {}", state); | 
| 170 | 0 |     } | 
| 171 | 257 |     _tablet_meta->set_tablet_state(state); | 
| 172 | 257 |     return Status::OK(); | 
| 173 | 257 | } | 
| 174 |  |  | 
| 175 | 0 | void BaseTablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) { | 
| 176 | 0 |     std::lock_guard wrlock(_meta_lock); | 
| 177 |  |     // Double Check for concurrent update | 
| 178 | 0 |     if (!_max_version_schema || | 
| 179 | 0 |         tablet_schema->schema_version() > _max_version_schema->schema_version()) { | 
| 180 | 0 |         _max_version_schema = tablet_schema; | 
| 181 | 0 |     } | 
| 182 | 0 | } | 
| 183 |  |  | 
| 184 | 233 | uint32_t BaseTablet::get_real_compaction_score() const { | 
| 185 | 233 |     std::shared_lock l(_meta_lock); | 
| 186 | 233 |     const auto& rs_metas = _tablet_meta->all_rs_metas(); | 
| 187 | 10.7k |     return std::accumulate(rs_metas.begin(), rs_metas.end(), 0, [](uint32_t score, const auto& it) { | 
| 188 | 10.7k |         return score + it.second->get_compaction_score(); | 
| 189 | 10.7k |     }); | 
| 190 | 233 | } | 
| 191 |  |  | 
| 192 |  | Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path, | 
| 193 | 0 |                                                std::vector<RowSetSplits>* rs_splits) const { | 
| 194 | 0 |     DCHECK(rs_splits != nullptr && rs_splits->empty()); | 
| 195 | 0 |     for (auto version : version_path) { | 
| 196 | 0 |         auto it = _rs_version_map.find(version); | 
| 197 | 0 |         if (it == _rs_version_map.end()) { | 
| 198 | 0 |             VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id() | 
| 199 | 0 |                         << ", version='" << version.first << "-" << version.second; | 
| 200 |  | 
 | 
| 201 | 0 |             it = _stale_rs_version_map.find(version); | 
| 202 | 0 |             if (it == _stale_rs_version_map.end()) { | 
| 203 | 0 |                 return Status::Error<CAPTURE_ROWSET_READER_ERROR>( | 
| 204 | 0 |                         "fail to find Rowset in stale_rs_version for version. tablet={}, " | 
| 205 | 0 |                         "version={}-{}", | 
| 206 | 0 |                         tablet_id(), version.first, version.second); | 
| 207 | 0 |             } | 
| 208 | 0 |         } | 
| 209 | 0 |         RowsetReaderSharedPtr rs_reader; | 
| 210 | 0 |         auto res = it->second->create_reader(&rs_reader); | 
| 211 | 0 |         if (!res.ok()) { | 
| 212 | 0 |             return Status::Error<CAPTURE_ROWSET_READER_ERROR>( | 
| 213 | 0 |                     "failed to create reader for rowset:{}", it->second->rowset_id().to_string()); | 
| 214 | 0 |         } | 
| 215 | 0 |         rs_splits->emplace_back(std::move(rs_reader)); | 
| 216 | 0 |     } | 
| 217 | 0 |     return Status::OK(); | 
| 218 | 0 | } | 
| 219 |  |  | 
| 220 |  | // snapshot manager may call this api to check if version exists, so that | 
| 221 |  | // the version maybe not exist | 
| 222 |  | RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, | 
| 223 | 12 |                                                   bool find_in_stale) const { | 
| 224 | 12 |     auto iter = _rs_version_map.find(version); | 
| 225 | 12 |     if (iter == _rs_version_map.end()) { | 
| 226 | 0 |         if (find_in_stale) { | 
| 227 | 0 |             return get_stale_rowset_by_version(version); | 
| 228 | 0 |         } | 
| 229 | 0 |         return nullptr; | 
| 230 | 0 |     } | 
| 231 | 12 |     return iter->second; | 
| 232 | 12 | } | 
| 233 |  |  | 
| 234 | 0 | RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { | 
| 235 | 0 |     auto iter = _stale_rs_version_map.find(version); | 
| 236 | 0 |     if (iter == _stale_rs_version_map.end()) { | 
| 237 | 0 |         VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); | 
| 238 | 0 |         return nullptr; | 
| 239 | 0 |     } | 
| 240 | 0 |     return iter->second; | 
| 241 | 0 | } | 
| 242 |  |  | 
| 243 |  | // Already under _meta_lock | 
| 244 | 42 | RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { | 
| 245 | 42 |     Version max_version = _tablet_meta->max_version(); | 
| 246 | 42 |     if (max_version.first == -1) { | 
| 247 | 0 |         return nullptr; | 
| 248 | 0 |     } | 
| 249 |  |  | 
| 250 | 42 |     auto iter = _rs_version_map.find(max_version); | 
| 251 | 42 |     if (iter == _rs_version_map.end()) { | 
| 252 | 0 |         DCHECK(false) << "invalid version:" << max_version; | 
| 253 | 0 |         return nullptr; | 
| 254 | 0 |     } | 
| 255 | 42 |     return iter->second; | 
| 256 | 42 | } | 
| 257 |  |  | 
| 258 | 0 | Status BaseTablet::get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { | 
| 259 | 0 |     std::shared_lock rlock(_meta_lock); | 
| 260 | 0 |     return get_all_rs_id_unlocked(max_version, rowset_ids); | 
| 261 | 0 | } | 
| 262 |  |  | 
| 263 |  | Status BaseTablet::get_all_rs_id_unlocked(int64_t max_version, | 
| 264 | 9 |                                           RowsetIdUnorderedSet* rowset_ids) const { | 
| 265 |  |     //  Ensure that the obtained versions of rowsets are continuous | 
| 266 | 9 |     Version spec_version(0, max_version); | 
| 267 | 9 |     Versions version_path; | 
| 268 | 9 |     auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); | 
| 269 | 9 |     if (!st.ok()) [[unlikely]] { | 
| 270 | 0 |         return st; | 
| 271 | 0 |     } | 
| 272 |  |  | 
| 273 | 11 |     for (auto& ver : version_path) { | 
| 274 | 11 |         if (ver.second == 1) { | 
| 275 |  |             // [0-1] rowset is empty for each tablet, skip it | 
| 276 | 9 |             continue; | 
| 277 | 9 |         } | 
| 278 | 2 |         auto it = _rs_version_map.find(ver); | 
| 279 | 2 |         if (it == _rs_version_map.end()) { | 
| 280 | 0 |             return Status::Error<CAPTURE_ROWSET_ERROR, false>( | 
| 281 | 0 |                     "fail to find Rowset for version. tablet={}, version={}", tablet_id(), | 
| 282 | 0 |                     ver.to_string()); | 
| 283 | 0 |         } | 
| 284 | 2 |         rowset_ids->emplace(it->second->rowset_id()); | 
| 285 | 2 |     } | 
| 286 | 9 |     return Status::OK(); | 
| 287 | 9 | } | 
| 288 |  |  | 
| 289 | 0 | Versions BaseTablet::get_missed_versions(int64_t spec_version) const { | 
| 290 | 0 |     DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; | 
| 291 |  | 
 | 
| 292 | 0 |     Versions existing_versions; | 
| 293 | 0 |     { | 
| 294 | 0 |         std::shared_lock rdlock(_meta_lock); | 
| 295 | 0 |         for (const auto& [ver, _] : _tablet_meta->all_rs_metas()) { | 
| 296 | 0 |             existing_versions.emplace_back(ver); | 
| 297 | 0 |         } | 
| 298 | 0 |     } | 
| 299 | 0 |     return calc_missed_versions(spec_version, std::move(existing_versions)); | 
| 300 | 0 | } | 
| 301 |  |  | 
| 302 | 2 | Versions BaseTablet::get_missed_versions_unlocked(int64_t spec_version) const { | 
| 303 | 2 |     DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; | 
| 304 |  |  | 
| 305 | 2 |     Versions existing_versions; | 
| 306 | 8 |     for (const auto& [ver, _] : _tablet_meta->all_rs_metas()) { | 
| 307 | 8 |         existing_versions.emplace_back(ver); | 
| 308 | 8 |     } | 
| 309 | 2 |     return calc_missed_versions(spec_version, std::move(existing_versions)); | 
| 310 | 2 | } | 
| 311 |  |  | 
| 312 | 1 | void BaseTablet::_print_missed_versions(const Versions& missed_versions) const { | 
| 313 | 1 |     std::stringstream ss; | 
| 314 | 1 |     ss << tablet_id() << " has " << missed_versions.size() << " missed version:"; | 
| 315 |  |     // print at most 10 version | 
| 316 | 3 |     for (int i = 0; i < 10 && i < missed_versions.size(); ++i) { | 
| 317 | 2 |         ss << missed_versions[i] << ","; | 
| 318 | 2 |     } | 
| 319 | 1 |     LOG(WARNING) << ss.str(); | 
| 320 | 1 | } | 
| 321 |  |  | 
| 322 | 1 | bool BaseTablet::_reconstruct_version_tracker_if_necessary() { | 
| 323 | 1 |     double orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio(); | 
| 324 | 1 |     if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) { | 
| 325 | 1 |         _timestamped_version_tracker.construct_versioned_tracker( | 
| 326 | 1 |                 _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); | 
| 327 | 1 |         return true; | 
| 328 | 1 |     } | 
| 329 | 0 |     return false; | 
| 330 | 1 | } | 
| 331 |  |  | 
| 332 |  | // should use this method to get a copy of current tablet meta | 
| 333 |  | // there are some rowset meta in local meta store and in in-memory tablet meta | 
| 334 |  | // but not in tablet meta in local meta store | 
| 335 |  | void BaseTablet::generate_tablet_meta_copy(TabletMeta& new_tablet_meta, | 
| 336 | 0 |                                            bool cloud_get_rowset_meta) const { | 
| 337 | 0 |     std::shared_lock rdlock(_meta_lock); | 
| 338 | 0 |     generate_tablet_meta_copy_unlocked(new_tablet_meta, cloud_get_rowset_meta); | 
| 339 | 0 | } | 
| 340 |  |  | 
| 341 |  | // this is a unlocked version of generate_tablet_meta_copy() | 
| 342 |  | // some method already hold the _meta_lock before calling this, | 
| 343 |  | // such as EngineCloneTask::_finish_clone -> tablet->revise_tablet_meta | 
| 344 |  | void BaseTablet::generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta, | 
| 345 | 4 |                                                     bool cloud_get_rowset_meta) const { | 
| 346 | 4 |     TabletMetaPB tablet_meta_pb; | 
| 347 | 4 |     _tablet_meta->to_meta_pb(&tablet_meta_pb, cloud_get_rowset_meta); | 
| 348 | 4 |     new_tablet_meta.init_from_pb(tablet_meta_pb); | 
| 349 | 4 | } | 
| 350 |  |  | 
| 351 |  | Status BaseTablet::calc_delete_bitmap_between_segments( | 
| 352 |  |         TabletSchemaSPtr schema, RowsetId rowset_id, | 
| 353 | 0 |         const std::vector<segment_v2::SegmentSharedPtr>& segments, DeleteBitmapPtr delete_bitmap) { | 
| 354 | 0 |     size_t const num_segments = segments.size(); | 
| 355 | 0 |     if (num_segments < 2) { | 
| 356 | 0 |         return Status::OK(); | 
| 357 | 0 |     } | 
| 358 |  |  | 
| 359 | 0 |     OlapStopWatch watch; | 
| 360 | 0 |     size_t seq_col_length = 0; | 
| 361 | 0 |     if (schema->has_sequence_col()) { | 
| 362 | 0 |         auto seq_col_idx = schema->sequence_col_idx(); | 
| 363 | 0 |         seq_col_length = schema->column(seq_col_idx).length() + 1; | 
| 364 | 0 |     } | 
| 365 | 0 |     size_t rowid_length = 0; | 
| 366 | 0 |     if (!schema->cluster_key_uids().empty()) { | 
| 367 | 0 |         rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; | 
| 368 | 0 |     } | 
| 369 |  | 
 | 
| 370 | 0 |     MergeIndexDeleteBitmapCalculator calculator; | 
| 371 | 0 |     RETURN_IF_ERROR(calculator.init(rowset_id, segments, seq_col_length, rowid_length)); | 
| 372 |  |  | 
| 373 | 0 |     RETURN_IF_ERROR(calculator.calculate_all(delete_bitmap)); | 
| 374 |  |  | 
| 375 | 0 |     delete_bitmap->add( | 
| 376 | 0 |             {rowset_id, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 377 | 0 |             DeleteBitmap::ROWSET_SENTINEL_MARK); | 
| 378 | 0 |     LOG(INFO) << fmt::format( | 
| 379 | 0 |             "construct delete bitmap between segments, " | 
| 380 | 0 |             "tablet: {}, rowset: {}, number of segments: {}, bitmap count: {}, bitmap cardinality: " | 
| 381 | 0 |             "{}, cost {} (us)", | 
| 382 | 0 |             tablet_id(), rowset_id.to_string(), num_segments, | 
| 383 | 0 |             delete_bitmap->get_delete_bitmap_count(), delete_bitmap->cardinality(), | 
| 384 | 0 |             watch.get_elapse_time_us()); | 
| 385 | 0 |     return Status::OK(); | 
| 386 | 0 | } | 
| 387 |  |  | 
| 388 |  | std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids( | 
| 389 | 72 |         const RowsetIdUnorderedSet* specified_rowset_ids) { | 
| 390 | 72 |     std::vector<RowsetSharedPtr> rowsets; | 
| 391 | 72 |     for (auto& rs : _rs_version_map) { | 
| 392 | 15 |         if (!specified_rowset_ids || | 
| 393 | 15 |             specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) { | 
| 394 | 1 |             rowsets.push_back(rs.second); | 
| 395 | 1 |         } | 
| 396 | 15 |     } | 
| 397 |  |  | 
| 398 | 72 |     std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { | 
| 399 | 0 |         return lhs->end_version() > rhs->end_version(); | 
| 400 | 0 |     }); | 
| 401 | 72 |     return rowsets; | 
| 402 | 72 | } | 
| 403 |  |  | 
| 404 |  | Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, | 
| 405 |  |                                    RowsetSharedPtr input_rowset, OlapReaderStatistics& stats, | 
| 406 | 0 |                                    std::string& values, bool write_to_cache) { | 
| 407 | 0 |     MonotonicStopWatch watch; | 
| 408 | 0 |     size_t row_size = 1; | 
| 409 | 0 |     watch.start(); | 
| 410 | 0 |     Defer _defer([&]() { | 
| 411 | 0 |         LOG_EVERY_N(INFO, 500) << "get a single_row, cost(us):" << watch.elapsed_time() / 1000 | 
| 412 | 0 |                                << ", row_size:" << row_size; | 
| 413 | 0 |     }); | 
| 414 |  | 
 | 
| 415 | 0 |     BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); | 
| 416 | 0 |     CHECK(rowset); | 
| 417 | 0 |     const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); | 
| 418 | 0 |     SegmentCacheHandle segment_cache_handle; | 
| 419 | 0 |     std::unique_ptr<segment_v2::ColumnIterator> column_iterator; | 
| 420 | 0 |     const auto& column = *DORIS_TRY(tablet_schema->column(BeConsts::ROW_STORE_COL)); | 
| 421 | 0 |     RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, column, | 
| 422 | 0 |                                                  &segment_cache_handle, &column_iterator, &stats)); | 
| 423 |  |     // get and parse tuple row | 
| 424 | 0 |     vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); | 
| 425 | 0 |     std::vector<segment_v2::rowid_t> rowids {static_cast<segment_v2::rowid_t>(row_location.row_id)}; | 
| 426 | 0 |     RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column_ptr)); | 
| 427 | 0 |     assert(column_ptr->size() == 1); | 
| 428 | 0 |     auto* string_column = static_cast<vectorized::ColumnString*>(column_ptr.get()); | 
| 429 | 0 |     StringRef value = string_column->get_data_at(0); | 
| 430 | 0 |     values = value.to_string(); | 
| 431 | 0 |     if (write_to_cache) { | 
| 432 | 0 |         RowCache::instance()->insert({tablet_id(), encoded_key}, Slice {value.data, value.size}); | 
| 433 | 0 |     } | 
| 434 | 0 |     return Status::OK(); | 
| 435 | 0 | } | 
| 436 |  |  | 
| 437 |  | Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, | 
| 438 |  |                                   bool with_seq_col, | 
| 439 |  |                                   const std::vector<RowsetSharedPtr>& specified_rowsets, | 
| 440 |  |                                   RowLocation* row_location, int64_t version, | 
| 441 |  |                                   std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches, | 
| 442 |  |                                   RowsetSharedPtr* rowset, bool with_rowid, | 
| 443 |  |                                   std::string* encoded_seq_value, OlapReaderStatistics* stats, | 
| 444 | 2 |                                   DeleteBitmapPtr delete_bitmap) { | 
| 445 | 2 |     SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency); | 
| 446 | 2 |     size_t seq_col_length = 0; | 
| 447 |  |     // use the latest tablet schema to decide if the tablet has sequence column currently | 
| 448 | 2 |     const TabletSchema* schema = | 
| 449 | 2 |             (latest_schema == nullptr ? _tablet_meta->tablet_schema().get() : latest_schema); | 
| 450 | 2 |     if (schema->has_sequence_col() && with_seq_col) { | 
| 451 | 2 |         seq_col_length = schema->column(schema->sequence_col_idx()).length() + 1; | 
| 452 | 2 |     } | 
| 453 | 2 |     size_t rowid_length = 0; | 
| 454 | 2 |     if (with_rowid && !schema->cluster_key_uids().empty()) { | 
| 455 | 0 |         rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; | 
| 456 | 0 |     } | 
| 457 | 2 |     Slice key_without_seq = | 
| 458 | 2 |             Slice(encoded_key.get_data(), encoded_key.get_size() - seq_col_length - rowid_length); | 
| 459 | 2 |     RowLocation loc; | 
| 460 |  |  | 
| 461 | 2 |     auto tablet_delete_bitmap = | 
| 462 | 2 |             delete_bitmap == nullptr ? _tablet_meta->delete_bitmap_ptr() : delete_bitmap; | 
| 463 | 2 |     for (size_t i = 0; i < specified_rowsets.size(); i++) { | 
| 464 | 2 |         const auto& rs = specified_rowsets[i]; | 
| 465 | 2 |         std::vector<KeyBoundsPB> segments_key_bounds; | 
| 466 | 2 |         rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds); | 
| 467 | 2 |         int num_segments = cast_set<int>(rs->num_segments()); | 
| 468 | 2 |         DCHECK_EQ(segments_key_bounds.size(), num_segments); | 
| 469 | 2 |         std::vector<uint32_t> picked_segments; | 
| 470 | 4 |         for (int j = num_segments - 1; j >= 0; j--) { | 
| 471 | 2 |             if (key_is_not_in_segment(key_without_seq, segments_key_bounds[j], | 
| 472 | 2 |                                       rs->rowset_meta()->is_segments_key_bounds_truncated())) { | 
| 473 | 0 |                 continue; | 
| 474 | 0 |             } | 
| 475 | 2 |             picked_segments.emplace_back(j); | 
| 476 | 2 |         } | 
| 477 | 2 |         if (picked_segments.empty()) { | 
| 478 | 0 |             continue; | 
| 479 | 0 |         } | 
| 480 |  |  | 
| 481 | 2 |         if (UNLIKELY(segment_caches[i] == nullptr)) { | 
| 482 | 1 |             segment_caches[i] = std::make_unique<SegmentCacheHandle>(); | 
| 483 | 1 |             RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( | 
| 484 | 1 |                     std::static_pointer_cast<BetaRowset>(rs), segment_caches[i].get(), true, true)); | 
| 485 | 1 |         } | 
| 486 | 2 |         auto& segments = segment_caches[i]->get_segments(); | 
| 487 | 2 |         DCHECK_EQ(segments.size(), num_segments); | 
| 488 |  |  | 
| 489 | 2 |         for (auto id : picked_segments) { | 
| 490 | 2 |             Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid, | 
| 491 | 2 |                                                     &loc, stats, encoded_seq_value); | 
| 492 | 2 |             if (s.is<KEY_NOT_FOUND>()) { | 
| 493 | 0 |                 continue; | 
| 494 | 0 |             } | 
| 495 | 2 |             if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) { | 
| 496 | 0 |                 return s; | 
| 497 | 0 |             } | 
| 498 | 2 |             if (s.ok() && tablet_delete_bitmap->contains_agg_with_cache_if_eligible( | 
| 499 | 1 |                                   {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { | 
| 500 |  |                 // if has sequence col, we continue to compare the sequence_id of | 
| 501 |  |                 // all rowsets, util we find an existing key. | 
| 502 | 0 |                 if (schema->has_sequence_col()) { | 
| 503 | 0 |                     continue; | 
| 504 | 0 |                 } | 
| 505 |  |                 // The key is deleted, we don't need to search for it any more. | 
| 506 | 0 |                 break; | 
| 507 | 0 |             } | 
| 508 |  |             // `st` is either OK or KEY_ALREADY_EXISTS now. | 
| 509 |  |             // for partial update, even if the key is already exists, we still need to | 
| 510 |  |             // read it's original values to keep all columns align. | 
| 511 | 2 |             *row_location = loc; | 
| 512 | 2 |             if (rowset) { | 
| 513 |  |                 // return it's rowset | 
| 514 | 2 |                 *rowset = rs; | 
| 515 | 2 |             } | 
| 516 |  |             // find it and return | 
| 517 | 2 |             return s; | 
| 518 | 2 |         } | 
| 519 | 2 |     } | 
| 520 | 0 |     g_tablet_pk_not_found << 1; | 
| 521 | 0 |     return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets"); | 
| 522 | 2 | } | 
| 523 |  |  | 
| 524 |  | // if user pass a token, then all calculation works will submit to a threadpool, | 
| 525 |  | // user can get all delete bitmaps from that token. | 
| 526 |  | // if `token` is nullptr, the calculation will run in local, and user can get the result | 
| 527 |  | // delete bitmap from `delete_bitmap` directly. | 
| 528 |  | Status BaseTablet::calc_delete_bitmap( | 
| 529 |  |         const BaseTabletSPtr& tablet, RowsetSharedPtr rowset, | 
| 530 |  |         const std::vector<segment_v2::SegmentSharedPtr>& segments, | 
| 531 |  |         const std::vector<RowsetSharedPtr>& specified_rowsets, DeleteBitmapPtr delete_bitmap, | 
| 532 |  |         int64_t end_version, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer, | 
| 533 |  |         DeleteBitmapPtr tablet_delete_bitmap, | 
| 534 | 69 |         std::function<void(segment_v2::SegmentSharedPtr, Status)> callback) { | 
| 535 | 69 |     if (specified_rowsets.empty() || segments.empty()) { | 
| 536 | 68 |         return Status::OK(); | 
| 537 | 68 |     } | 
| 538 |  |  | 
| 539 | 1 |     OlapStopWatch watch; | 
| 540 | 1 |     for (const auto& segment : segments) { | 
| 541 | 1 |         const auto& seg = segment; | 
| 542 | 1 |         if (token != nullptr) { | 
| 543 | 1 |             RETURN_IF_ERROR(token->submit(tablet, rowset, seg, specified_rowsets, end_version, | 
| 544 | 1 |                                           delete_bitmap, rowset_writer, tablet_delete_bitmap, | 
| 545 | 1 |                                           callback)); | 
| 546 | 1 |         } else { | 
| 547 | 0 |             RETURN_IF_ERROR(tablet->calc_segment_delete_bitmap( | 
| 548 | 0 |                     rowset, segment, specified_rowsets, delete_bitmap, end_version, rowset_writer, | 
| 549 | 0 |                     tablet_delete_bitmap)); | 
| 550 | 0 |         } | 
| 551 | 1 |     } | 
| 552 |  |  | 
| 553 | 1 |     return Status::OK(); | 
| 554 | 1 | } | 
| 555 |  |  | 
| 556 |  | Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, | 
| 557 |  |                                               const segment_v2::SegmentSharedPtr& seg, | 
| 558 |  |                                               const std::vector<RowsetSharedPtr>& specified_rowsets, | 
| 559 |  |                                               DeleteBitmapPtr delete_bitmap, int64_t end_version, | 
| 560 |  |                                               RowsetWriter* rowset_writer, | 
| 561 | 1 |                                               DeleteBitmapPtr tablet_delete_bitmap) { | 
| 562 | 1 |     OlapStopWatch watch; | 
| 563 | 1 |     auto rowset_id = rowset->rowset_id(); | 
| 564 | 1 |     Version dummy_version(end_version + 1, end_version + 1); | 
| 565 | 1 |     auto rowset_schema = rowset->tablet_schema(); | 
| 566 |  |  | 
| 567 | 1 |     PartialUpdateInfo* partial_update_info {nullptr}; | 
| 568 | 1 |     bool is_partial_update = rowset_writer && rowset_writer->is_partial_update(); | 
| 569 |  |     // `have_input_seq_column` is for fixed partial update only. For flexible partial update, we should use | 
| 570 |  |     // the skip bitmap to determine whether a row has specified the sequence column | 
| 571 | 1 |     bool have_input_seq_column = false; | 
| 572 |  |     // `rids_be_overwritten` is for flexible partial update only, it records row ids that are overwritten by | 
| 573 |  |     // another row with higher sequence value | 
| 574 | 1 |     std::set<uint32_t> rids_be_overwritten; | 
| 575 | 1 |     if (is_partial_update) { | 
| 576 | 0 |         partial_update_info = rowset_writer->get_partial_update_info().get(); | 
| 577 | 0 |         if (partial_update_info->is_fixed_partial_update() && rowset_schema->has_sequence_col()) { | 
| 578 | 0 |             std::vector<uint32_t> including_cids = | 
| 579 | 0 |                     rowset_writer->get_partial_update_info()->update_cids; | 
| 580 | 0 |             have_input_seq_column = | 
| 581 | 0 |                     rowset_schema->has_sequence_col() && | 
| 582 | 0 |                     (std::find(including_cids.cbegin(), including_cids.cend(), | 
| 583 | 0 |                                rowset_schema->sequence_col_idx()) != including_cids.cend()); | 
| 584 | 0 |         } | 
| 585 | 0 |     } | 
| 586 |  |  | 
| 587 | 1 |     DBUG_EXECUTE_IF("BaseTablet::calc_segment_delete_bitmap.sleep", { | 
| 588 | 1 |         auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); | 
| 589 | 1 |         auto sleep = dp->param<int64_t>("sleep", 10); | 
| 590 | 1 |         if (target_tablet_id == tablet_id()) { | 
| 591 | 1 |             std::this_thread::sleep_for(std::chrono::seconds(sleep)); | 
| 592 | 1 |         } | 
| 593 | 1 |     }); | 
| 594 |  |  | 
| 595 | 1 |     if (rowset_schema->num_variant_columns() > 0) { | 
| 596 |  |         // During partial updates, the extracted columns of a variant should not be included in the rowset schema. | 
| 597 |  |         // This is because the partial update for a variant needs to ignore the extracted columns. | 
| 598 |  |         // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, | 
| 599 |  |         // the complete variant is constructed by reading all the sub-columns of the variant. | 
| 600 | 0 |         rowset_schema = rowset_schema->copy_without_variant_extracted_columns(); | 
| 601 | 0 |     } | 
| 602 |  |     // use for partial update | 
| 603 | 1 |     FixedReadPlan read_plan_ori; | 
| 604 | 1 |     FixedReadPlan read_plan_update; | 
| 605 | 1 |     int64_t conflict_rows = 0; | 
| 606 | 1 |     int64_t new_generated_rows = 0; | 
| 607 |  |  | 
| 608 | 1 |     std::map<RowsetId, RowsetSharedPtr> rsid_to_rowset; | 
| 609 | 1 |     rsid_to_rowset[rowset_id] = rowset; | 
| 610 | 1 |     vectorized::Block block = rowset_schema->create_block(); | 
| 611 | 1 |     vectorized::Block ordered_block = block.clone_empty(); | 
| 612 | 1 |     uint32_t pos = 0; | 
| 613 |  |  | 
| 614 | 1 |     RETURN_IF_ERROR(seg->load_pk_index_and_bf(nullptr)); // We need index blocks to iterate | 
| 615 | 1 |     const auto* pk_idx = seg->get_primary_key_index(); | 
| 616 | 1 |     int64_t total = pk_idx->num_rows(); | 
| 617 | 1 |     uint32_t row_id = 0; | 
| 618 | 1 |     int64_t remaining = total; | 
| 619 | 1 |     bool exact_match = false; | 
| 620 | 1 |     std::string last_key; | 
| 621 | 1 |     int batch_size = 1024; | 
| 622 |  |     // The data of each segment may be looked up multiple times. Creating a SegmentCacheHandle | 
| 623 |  |     // updates the LRU cache, which causes obvious lock contention in multithreaded | 
| 624 |  |     // scenarios, so we use `segment_caches` to cache the SegmentCacheHandle objects. | 
| 625 | 1 |     std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); | 
| 626 | 2 |     while (remaining > 0) { | 
| 627 | 1 |         std::unique_ptr<segment_v2::IndexedColumnIterator> iter; | 
| 628 | 1 |         RETURN_IF_ERROR(pk_idx->new_iterator(&iter, nullptr)); | 
| 629 |  |  | 
| 630 | 1 |         size_t num_to_read = std::min<int64_t>(batch_size, remaining); | 
| 631 | 1 |         auto index_type = vectorized::DataTypeFactory::instance().create_data_type( | 
| 632 | 1 |                 pk_idx->type_info()->type(), 1, 0); | 
| 633 | 1 |         auto index_column = index_type->create_column(); | 
| 634 | 1 |         Slice last_key_slice(last_key); | 
| 635 | 1 |         RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match)); | 
| 636 | 1 |         auto current_ordinal = iter->get_current_ordinal(); | 
| 637 | 1 |         DCHECK(total == remaining + current_ordinal) | 
| 638 | 0 |                 << "total: " << total << ", remaining: " << remaining | 
| 639 | 0 |                 << ", current_ordinal: " << current_ordinal; | 
| 640 |  |  | 
| 641 | 1 |         size_t num_read = num_to_read; | 
| 642 | 1 |         RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); | 
| 643 | 1 |         DCHECK(num_to_read == num_read) | 
| 644 | 0 |                 << "num_to_read: " << num_to_read << ", num_read: " << num_read; | 
| 645 | 1 |         last_key = index_column->get_data_at(num_read - 1).to_string(); | 
| 646 |  |  | 
| 647 |  |         // exclude last_key, last_key will be read in next batch. | 
| 648 | 1 |         if (num_read == batch_size && num_read != remaining) { | 
| 649 | 0 |             num_read -= 1; | 
| 650 | 0 |         } | 
| 651 | 3 |         for (size_t i = 0; i < num_read; i++, row_id++) { | 
| 652 | 2 |             Slice key = Slice(index_column->get_data_at(i).data, index_column->get_data_at(i).size); | 
| 653 | 2 |             RowLocation loc; | 
| 654 |  |             // calculate row id | 
| 655 | 2 |             if (!_tablet_meta->tablet_schema()->cluster_key_uids().empty()) { | 
| 656 | 0 |                 size_t seq_col_length = 0; | 
| 657 | 0 |                 if (_tablet_meta->tablet_schema()->has_sequence_col()) { | 
| 658 | 0 |                     seq_col_length = | 
| 659 | 0 |                             _tablet_meta->tablet_schema() | 
| 660 | 0 |                                     ->column(_tablet_meta->tablet_schema()->sequence_col_idx()) | 
| 661 | 0 |                                     .length() + | 
| 662 | 0 |                             1; | 
| 663 | 0 |                 } | 
| 664 | 0 |                 size_t rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; | 
| 665 | 0 |                 Slice key_without_seq = | 
| 666 | 0 |                         Slice(key.get_data(), key.get_size() - seq_col_length - rowid_length); | 
| 667 | 0 |                 Slice rowid_slice = | 
| 668 | 0 |                         Slice(key.get_data() + key_without_seq.get_size() + seq_col_length + 1, | 
| 669 | 0 |                               rowid_length - 1); | 
| 670 | 0 |                 const auto* type_info = | 
| 671 | 0 |                         get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>(); | 
| 672 | 0 |                 const auto* rowid_coder = get_key_coder(type_info->type()); | 
| 673 | 0 |                 RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, rowid_length, | 
| 674 | 0 |                                                               (uint8_t*)&row_id)); | 
| 675 | 0 |             } | 
| 676 |  |             // same row in segments should be filtered | 
| 677 | 2 |             if (delete_bitmap->contains({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 678 | 2 |                                         row_id)) { | 
| 679 | 0 |                 continue; | 
| 680 | 0 |             } | 
| 681 |  |  | 
| 682 | 2 |             DBUG_EXECUTE_IF("BaseTablet::calc_segment_delete_bitmap.inject_err", { | 
| 683 | 2 |                 auto p = dp->param("percent", 0.01); | 
| 684 | 2 |                 std::mt19937 gen {std::random_device {}()}; | 
| 685 | 2 |                 std::bernoulli_distribution inject_fault {p}; | 
| 686 | 2 |                 if (inject_fault(gen)) { | 
| 687 | 2 |                     return Status::InternalError( | 
| 688 | 2 |                             "injection error in calc_segment_delete_bitmap, " | 
| 689 | 2 |                             "tablet_id={}, rowset_id={}", | 
| 690 | 2 |                             tablet_id(), rowset_id.to_string()); | 
| 691 | 2 |                 } | 
| 692 | 2 |             }); | 
| 693 |  |  | 
| 694 | 2 |             RowsetSharedPtr rowset_find; | 
| 695 | 2 |             Status st = Status::OK(); | 
| 696 | 2 |             if (tablet_delete_bitmap == nullptr) { | 
| 697 | 2 |                 st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc, | 
| 698 | 2 |                                     dummy_version.first - 1, segment_caches, &rowset_find); | 
| 699 | 2 |             } else { | 
| 700 | 0 |                 st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc, | 
| 701 | 0 |                                     dummy_version.first - 1, segment_caches, &rowset_find, true, | 
| 702 | 0 |                                     nullptr, nullptr, tablet_delete_bitmap); | 
| 703 | 0 |             } | 
| 704 | 2 |             bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || st.is<KEY_ALREADY_EXISTS>(); | 
| 705 |  |             // This is a defensive DCHECK; we need to exclude some common errors to avoid a core dump | 
| 706 |  |             // during stress tests | 
| 707 | 2 |             DCHECK(expected_st || st.is<MEM_LIMIT_EXCEEDED>()) | 
| 708 | 0 |                     << "unexpected error status while lookup_row_key:" << st; | 
| 709 | 2 |             if (!expected_st) { | 
| 710 | 0 |                 return st; | 
| 711 | 0 |             } | 
| 712 | 2 |             if (st.is<KEY_NOT_FOUND>()) { | 
| 713 | 0 |                 continue; | 
| 714 | 0 |             } | 
| 715 |  |  | 
| 716 | 2 |             ++conflict_rows; | 
| 717 | 2 |             if (st.is<KEY_ALREADY_EXISTS>() && | 
| 718 | 2 |                 (!is_partial_update || | 
| 719 | 1 |                  (partial_update_info->is_fixed_partial_update() && have_input_seq_column))) { | 
| 720 |  |                 // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row with the same key and larger value | 
| 721 |  |                 // in sequence column. | 
| 722 |  |                 // - If the current load is not a partial update, we just delete current row. | 
| 723 |  |                 // - Otherwise, it means that we are doing the alignment process in publish phase due to conflicts | 
| 724 |  |                 // during concurrent partial updates. And there exists another load which introduces a row with | 
| 725 |  |                 // the same keys and larger sequence column value published successfully after the commit phase | 
| 726 |  |                 // of the current load. | 
| 727 |  |                 //     - If the columns we update include sequence column, we should delete the current row because the | 
| 728 |  |                 //       partial update on the current row has been `overwritten` by the previous one with larger sequence | 
| 729 |  |                 //       column value. | 
| 730 |  |                 //     - Otherwise, we should combine the values of the missing columns in the previous row and the values | 
| 731 |  |                 //       of the including columns in the current row into a new row. | 
| 732 | 1 |                 delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 733 | 1 |                                    row_id); | 
| 734 | 1 |                 continue; | 
| 735 |  |                 // NOTE: for partial update which doesn't specify the sequence column, we can't use the sequence column value filled in flush phase | 
| 736 |  |                 // as its final value. Otherwise it may cause inconsistency between replicas. | 
| 737 | 1 |             } | 
| 738 | 1 |             if (is_partial_update && rowset_writer != nullptr) { | 
| 739 |  |                 // In publish version, record rows to be deleted for concurrent update | 
| 740 |  |                 // For example, if version 5 and 6 update a row, but version 6 only sees | 
| 741 |  |                 // version 4 when writing, then when publishing, version 5's value will | 
| 742 |  |                 // be marked as deleted and its update is lost. | 
| 743 |  |                 // So here we should read version 5's columns and build a new row, which | 
| 744 |  |                 // consists of version 6's update columns and version 5's original columns. | 
| 745 |  |                 // Here we build 2 read plans, one for the original values and one for the update values | 
| 746 |  |  | 
| 747 |  |                 // - for fixed partial update, we should read update columns from current load's rowset | 
| 748 |  |                 // and read missing columns from previous rowsets to create the final block | 
| 749 |  |                 // - for flexible partial update, we should read all columns from current load's rowset | 
| 750 |  |                 // and read non sort key columns from previous rowsets to create the final block | 
| 751 |  |                 // So we only need to record rows to read for both mode partial update | 
| 752 | 0 |                 read_plan_ori.prepare_to_read(loc, pos); | 
| 753 | 0 |                 read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos); | 
| 754 |  |  | 
| 755 |  |                 // For flexible partial update, we should use skip bitmap to determine whether | 
| 756 |  |                 // a row has specified the sequence column. But skip bitmap should be read from the segment. | 
| 757 |  |                 // So we record these row ids and process and filter them in `generate_new_block_for_flexible_partial_update()` | 
| 758 | 0 |                 if (st.is<KEY_ALREADY_EXISTS>() && | 
| 759 | 0 |                     partial_update_info->is_flexible_partial_update()) { | 
| 760 | 0 |                     rids_be_overwritten.insert(pos); | 
| 761 | 0 |                 } | 
| 762 |  |  | 
| 763 | 0 |                 rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; | 
| 764 | 0 |                 ++pos; | 
| 765 |  |  | 
| 766 |  |                 // delete bitmap will be calculated at memtable flush and at | 
| 767 |  |                 // publish. The two stages may see different versions. | 
| 768 |  |                 // When there is sequence column, the currently imported data | 
| 769 |  |                 // of rowset may be marked for deletion at memtable flush or | 
| 770 |  |                 // publish because the seq column is smaller than the previous | 
| 771 |  |                 // rowset. | 
| 772 |  |                 // just set 0 as a unified temporary version number, and update to | 
| 773 |  |                 // the real version number later. | 
| 774 | 0 |                 delete_bitmap->add( | 
| 775 | 0 |                         {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 776 | 0 |                         loc.row_id); | 
| 777 | 0 |                 delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 778 | 0 |                                    row_id); | 
| 779 | 0 |                 ++new_generated_rows; | 
| 780 | 0 |                 continue; | 
| 781 | 0 |             } | 
| 782 |  |             // when st = ok | 
| 783 | 1 |             delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 784 | 1 |                                loc.row_id); | 
| 785 | 1 |         } | 
| 786 | 1 |         remaining -= num_read; | 
| 787 | 1 |     } | 
| 788 |  |     // DCHECK_EQ(total, row_id) << "segment total rows: " << total << " row_id:" << row_id; | 
| 789 |  |  | 
| 790 | 1 |     if (config::enable_merge_on_write_correctness_check) { | 
| 791 | 1 |         RowsetIdUnorderedSet rowsetids; | 
| 792 | 1 |         for (const auto& specified_rowset : specified_rowsets) { | 
| 793 | 1 |             rowsetids.emplace(specified_rowset->rowset_id()); | 
| 794 | 1 |             VLOG_NOTICE << "[tabletID:" << tablet_id() << "]" | 
| 795 | 0 |                         << "[add_sentinel_mark_to_delete_bitmap][end_version:" << end_version << "]" | 
| 796 | 0 |                         << "add:" << specified_rowset->rowset_id(); | 
| 797 | 1 |         } | 
| 798 | 1 |         add_sentinel_mark_to_delete_bitmap(delete_bitmap.get(), rowsetids); | 
| 799 | 1 |     } | 
| 800 |  |  | 
| 801 | 1 |     if (pos > 0) { | 
| 802 | 0 |         DCHECK(partial_update_info); | 
| 803 | 0 |         if (partial_update_info->is_fixed_partial_update()) { | 
| 804 | 0 |             RETURN_IF_ERROR(generate_new_block_for_partial_update( | 
| 805 | 0 |                     rowset_schema, partial_update_info, read_plan_ori, read_plan_update, | 
| 806 | 0 |                     rsid_to_rowset, &block)); | 
| 807 | 0 |         } else { | 
| 808 | 0 |             RETURN_IF_ERROR(generate_new_block_for_flexible_partial_update( | 
| 809 | 0 |                     rowset_schema, partial_update_info, rids_be_overwritten, read_plan_ori, | 
| 810 | 0 |                     read_plan_update, rsid_to_rowset, &block)); | 
| 811 | 0 |         } | 
| 812 | 0 |         RETURN_IF_ERROR(sort_block(block, ordered_block)); | 
| 813 | 0 |         RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block)); | 
| 814 | 0 |         auto cost_us = watch.get_elapse_time_us(); | 
| 815 | 0 |         if (config::enable_mow_verbose_log || cost_us > 10 * 1000) { | 
| 816 | 0 |             LOG(INFO) << "calc segment delete bitmap for " | 
| 817 | 0 |                       << partial_update_info->partial_update_mode_str() | 
| 818 | 0 |                       << ", tablet: " << tablet_id() << " rowset: " << rowset_id | 
| 819 | 0 |                       << " seg_id: " << seg->id() << " dummy_version: " << end_version + 1 | 
| 820 | 0 |                       << " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows | 
| 821 | 0 |                       << " new generated rows: " << new_generated_rows | 
| 822 | 0 |                       << " bitmap num: " << delete_bitmap->get_delete_bitmap_count() | 
| 823 | 0 |                       << " bitmap cardinality: " << delete_bitmap->cardinality() | 
| 824 | 0 |                       << " cost: " << cost_us << "(us)"; | 
| 825 | 0 |         } | 
| 826 | 0 |         return Status::OK(); | 
| 827 | 0 |     } | 
| 828 | 1 |     auto cost_us = watch.get_elapse_time_us(); | 
| 829 | 1 |     if (config::enable_mow_verbose_log || cost_us > 10 * 1000) { | 
| 830 | 0 |         LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() | 
| 831 | 0 |                   << " rowset: " << rowset_id << " seg_id: " << seg->id() | 
| 832 | 0 |                   << " dummy_version: " << end_version + 1 << " rows: " << seg->num_rows() | 
| 833 | 0 |                   << " conflict rows: " << conflict_rows | 
| 834 | 0 |                   << " bitmap num: " << delete_bitmap->get_delete_bitmap_count() | 
| 835 | 0 |                   << " bitmap cardinality: " << delete_bitmap->cardinality() << " cost: " << cost_us | 
| 836 | 0 |                   << "(us)"; | 
| 837 | 0 |     } | 
| 838 | 1 |     return Status::OK(); | 
| 839 | 1 | } | 
| 840 |  |  | 
| 841 | 0 | Status BaseTablet::sort_block(vectorized::Block& in_block, vectorized::Block& output_block) { | 
| 842 | 0 |     vectorized::MutableBlock mutable_input_block = | 
| 843 | 0 |             vectorized::MutableBlock::build_mutable_block(&in_block); | 
| 844 | 0 |     vectorized::MutableBlock mutable_output_block = | 
| 845 | 0 |             vectorized::MutableBlock::build_mutable_block(&output_block); | 
| 846 |  |  | 
| 847 | 0 |     std::shared_ptr<RowInBlockComparator> vec_row_comparator = | 
| 848 | 0 |             std::make_shared<RowInBlockComparator>(_tablet_meta->tablet_schema()); | 
| 849 | 0 |     vec_row_comparator->set_block(&mutable_input_block); | 
| 850 |  |  | 
| 851 | 0 |     std::vector<std::unique_ptr<RowInBlock>> row_in_blocks; | 
| 852 | 0 |     DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); | 
| 853 | 0 |     row_in_blocks.reserve(in_block.rows()); | 
| 854 | 0 |     for (size_t i = 0; i < in_block.rows(); ++i) { | 
| 855 | 0 |         row_in_blocks.emplace_back(std::make_unique<RowInBlock>(i)); | 
| 856 | 0 |     } | 
| 857 | 0 |     std::sort(row_in_blocks.begin(), row_in_blocks.end(), | 
| 858 | 0 |               [&](const std::unique_ptr<RowInBlock>& l, | 
| 859 | 0 |                   const std::unique_ptr<RowInBlock>& r) -> bool { | 
| 860 | 0 |                   auto value = (*vec_row_comparator)(l.get(), r.get()); | 
| 861 | 0 |                   DCHECK(value != 0) << "value equel when sort block, l_pos: " << l->_row_pos | 
| 862 | 0 |                                      << " r_pos: " << r->_row_pos; | 
| 863 | 0 |                   return value < 0; | 
| 864 | 0 |               }); | 
| 865 | 0 |     std::vector<uint32_t> row_pos_vec; | 
| 866 | 0 |     row_pos_vec.reserve(in_block.rows()); | 
| 867 | 0 |     for (auto& block : row_in_blocks) { | 
| 868 | 0 |         row_pos_vec.emplace_back(block->_row_pos); | 
| 869 | 0 |     } | 
| 870 | 0 |     return mutable_output_block.add_rows(&in_block, row_pos_vec.data(), | 
| 871 | 0 |                                          row_pos_vec.data() + in_block.rows()); | 
| 872 | 0 | } | 
| 873 |  |  | 
| 874 |  | // fetch value by row column | 
| 875 |  | Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, | 
| 876 |  |                                                   const TabletSchema& tablet_schema, uint32_t segid, | 
| 877 |  |                                                   const std::vector<uint32_t>& rowids, | 
| 878 |  |                                                   const std::vector<uint32_t>& cids, | 
| 879 | 0 |                                                   vectorized::Block& block) { | 
| 880 | 0 |     MonotonicStopWatch watch; | 
| 881 | 0 |     watch.start(); | 
| 882 | 0 |     Defer _defer([&]() { | 
| 883 | 0 |         LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" << watch.elapsed_time() / 1000 | 
| 884 | 0 |                                << ", row_batch_size:" << rowids.size(); | 
| 885 | 0 |     }); | 
| 886 |  |  | 
| 887 | 0 |     BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); | 
| 888 | 0 |     CHECK(rowset); | 
| 889 | 0 |     CHECK(tablet_schema.has_row_store_for_all_columns()); | 
| 890 | 0 |     SegmentCacheHandle segment_cache_handle; | 
| 891 | 0 |     std::unique_ptr<segment_v2::ColumnIterator> column_iterator; | 
| 892 | 0 |     OlapReaderStatistics stats; | 
| 893 | 0 |     const auto& column = *DORIS_TRY(tablet_schema.column(BeConsts::ROW_STORE_COL)); | 
| 894 | 0 |     RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, column, &segment_cache_handle, | 
| 895 | 0 |                                                  &column_iterator, &stats)); | 
| 896 |  |     // get and parse tuple row | 
| 897 | 0 |     vectorized::MutableColumnPtr column_ptr = vectorized::ColumnString::create(); | 
| 898 | 0 |     RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); | 
| 899 | 0 |     assert(column_ptr->size() == rowids.size()); | 
| 900 | 0 |     auto* string_column = static_cast<vectorized::ColumnString*>(column_ptr.get()); | 
| 901 | 0 |     vectorized::DataTypeSerDeSPtrs serdes; | 
| 902 | 0 |     serdes.resize(cids.size()); | 
| 903 | 0 |     std::unordered_map<uint32_t, uint32_t> col_uid_to_idx; | 
| 904 | 0 |     std::vector<std::string> default_values; | 
| 905 | 0 |     default_values.resize(cids.size()); | 
| 906 | 0 |     for (int i = 0; i < cids.size(); ++i) { | 
| 907 | 0 |         const TabletColumn& tablet_column = tablet_schema.column(cids[i]); | 
| 908 | 0 |         vectorized::DataTypePtr type = | 
| 909 | 0 |                 vectorized::DataTypeFactory::instance().create_data_type(tablet_column); | 
| 910 | 0 |         col_uid_to_idx[tablet_column.unique_id()] = i; | 
| 911 | 0 |         default_values[i] = tablet_column.default_value(); | 
| 912 | 0 |         serdes[i] = type->get_serde(); | 
| 913 | 0 |     } | 
| 914 | 0 |     RETURN_IF_ERROR(vectorized::JsonbSerializeUtil::jsonb_to_block( | 
| 915 | 0 |             serdes, *string_column, col_uid_to_idx, block, default_values, {})); | 
| 916 | 0 |     return Status::OK(); | 
| 917 | 0 | } | 
| 918 |  |  | 
| 919 |  | Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid, | 
| 920 |  |                                          const std::vector<uint32_t>& rowids, | 
| 921 |  |                                          const TabletColumn& tablet_column, | 
| 922 | 0 |                                          vectorized::MutableColumnPtr& dst) { | 
| 923 | 0 |     MonotonicStopWatch watch; | 
| 924 | 0 |     watch.start(); | 
| 925 | 0 |     Defer _defer([&]() { | 
| 926 | 0 |         LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" << watch.elapsed_time() / 1000 | 
| 927 | 0 |                                << ", row_batch_size:" << rowids.size(); | 
| 928 | 0 |     }); | 
| 929 |  |  | 
| 930 |  |     // read row data | 
| 931 | 0 |     BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); | 
| 932 | 0 |     CHECK(rowset); | 
| 933 | 0 |     SegmentCacheHandle segment_cache_handle; | 
| 934 | 0 |     std::unique_ptr<segment_v2::ColumnIterator> column_iterator; | 
| 935 | 0 |     OlapReaderStatistics stats; | 
| 936 | 0 |     RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, tablet_column, | 
| 937 | 0 |                                                  &segment_cache_handle, &column_iterator, &stats)); | 
| 938 | 0 |     RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), dst)); | 
| 939 | 0 |     return Status::OK(); | 
| 940 | 0 | } | 
| 941 |  |  | 
| 942 |  | const signed char* BaseTablet::get_delete_sign_column_data(const vectorized::Block& block, | 
| 943 | 0 |                                                            size_t rows_at_least) { | 
| 944 | 0 |     if (const vectorized::ColumnWithTypeAndName* delete_sign_column = | 
| 945 | 0 |                 block.try_get_by_name(DELETE_SIGN); | 
| 946 | 0 |         delete_sign_column != nullptr) { | 
| 947 | 0 |         const auto& delete_sign_col = | 
| 948 | 0 |                 reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column)); | 
| 949 | 0 |         if (delete_sign_col.size() >= rows_at_least) { | 
| 950 | 0 |             return delete_sign_col.get_data().data(); | 
| 951 | 0 |         } | 
| 952 | 0 |     } | 
| 953 | 0 |     return nullptr; | 
| 954 | 0 | }; | 
| 955 |  |  | 
| 956 |  | Status BaseTablet::generate_default_value_block(const TabletSchema& schema, | 
| 957 |  |                                                 const std::vector<uint32_t>& cids, | 
| 958 |  |                                                 const std::vector<std::string>& default_values, | 
| 959 |  |                                                 const vectorized::Block& ref_block, | 
| 960 | 0 |                                                 vectorized::Block& default_value_block) { | 
| 961 | 0 |     auto mutable_default_value_columns = default_value_block.mutate_columns(); | 
| 962 | 0 |     for (auto i = 0; i < cids.size(); ++i) { | 
| 963 | 0 |         const auto& column = schema.column(cids[i]); | 
| 964 | 0 |         if (column.has_default_value()) { | 
| 965 | 0 |             const auto& default_value = default_values[i]; | 
| 966 | 0 |             StringRef str(default_value); | 
| 967 | 0 |             RETURN_IF_ERROR(ref_block.get_by_position(i).type->get_serde()->default_from_string( | 
| 968 | 0 |                     str, *mutable_default_value_columns[i])); | 
| 969 | 0 |         } | 
| 970 | 0 |     } | 
| 971 | 0 |     default_value_block.set_columns(std::move(mutable_default_value_columns)); | 
| 972 | 0 |     return Status::OK(); | 
| 973 | 0 | } | 
| 974 |  |  | 
| 975 |  | Status BaseTablet::generate_new_block_for_partial_update( | 
| 976 |  |         TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, | 
| 977 |  |         const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update, | 
| 978 |  |         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, | 
| 979 | 0 |         vectorized::Block* output_block) { | 
| 980 |  |     // do partial update related works | 
| 981 |  |     // 1. read columns by read plan | 
| 982 |  |     // 2. generate new block | 
| 983 |  |     // 3. write a new segment and modify rowset meta | 
| 984 |  |     // 4. mark current keys deleted | 
| 985 | 0 |     CHECK(output_block); | 
| 986 | 0 |     auto full_mutable_columns = output_block->mutate_columns(); | 
| 987 | 0 |     const auto& missing_cids = partial_update_info->missing_cids; | 
| 988 | 0 |     const auto& update_cids = partial_update_info->update_cids; | 
| 989 | 0 |     auto old_block = rowset_schema->create_block_by_cids(missing_cids); | 
| 990 | 0 |     auto update_block = rowset_schema->create_block_by_cids(update_cids); | 
| 991 |  |  | 
| 992 | 0 |     bool have_input_seq_column = false; | 
| 993 | 0 |     if (rowset_schema->has_sequence_col()) { | 
| 994 | 0 |         have_input_seq_column = | 
| 995 | 0 |                 (std::find(update_cids.cbegin(), update_cids.cend(), | 
| 996 | 0 |                            rowset_schema->sequence_col_idx()) != update_cids.cend()); | 
| 997 | 0 |     } | 
| 998 |  |  | 
| 999 |  |     // rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block | 
| 1000 | 0 |     std::map<uint32_t, uint32_t> read_index_update; | 
| 1001 |  |  | 
| 1002 |  |     // read current rowset first, if a row in the current rowset has delete sign mark | 
| 1003 |  |     // we don't need to read values from old block | 
| 1004 | 0 |     RETURN_IF_ERROR(read_plan_update.read_columns_by_plan( | 
| 1005 | 0 |             *rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update, false)); | 
| 1006 | 0 |     size_t update_rows = read_index_update.size(); | 
| 1007 | 0 |     for (auto i = 0; i < update_cids.size(); ++i) { | 
| 1008 | 0 |         for (auto idx = 0; idx < update_rows; ++idx) { | 
| 1009 | 0 |             full_mutable_columns[update_cids[i]]->insert_from( | 
| 1010 | 0 |                     *update_block.get_by_position(i).column, read_index_update[idx]); | 
| 1011 | 0 |         } | 
| 1012 | 0 |     } | 
| 1013 |  |  | 
| 1014 |  |     // if there is sequence column in the table, we need to read the sequence column, | 
| 1015 |  |     // otherwise it may cause the merge-on-read based compaction policy to produce incorrect results | 
| 1016 | 0 |     const auto* __restrict new_block_delete_signs = | 
| 1017 | 0 |             rowset_schema->has_sequence_col() | 
| 1018 | 0 |                     ? nullptr | 
| 1019 | 0 |                     : get_delete_sign_column_data(update_block, update_rows); | 
| 1020 |  |  | 
| 1021 |  |     // rowid in the final block (starts from 0, increasing, may not be continuous because we skip reading some rows) -> rowid to read in old_block | 
| 1022 | 0 |     std::map<uint32_t, uint32_t> read_index_old; | 
| 1023 | 0 |     RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset, | 
| 1024 | 0 |                                                        old_block, &read_index_old, true, | 
| 1025 | 0 |                                                        new_block_delete_signs)); | 
| 1026 | 0 |     size_t old_rows = read_index_old.size(); | 
| 1027 | 0 |     const auto* __restrict old_block_delete_signs = | 
| 1028 | 0 |             get_delete_sign_column_data(old_block, old_rows); | 
| 1029 | 0 |     DCHECK(old_block_delete_signs != nullptr); | 
| 1030 |  |     // build default value block | 
| 1031 | 0 |     auto default_value_block = old_block.clone_empty(); | 
| 1032 | 0 |     RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, missing_cids, | 
| 1033 | 0 |                                                              partial_update_info->default_values, | 
| 1034 | 0 |                                                              old_block, default_value_block)); | 
| 1035 |  |  | 
| 1036 | 0 |     CHECK(update_rows >= old_rows); | 
| 1037 |  |  | 
| 1038 |  |     // build full block | 
| 1039 | 0 |     for (auto i = 0; i < missing_cids.size(); ++i) { | 
| 1040 | 0 |         const auto& rs_column = rowset_schema->column(missing_cids[i]); | 
| 1041 | 0 |         auto& mutable_column = full_mutable_columns[missing_cids[i]]; | 
| 1042 | 0 |         for (auto idx = 0; idx < update_rows; ++idx) { | 
| 1043 |  |             // There are two cases we don't need to read values from old data: | 
| 1044 |  |             //     1. if the conflicting new row's delete sign is marked, which means the value columns | 
| 1045 |  |             //     of the row will not be read. So we don't need to read the missing values from the previous rows. | 
| 1046 |  |             //     2. if the conflicting old row's delete sign is marked, which means that the key is not exist now, | 
| 1047 |  |             //     we should not read old values from the deleted data, and should use default value instead. | 
| 1048 |  |             //     NOTE: since now we are in the publishing phase, all data is committed | 
| 1049 |  |             //         before, even the `strict_mode` is true (which requires partial update | 
| 1050 |  |             //         load job can't insert new keys), this "new" key MUST be written into | 
| 1051 |  |             //         the new generated segment file. | 
| 1052 | 0 |             bool new_row_delete_sign = | 
| 1053 | 0 |                     (new_block_delete_signs != nullptr && new_block_delete_signs[idx]); | 
| 1054 | 0 |             if (new_row_delete_sign) { | 
| 1055 | 0 |                 mutable_column->insert_default(); | 
| 1056 | 0 |             } else { | 
| 1057 | 0 |                 bool use_default = false; | 
| 1058 | 0 |                 bool old_row_delete_sign = (old_block_delete_signs != nullptr && | 
| 1059 | 0 |                                             old_block_delete_signs[read_index_old.at(idx)] != 0); | 
| 1060 | 0 |                 if (old_row_delete_sign) { | 
| 1061 | 0 |                     if (!rowset_schema->has_sequence_col()) { | 
| 1062 | 0 |                         use_default = true; | 
| 1063 | 0 |                     } else if (have_input_seq_column || !rs_column.is_seqeunce_col()) { | 
| 1064 |  |                         // to keep the sequence column value not decreasing, we should read values of seq column | 
| 1065 |  |                         // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise | 
| 1066 |  |                         // it may cause the merge-on-read based compaction to produce incorrect results | 
| 1067 | 0 |                         use_default = true; | 
| 1068 | 0 |                     } | 
| 1069 | 0 |                 } | 
| 1070 |  | 
 | 
| 1071 | 0 |                 if (use_default) { | 
| 1072 | 0 |                     if (rs_column.has_default_value()) { | 
| 1073 | 0 |                         mutable_column->insert_from(*default_value_block.get_by_position(i).column, | 
| 1074 | 0 |                                                     0); | 
| 1075 | 0 |                     } else if (rs_column.is_nullable()) { | 
| 1076 | 0 |                         assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>( | 
| 1077 | 0 |                                 mutable_column.get()) | 
| 1078 | 0 |                                 ->insert_default(); | 
| 1079 | 0 |                     } else { | 
| 1080 | 0 |                         mutable_column->insert(rs_column.get_vec_type()->get_default()); | 
| 1081 | 0 |                     } | 
| 1082 | 0 |                 } else { | 
| 1083 | 0 |                     mutable_column->insert_from(*old_block.get_by_position(i).column, | 
| 1084 | 0 |                                                 read_index_old[idx]); | 
| 1085 | 0 |                 } | 
| 1086 | 0 |             } | 
| 1087 | 0 |         } | 
| 1088 | 0 |     } | 
| 1089 | 0 |     output_block->set_columns(std::move(full_mutable_columns)); | 
| 1090 | 0 |     VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); | 
| 1091 | 0 |     return Status::OK(); | 
| 1092 | 0 | } | 
| 1093 |  |  | 
| 1094 |  | Status BaseTablet::generate_new_block_for_flexible_partial_update( | 
| 1095 |  |         TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, | 
| 1096 |  |         std::set<uint32_t>& rids_be_overwritten, const FixedReadPlan& read_plan_ori, | 
| 1097 |  |         const FixedReadPlan& read_plan_update, | 
| 1098 |  |         const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, | 
| 1099 | 0 |         vectorized::Block* output_block) { | 
| 1100 | 0 |     CHECK(output_block); | 
| 1101 |  | 
 | 
| 1102 | 0 |     int32_t seq_col_unique_id = -1; | 
| 1103 | 0 |     if (rowset_schema->has_sequence_col()) { | 
| 1104 | 0 |         seq_col_unique_id = rowset_schema->column(rowset_schema->sequence_col_idx()).unique_id(); | 
| 1105 | 0 |     } | 
| 1106 | 0 |     const auto& non_sort_key_cids = partial_update_info->missing_cids; | 
| 1107 | 0 |     std::vector<uint32_t> all_cids(rowset_schema->num_columns()); | 
| 1108 | 0 |     std::iota(all_cids.begin(), all_cids.end(), 0); | 
| 1109 | 0 |     auto old_block = rowset_schema->create_block_by_cids(non_sort_key_cids); | 
| 1110 | 0 |     auto update_block = rowset_schema->create_block_by_cids(all_cids); | 
| 1111 |  |  | 
| 1112 |  |     // rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block | 
| 1113 | 0 |     std::map<uint32_t, uint32_t> read_index_update; | 
| 1114 |  |  | 
| 1115 |  |     // 1. read the current rowset first, if a row in the current rowset has delete sign mark | 
| 1116 |  |     // we don't need to read values from old block for that row | 
| 1117 | 0 |     RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(*rowset_schema, all_cids, rsid_to_rowset, | 
| 1118 | 0 |                                                           update_block, &read_index_update, true)); | 
| 1119 | 0 |     size_t update_rows = read_index_update.size(); | 
| 1120 |  |  | 
| 1121 |  |     // TODO(bobhan1): add the delete sign optimazation here | 
| 1122 |  |     // // if there is sequence column in the table, we need to read the sequence column, | 
| 1123 |  |     // // otherwise it may cause the merge-on-read based compaction policy to produce incorrect results | 
| 1124 |  |     // const auto* __restrict new_block_delete_signs = | 
| 1125 |  |     //         rowset_schema->has_sequence_col() | 
| 1126 |  |     //                 ? nullptr | 
| 1127 |  |     //                 : get_delete_sign_column_data(update_block, update_rows); | 
| 1128 |  |  | 
| 1129 |  |     // 2. read previous rowsets | 
| 1130 |  |     // rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block | 
| 1131 | 0 |     std::map<uint32_t, uint32_t> read_index_old; | 
| 1132 | 0 |     RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan( | 
| 1133 | 0 |             *rowset_schema, non_sort_key_cids, rsid_to_rowset, old_block, &read_index_old, true)); | 
| 1134 | 0 |     size_t old_rows = read_index_old.size(); | 
| 1135 | 0 |     DCHECK(update_rows == old_rows); | 
| 1136 | 0 |     const auto* __restrict old_block_delete_signs = | 
| 1137 | 0 |             get_delete_sign_column_data(old_block, old_rows); | 
| 1138 | 0 |     DCHECK(old_block_delete_signs != nullptr); | 
| 1139 |  |  | 
| 1140 |  |     // 3. build default value block | 
| 1141 | 0 |     auto default_value_block = old_block.clone_empty(); | 
| 1142 | 0 |     RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, non_sort_key_cids, | 
| 1143 | 0 |                                                              partial_update_info->default_values, | 
| 1144 | 0 |                                                              old_block, default_value_block)); | 
| 1145 |  |  | 
| 1146 |  |     // 4. build the final block | 
| 1147 | 0 |     auto full_mutable_columns = output_block->mutate_columns(); | 
| 1148 | 0 |     DCHECK(rowset_schema->has_skip_bitmap_col()); | 
| 1149 | 0 |     auto skip_bitmap_col_idx = rowset_schema->skip_bitmap_col_idx(); | 
| 1150 | 0 |     const std::vector<BitmapValue>* skip_bitmaps = | 
| 1151 | 0 |             &(assert_cast<const vectorized::ColumnBitmap*, TypeCheckOnRelease::DISABLE>( | 
| 1152 | 0 |                       update_block.get_by_position(skip_bitmap_col_idx).column->get_ptr().get()) | 
| 1153 | 0 |                       ->get_data()); | 
| 1154 |  | 
 | 
| 1155 | 0 |     if (rowset_schema->has_sequence_col() && !rids_be_overwritten.empty()) { | 
| 1156 |  |         // If the row specifies the sequence column, we should delete the current row becase the | 
| 1157 |  |         // flexible partial update on the current row has been `overwritten` by the previous one with larger sequence | 
| 1158 |  |         // column value. | 
| 1159 | 0 |         for (auto it = rids_be_overwritten.begin(); it != rids_be_overwritten.end();) { | 
| 1160 | 0 |             auto rid = *it; | 
| 1161 | 0 |             if (!skip_bitmaps->at(rid).contains(seq_col_unique_id)) { | 
| 1162 | 0 |                 ++it; | 
| 1163 | 0 |             } else { | 
| 1164 | 0 |                 it = rids_be_overwritten.erase(it); | 
| 1165 | 0 |             } | 
| 1166 | 0 |         } | 
| 1167 | 0 |     } | 
| 1168 |  | 
 | 
| 1169 | 0 |     auto fill_one_cell = [&read_index_old, &read_index_update, &rowset_schema, partial_update_info]( | 
| 1170 | 0 |                                  const TabletColumn& tablet_column, std::size_t idx, | 
| 1171 | 0 |                                  vectorized::MutableColumnPtr& new_col, | 
| 1172 | 0 |                                  const vectorized::IColumn& default_value_col, | 
| 1173 | 0 |                                  const vectorized::IColumn& old_value_col, | 
| 1174 | 0 |                                  const vectorized::IColumn& cur_col, bool skipped, | 
| 1175 | 0 |                                  bool row_has_sequence_col, | 
| 1176 | 0 |                                  const signed char* delete_sign_column_data) { | 
| 1177 | 0 |         if (skipped) { | 
| 1178 | 0 |             bool use_default = false; | 
| 1179 | 0 |             bool old_row_delete_sign = | 
| 1180 | 0 |                     (delete_sign_column_data != nullptr && | 
| 1181 | 0 |                      delete_sign_column_data[read_index_old[cast_set<uint32_t>(idx)]] != 0); | 
| 1182 | 0 |             if (old_row_delete_sign) { | 
| 1183 | 0 |                 if (!rowset_schema->has_sequence_col()) { | 
| 1184 | 0 |                     use_default = true; | 
| 1185 | 0 |                 } else if (row_has_sequence_col || | 
| 1186 | 0 |                            (!tablet_column.is_seqeunce_col() && | 
| 1187 | 0 |                             (tablet_column.unique_id() != | 
| 1188 | 0 |                              partial_update_info->sequence_map_col_uid()))) { | 
| 1189 |  |                     // to keep the sequence column value not decreasing, we should read values of seq column(and seq map column) | 
| 1190 |  |                     // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise | 
| 1191 |  |                     // it may cause the merge-on-read based compaction to produce incorrect results | 
| 1192 | 0 |                     use_default = true; | 
| 1193 | 0 |                 } | 
| 1194 | 0 |             } | 
| 1195 | 0 |             if (use_default) { | 
| 1196 | 0 |                 if (tablet_column.has_default_value()) { | 
| 1197 | 0 |                     new_col->insert_from(default_value_col, 0); | 
| 1198 | 0 |                 } else if (tablet_column.is_nullable()) { | 
| 1199 | 0 |                     assert_cast<vectorized::ColumnNullable*, TypeCheckOnRelease::DISABLE>( | 
| 1200 | 0 |                             new_col.get()) | 
| 1201 | 0 |                             ->insert_many_defaults(1); | 
| 1202 | 0 |                 } else if (tablet_column.is_auto_increment()) { | 
| 1203 |  |                     // For auto-increment column, its default value(generated value) is filled in current block in flush phase | 
| 1204 |  |                     // when the load doesn't specify the auto-increment column | 
| 1205 |  |                     //     - if the previous conflicting row is deleted, we should use the value in current block as its final value | 
| 1206 |  |                     //     - if the previous conflicting row is an insert, we should use the value in old block as its final value to | 
| 1207 |  |                     //       keep consistency between replicas | 
| 1208 | 0 |                     new_col->insert_from(cur_col, read_index_update[cast_set<uint32_t>(idx)]); | 
| 1209 | 0 |                 } else { | 
| 1210 | 0 |                     new_col->insert(tablet_column.get_vec_type()->get_default()); | 
| 1211 | 0 |                 } | 
| 1212 | 0 |             } else { | 
| 1213 | 0 |                 new_col->insert_from(old_value_col, read_index_old[cast_set<uint32_t>(idx)]); | 
| 1214 | 0 |             } | 
| 1215 | 0 |         } else { | 
| 1216 | 0 |             new_col->insert_from(cur_col, read_index_update[cast_set<uint32_t>(idx)]); | 
| 1217 | 0 |         } | 
| 1218 | 0 |     }; | 
| 1219 |  | 
 | 
| 1220 | 0 |     for (std::size_t cid {0}; cid < rowset_schema->num_columns(); cid++) { | 
| 1221 | 0 |         vectorized::MutableColumnPtr& new_col = full_mutable_columns[cid]; | 
| 1222 | 0 |         const vectorized::IColumn& cur_col = *update_block.get_by_position(cid).column; | 
| 1223 | 0 |         const auto& rs_column = rowset_schema->column(cid); | 
| 1224 | 0 |         auto col_uid = rs_column.unique_id(); | 
| 1225 | 0 |         for (auto idx = 0; idx < update_rows; ++idx) { | 
| 1226 | 0 |             if (cid < rowset_schema->num_key_columns()) { | 
| 1227 | 0 |                 new_col->insert_from(cur_col, read_index_update[idx]); | 
| 1228 | 0 |             } else { | 
| 1229 | 0 |                 const vectorized::IColumn& default_value_col = | 
| 1230 | 0 |                         *default_value_block.get_by_position(cid - rowset_schema->num_key_columns()) | 
| 1231 | 0 |                                  .column; | 
| 1232 | 0 |                 const vectorized::IColumn& old_value_col = | 
| 1233 | 0 |                         *old_block.get_by_position(cid - rowset_schema->num_key_columns()).column; | 
| 1234 | 0 |                 if (rids_be_overwritten.contains(idx)) { | 
| 1235 | 0 |                     new_col->insert_from(old_value_col, read_index_old[idx]); | 
| 1236 | 0 |                 } else { | 
| 1237 | 0 |                     fill_one_cell(rs_column, idx, new_col, default_value_col, old_value_col, | 
| 1238 | 0 |                                   cur_col, skip_bitmaps->at(idx).contains(col_uid), | 
| 1239 | 0 |                                   rowset_schema->has_sequence_col() | 
| 1240 | 0 |                                           ? !skip_bitmaps->at(idx).contains(seq_col_unique_id) | 
| 1241 | 0 |                                           : false, | 
| 1242 | 0 |                                   old_block_delete_signs); | 
| 1243 | 0 |                 } | 
| 1244 | 0 |             } | 
| 1245 | 0 |         } | 
| 1246 | 0 |         DCHECK_EQ(full_mutable_columns[cid]->size(), update_rows); | 
| 1247 | 0 |     } | 
| 1248 |  | 
 | 
| 1249 | 0 |     output_block->set_columns(std::move(full_mutable_columns)); | 
| 1250 | 0 |     VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); | 
| 1251 | 0 |     return Status::OK(); | 
| 1252 | 0 | } | 
| 1253 |  |  | 
| 1254 |  | Status BaseTablet::commit_phase_update_delete_bitmap( | 
| 1255 |  |         const BaseTabletSPtr& tablet, const RowsetSharedPtr& rowset, | 
| 1256 |  |         RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, | 
| 1257 |  |         const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, | 
| 1258 | 3 |         CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer) { | 
| 1259 | 3 |     DBUG_EXECUTE_IF("BaseTablet::commit_phase_update_delete_bitmap.enable_spin_wait", { | 
| 1260 | 3 |         auto tok = dp->param<std::string>("token", "invalid_token"); | 
| 1261 | 3 |         while (DebugPoints::instance()->is_enable( | 
| 1262 | 3 |                 "BaseTablet::commit_phase_update_delete_bitmap.block")) { | 
| 1263 | 3 |             auto block_dp = DebugPoints::instance()->get_debug_point( | 
| 1264 | 3 |                     "BaseTablet::commit_phase_update_delete_bitmap.block"); | 
| 1265 | 3 |             if (block_dp) { | 
| 1266 | 3 |                 auto pass_token = block_dp->param<std::string>("pass_token", ""); | 
| 1267 | 3 |                 if (pass_token == tok) { | 
| 1268 | 3 |                     break; | 
| 1269 | 3 |                 } | 
| 1270 | 3 |             } | 
| 1271 | 3 |             std::this_thread::sleep_for(std::chrono::milliseconds(50)); | 
| 1272 | 3 |         } | 
| 1273 | 3 |     }); | 
| 1274 | 3 |     SCOPED_BVAR_LATENCY(g_tablet_commit_phase_update_delete_bitmap_latency); | 
| 1275 | 3 |     RowsetIdUnorderedSet cur_rowset_ids; | 
| 1276 | 3 |     RowsetIdUnorderedSet rowset_ids_to_add; | 
| 1277 | 3 |     RowsetIdUnorderedSet rowset_ids_to_del; | 
| 1278 | 3 |     int64_t cur_version; | 
| 1279 |  |  | 
| 1280 | 3 |     std::vector<RowsetSharedPtr> specified_rowsets; | 
| 1281 | 3 |     { | 
| 1282 |  |         // to prevent seeing intermediate state of a tablet | 
| 1283 | 3 |         std::unique_lock<bthread::Mutex> sync_lock; | 
| 1284 | 3 |         if (config::is_cloud_mode()) { | 
| 1285 | 0 |             sync_lock = std::unique_lock<bthread::Mutex>( | 
| 1286 | 0 |                     std::static_pointer_cast<CloudTablet>(tablet)->get_sync_meta_lock()); | 
| 1287 | 0 |         } | 
| 1288 | 3 |         std::shared_lock meta_rlock(tablet->_meta_lock); | 
| 1289 | 3 |         if (tablet->tablet_state() == TABLET_NOTREADY) { | 
| 1290 |  |             // tablet is under alter process. The delete bitmap will be calculated after conversion. | 
| 1291 | 0 |             LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " | 
| 1292 | 0 |                          "tablet_id: " | 
| 1293 | 0 |                       << tablet->tablet_id() << " txn_id: " << txn_id; | 
| 1294 | 0 |             return Status::OK(); | 
| 1295 | 0 |         } | 
| 1296 | 3 |         cur_version = tablet->max_version_unlocked(); | 
| 1297 | 3 |         RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(cur_version, &cur_rowset_ids)); | 
| 1298 | 3 |         _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, | 
| 1299 | 3 |                                &rowset_ids_to_del); | 
| 1300 | 3 |         specified_rowsets = tablet->get_rowset_by_ids(&rowset_ids_to_add); | 
| 1301 | 3 |     } | 
| 1302 | 0 |     for (const auto& to_del : rowset_ids_to_del) { | 
| 1303 | 0 |         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); | 
| 1304 | 0 |     } | 
| 1305 |  |  | 
| 1306 | 3 |     RETURN_IF_ERROR(calc_delete_bitmap(tablet, rowset, segments, specified_rowsets, delete_bitmap, | 
| 1307 | 3 |                                        cur_version, token, rowset_writer)); | 
| 1308 | 3 |     size_t total_rows = std::accumulate( | 
| 1309 | 3 |             segments.begin(), segments.end(), 0, | 
| 1310 | 3 |             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); | 
| 1311 | 3 |     LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " << tablet->tablet_id() | 
| 1312 | 3 |               << ", rowset_ids to add: " << rowset_ids_to_add.size() | 
| 1313 | 3 |               << ", rowset_ids to del: " << rowset_ids_to_del.size() | 
| 1314 | 3 |               << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id | 
| 1315 | 3 |               << ", total rows: " << total_rows; | 
| 1316 | 3 |     pre_rowset_ids = cur_rowset_ids; | 
| 1317 | 3 |     return Status::OK(); | 
| 1318 | 3 | } | 
| 1319 |  |  | 
| 1320 |  | void BaseTablet::add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap, | 
| 1321 | 5 |                                                     const RowsetIdUnorderedSet& rowsetids) { | 
| 1322 | 5 |     for (const auto& rowsetid : rowsetids) { | 
| 1323 | 5 |         delete_bitmap->add( | 
| 1324 | 5 |                 {rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON}, | 
| 1325 | 5 |                 DeleteBitmap::ROWSET_SENTINEL_MARK); | 
| 1326 | 5 |     } | 
| 1327 | 5 | } | 
| 1328 |  |  | 
| 1329 |  | void BaseTablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur, | 
| 1330 |  |                                         const RowsetIdUnorderedSet& pre, | 
| 1331 |  |                                         RowsetIdUnorderedSet* to_add, | 
| 1332 | 6 |                                         RowsetIdUnorderedSet* to_del) { | 
| 1333 | 6 |     for (const auto& id : cur) { | 
| 1334 | 2 |         if (pre.find(id) == pre.end()) { | 
| 1335 | 1 |             to_add->insert(id); | 
| 1336 | 1 |         } | 
| 1337 | 2 |     } | 
| 1338 | 6 |     for (const auto& id : pre) { | 
| 1339 | 1 |         if (cur.find(id) == cur.end()) { | 
| 1340 | 0 |             to_del->insert(id); | 
| 1341 | 0 |         } | 
| 1342 | 1 |     } | 
| 1343 | 6 | } | 
| 1344 |  |  | 
| 1345 |  | Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, | 
| 1346 |  |                                                    int64_t max_version, int64_t txn_id, | 
| 1347 |  |                                                    const RowsetIdUnorderedSet& rowset_ids, | 
| 1348 | 6 |                                                    std::vector<RowsetSharedPtr>* rowsets) { | 
| 1349 | 6 |     RowsetIdUnorderedSet missing_ids; | 
| 1350 | 6 |     for (const auto& rowsetid : rowset_ids) { | 
| 1351 | 2 |         if (!delete_bitmap->delete_bitmap.contains({rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, | 
| 1352 | 2 |                                                     DeleteBitmap::TEMP_VERSION_COMMON})) { | 
| 1353 | 0 |             missing_ids.insert(rowsetid); | 
| 1354 | 0 |         } | 
| 1355 | 2 |     } | 
| 1356 |  |  | 
| 1357 | 6 |     if (!missing_ids.empty()) { | 
| 1358 | 0 |         LOG(WARNING) << "[txn_id:" << txn_id << "][tablet_id:" << tablet_id() | 
| 1359 | 0 |                      << "][max_version: " << max_version | 
| 1360 | 0 |                      << "] check delete bitmap correctness failed!"; | 
| 1361 | 0 |         rapidjson::Document root; | 
| 1362 | 0 |         root.SetObject(); | 
| 1363 | 0 |         rapidjson::Document required_rowsets_arr; | 
| 1364 | 0 |         required_rowsets_arr.SetArray(); | 
| 1365 | 0 |         rapidjson::Document missing_rowsets_arr; | 
| 1366 | 0 |         missing_rowsets_arr.SetArray(); | 
| 1367 |  | 
 | 
| 1368 | 0 |         if (rowsets != nullptr) { | 
| 1369 | 0 |             for (const auto& rowset : *rowsets) { | 
| 1370 | 0 |                 rapidjson::Value value; | 
| 1371 | 0 |                 std::string version_str = rowset->get_rowset_info_str(); | 
| 1372 | 0 |                 value.SetString(version_str.c_str(), | 
| 1373 | 0 |                                 cast_set<rapidjson::SizeType>(version_str.length()), | 
| 1374 | 0 |                                 required_rowsets_arr.GetAllocator()); | 
| 1375 | 0 |                 required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator()); | 
| 1376 | 0 |             } | 
| 1377 | 0 |         } else { | 
| 1378 | 0 |             std::vector<RowsetSharedPtr> tablet_rowsets; | 
| 1379 | 0 |             { | 
| 1380 | 0 |                 std::shared_lock meta_rlock(_meta_lock); | 
| 1381 | 0 |                 tablet_rowsets = get_rowset_by_ids(&rowset_ids); | 
| 1382 | 0 |             } | 
| 1383 | 0 |             for (const auto& rowset : tablet_rowsets) { | 
| 1384 | 0 |                 rapidjson::Value value; | 
| 1385 | 0 |                 std::string version_str = rowset->get_rowset_info_str(); | 
| 1386 | 0 |                 value.SetString(version_str.c_str(), | 
| 1387 | 0 |                                 cast_set<rapidjson::SizeType>(version_str.length()), | 
| 1388 | 0 |                                 required_rowsets_arr.GetAllocator()); | 
| 1389 | 0 |                 required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator()); | 
| 1390 | 0 |             } | 
| 1391 | 0 |         } | 
| 1392 | 0 |         for (const auto& missing_rowset_id : missing_ids) { | 
| 1393 | 0 |             rapidjson::Value miss_value; | 
| 1394 | 0 |             std::string rowset_id_str = missing_rowset_id.to_string(); | 
| 1395 | 0 |             miss_value.SetString(rowset_id_str.c_str(), | 
| 1396 | 0 |                                  cast_set<rapidjson::SizeType>(rowset_id_str.length()), | 
| 1397 | 0 |                                  missing_rowsets_arr.GetAllocator()); | 
| 1398 | 0 |             missing_rowsets_arr.PushBack(miss_value, missing_rowsets_arr.GetAllocator()); | 
| 1399 | 0 |         } | 
| 1400 |  | 
 | 
| 1401 | 0 |         root.AddMember("required_rowsets", required_rowsets_arr, root.GetAllocator()); | 
| 1402 | 0 |         root.AddMember("missing_rowsets", missing_rowsets_arr, root.GetAllocator()); | 
| 1403 | 0 |         rapidjson::StringBuffer strbuf; | 
| 1404 | 0 |         rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); | 
| 1405 | 0 |         root.Accept(writer); | 
| 1406 | 0 |         std::string rowset_status_string = std::string(strbuf.GetString()); | 
| 1407 | 0 |         LOG_EVERY_SECOND(WARNING) << rowset_status_string; | 
| 1408 |  |         // let it crash if correctness check failed in Debug mode | 
| 1409 | 0 |         DCHECK(false) << "delete bitmap correctness check failed in publish phase!"; | 
| 1410 | 0 |         return Status::InternalError("check delete bitmap failed!"); | 
| 1411 | 0 |     } | 
| 1412 | 6 |     return Status::OK(); | 
| 1413 | 6 | } | 
| 1414 |  |  | 
| 1415 |  | Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info, | 
| 1416 |  |                                         int64_t txn_id, int64_t txn_expiration, | 
| 1417 | 3 |                                         DeleteBitmapPtr tablet_delete_bitmap) { | 
| 1418 | 3 |     SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); | 
| 1419 | 3 |     RowsetIdUnorderedSet cur_rowset_ids; | 
| 1420 | 3 |     RowsetIdUnorderedSet rowset_ids_to_add; | 
| 1421 | 3 |     RowsetIdUnorderedSet rowset_ids_to_del; | 
| 1422 | 3 |     RowsetSharedPtr rowset = txn_info->rowset; | 
| 1423 | 3 |     int64_t cur_version = rowset->start_version(); | 
| 1424 |  |  | 
| 1425 | 3 |     std::unique_ptr<RowsetWriter> transient_rs_writer; | 
| 1426 | 3 |     DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap; | 
| 1427 | 3 |     bool is_partial_update = | 
| 1428 | 3 |             txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update(); | 
| 1429 | 3 |     if (is_partial_update) { | 
| 1430 | 0 |         transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer( | 
| 1431 | 0 |                 *rowset, txn_info->partial_update_info, txn_expiration)); | 
| 1432 | 0 |         DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.after.create_transient_rs_writer", | 
| 1433 | 0 |                         DBUG_BLOCK); | 
| 1434 |  |         // Partial update might generate new segments when there is conflicts while publish, and mark | 
| 1435 |  |         // the same key in original segments as delete. | 
| 1436 |  |         // When the new segment flush fails or the rowset build fails, the deletion marker for the | 
| 1437 |  |         // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`, | 
| 1438 |  |         // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it. | 
| 1439 | 0 |         delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap)); | 
| 1440 | 0 |     } | 
| 1441 |  |  | 
| 1442 | 3 |     OlapStopWatch watch; | 
| 1443 | 3 |     std::vector<segment_v2::SegmentSharedPtr> segments; | 
| 1444 | 3 |     RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); | 
| 1445 | 3 |     auto t1 = watch.get_elapse_time_us(); | 
| 1446 |  |  | 
| 1447 | 3 |     int64_t next_visible_version = txn_info->is_txn_load ? txn_info->next_visible_version | 
| 1448 | 3 |                                                          : txn_info->rowset->start_version(); | 
| 1449 | 3 |     { | 
| 1450 | 3 |         std::shared_lock meta_rlock(self->_meta_lock); | 
| 1451 |  |         // tablet is under alter process. The delete bitmap will be calculated after conversion. | 
| 1452 | 3 |         if (self->tablet_state() == TABLET_NOTREADY) { | 
| 1453 | 0 |             LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" | 
| 1454 | 0 |                       << self->tablet_id(); | 
| 1455 | 0 |             return Status::OK(); | 
| 1456 | 0 |         } | 
| 1457 | 3 |         RETURN_IF_ERROR(self->get_all_rs_id_unlocked(next_visible_version - 1, &cur_rowset_ids)); | 
| 1458 | 3 |     } | 
| 1459 | 3 |     auto t2 = watch.get_elapse_time_us(); | 
| 1460 |  |  | 
| 1461 | 3 |     _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, &rowset_ids_to_add, | 
| 1462 | 3 |                            &rowset_ids_to_del); | 
| 1463 | 3 |     for (const auto& to_del : rowset_ids_to_del) { | 
| 1464 | 0 |         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); | 
| 1465 | 0 |     } | 
| 1466 |  |  | 
| 1467 | 3 |     std::vector<RowsetSharedPtr> specified_rowsets; | 
| 1468 | 3 |     { | 
| 1469 | 3 |         std::shared_lock meta_rlock(self->_meta_lock); | 
| 1470 | 3 |         specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add); | 
| 1471 | 3 |     } | 
| 1472 | 3 |     if (txn_info->is_txn_load) { | 
| 1473 | 0 |         for (auto invisible_rowset : txn_info->invisible_rowsets) { | 
| 1474 | 0 |             specified_rowsets.emplace_back(invisible_rowset); | 
| 1475 | 0 |         } | 
| 1476 | 0 |         std::sort(specified_rowsets.begin(), specified_rowsets.end(), | 
| 1477 | 0 |                   [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { | 
| 1478 | 0 |                       return lhs->end_version() > rhs->end_version(); | 
| 1479 | 0 |                   }); | 
| 1480 | 0 |     } | 
| 1481 | 3 |     auto t3 = watch.get_elapse_time_us(); | 
| 1482 |  |  | 
| 1483 |  |     // If a rowset is produced by compaction before the commit phase of the partial update load | 
| 1484 |  |     // and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset | 
| 1485 |  |     // because data remains the same before and after compaction. But we still need to calculate the | 
| 1486 |  |     // the delete bitmap for that rowset. | 
| 1487 | 3 |     std::vector<RowsetSharedPtr> rowsets_skip_alignment; | 
| 1488 | 3 |     if (is_partial_update) { | 
| 1489 | 0 |         int64_t max_version_in_flush_phase = | 
| 1490 | 0 |                 txn_info->partial_update_info->max_version_in_flush_phase; | 
| 1491 | 0 |         DCHECK(max_version_in_flush_phase != -1); | 
| 1492 | 0 |         std::vector<RowsetSharedPtr> remained_rowsets; | 
| 1493 | 0 |         for (const auto& specified_rowset : specified_rowsets) { | 
| 1494 | 0 |             if (specified_rowset->end_version() <= max_version_in_flush_phase && | 
| 1495 | 0 |                 specified_rowset->produced_by_compaction()) { | 
| 1496 | 0 |                 rowsets_skip_alignment.emplace_back(specified_rowset); | 
| 1497 | 0 |             } else { | 
| 1498 | 0 |                 remained_rowsets.emplace_back(specified_rowset); | 
| 1499 | 0 |             } | 
| 1500 | 0 |         } | 
| 1501 | 0 |         if (!rowsets_skip_alignment.empty()) { | 
| 1502 | 0 |             specified_rowsets = std::move(remained_rowsets); | 
| 1503 | 0 |         } | 
| 1504 | 0 |     } | 
| 1505 |  |  | 
| 1506 | 3 |     DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", { | 
| 1507 | 3 |         auto token = dp->param<std::string>("token", "invalid_token"); | 
| 1508 | 3 |         while (DebugPoints::instance()->is_enable("BaseTablet::update_delete_bitmap.block")) { | 
| 1509 | 3 |             auto block_dp = DebugPoints::instance()->get_debug_point( | 
| 1510 | 3 |                     "BaseTablet::update_delete_bitmap.block"); | 
| 1511 | 3 |             if (block_dp) { | 
| 1512 | 3 |                 auto wait_token = block_dp->param<std::string>("wait_token", ""); | 
| 1513 | 3 |                 LOG(INFO) << "BaseTablet::update_delete_bitmap.enable_spin_wait, wait_token: " | 
| 1514 | 3 |                           << wait_token << ", token: " << token; | 
| 1515 | 3 |                 if (wait_token != token) { | 
| 1516 | 3 |                     break; | 
| 1517 | 3 |                 } | 
| 1518 | 3 |             } | 
| 1519 | 3 |             std::this_thread::sleep_for(std::chrono::milliseconds(50)); | 
| 1520 | 3 |         } | 
| 1521 | 3 |     }); | 
| 1522 |  |  | 
| 1523 | 3 |     if (!rowsets_skip_alignment.empty()) { | 
| 1524 | 0 |         auto token = self->calc_delete_bitmap_executor()->create_token(); | 
| 1525 |  |         // set rowset_writer to nullptr to skip the alignment process | 
| 1526 | 0 |         RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, rowsets_skip_alignment, | 
| 1527 | 0 |                                            delete_bitmap, cur_version - 1, token.get(), nullptr, | 
| 1528 | 0 |                                            tablet_delete_bitmap)); | 
| 1529 | 0 |         RETURN_IF_ERROR(token->wait()); | 
| 1530 | 0 |     } | 
| 1531 |  |  | 
| 1532 |  |     // When there is only one segment, it will be calculated in the current thread. | 
| 1533 |  |     // Otherwise, it will be submitted to the thread pool for calculation. | 
| 1534 | 3 |     if (segments.size() <= 1) { | 
| 1535 | 3 |         RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, | 
| 1536 | 3 |                                            cur_version - 1, nullptr, transient_rs_writer.get(), | 
| 1537 | 3 |                                            tablet_delete_bitmap)); | 
| 1538 |  |  | 
| 1539 | 3 |     } else { | 
| 1540 | 0 |         auto token = self->calc_delete_bitmap_executor()->create_token(); | 
| 1541 | 0 |         RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, | 
| 1542 | 0 |                                            cur_version - 1, token.get(), transient_rs_writer.get(), | 
| 1543 | 0 |                                            tablet_delete_bitmap)); | 
| 1544 | 0 |         RETURN_IF_ERROR(token->wait()); | 
| 1545 | 0 |     } | 
| 1546 |  |  | 
| 1547 | 3 |     std::stringstream ss; | 
| 1548 | 3 |     ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - t1 | 
| 1549 | 3 |        << ", get rowsets: " << t3 - t2 | 
| 1550 | 3 |        << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")"; | 
| 1551 |  |  | 
| 1552 | 3 |     if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) { | 
| 1553 |  |         // only do correctness check if the rowset has at least one row written | 
| 1554 |  |         // check if all the rowset has ROWSET_SENTINEL_MARK | 
| 1555 | 3 |         auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, | 
| 1556 | 3 |                                                         cur_rowset_ids, &specified_rowsets); | 
| 1557 | 3 |         if (!st.ok()) { | 
| 1558 | 0 |             LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); | 
| 1559 | 0 |         } | 
| 1560 | 3 |     } | 
| 1561 |  |  | 
| 1562 | 3 |     if (transient_rs_writer) { | 
| 1563 | 0 |         auto t4 = watch.get_elapse_time_us(); | 
| 1564 | 0 |         DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", { | 
| 1565 | 0 |             if (rand() % 100 < (100 * dp->param("percent", 0.5))) { | 
| 1566 | 0 |                 LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random failed") | 
| 1567 | 0 |                         .tag("txn_id", txn_id); | 
| 1568 | 0 |                 return Status::InternalError( | 
| 1569 | 0 |                         "debug update_delete_bitmap partial update write rowset random failed"); | 
| 1570 | 0 |             } | 
| 1571 | 0 |         }); | 
| 1572 |  |         // build rowset writer and merge transient rowset | 
| 1573 | 0 |         RETURN_IF_ERROR(transient_rs_writer->flush()); | 
| 1574 | 0 |         RowsetSharedPtr transient_rowset; | 
| 1575 | 0 |         RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset)); | 
| 1576 | 0 |         auto old_segments = rowset->num_segments(); | 
| 1577 | 0 |         rowset->merge_rowset_meta(*transient_rowset->rowset_meta()); | 
| 1578 | 0 |         auto new_segments = rowset->num_segments(); | 
| 1579 | 0 |         ss << ", " << txn_info->partial_update_info->partial_update_mode_str() | 
| 1580 | 0 |            << " flush rowset (old segment num: " << old_segments | 
| 1581 | 0 |            << ", new segment num: " << new_segments << ")" | 
| 1582 | 0 |            << ", cost:" << watch.get_elapse_time_us() - t4 << "(us)"; | 
| 1583 |  |  | 
| 1584 |  |         // update the shared_ptr to new bitmap, which is consistent with current rowset. | 
| 1585 | 0 |         txn_info->delete_bitmap = delete_bitmap; | 
| 1586 |  |         // erase segment cache cause we will add a segment to rowset | 
| 1587 | 0 |         SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments()); | 
| 1588 | 0 |     } | 
| 1589 |  |  | 
| 1590 | 3 |     size_t total_rows = std::accumulate( | 
| 1591 | 3 |             segments.begin(), segments.end(), 0, | 
| 1592 | 3 |             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); | 
| 1593 | 3 |     auto t5 = watch.get_elapse_time_us(); | 
| 1594 | 3 |     int64_t lock_id = txn_info->is_txn_load ? txn_info->lock_id : -1; | 
| 1595 | 3 |     RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap, | 
| 1596 | 3 |                                              transient_rs_writer.get(), cur_rowset_ids, lock_id, | 
| 1597 | 3 |                                              next_visible_version)); | 
| 1598 |  |  | 
| 1599 |  |     // defensive check, check that the delete bitmap cache we wrote is correct | 
| 1600 | 3 |     RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id, delete_bitmap.get())); | 
| 1601 |  |  | 
| 1602 | 3 |     LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id() | 
| 1603 | 3 |               << ", rowset_ids to add: " | 
| 1604 | 3 |               << (specified_rowsets.size() + rowsets_skip_alignment.size()) | 
| 1605 | 3 |               << ", rowset_ids to del: " << rowset_ids_to_del.size() | 
| 1606 | 3 |               << ", cur version: " << cur_version << ", transaction_id: " << txn_id << "," | 
| 1607 | 3 |               << ss.str() << " , total rows: " << total_rows | 
| 1608 | 3 |               << ", update delete_bitmap cost: " << watch.get_elapse_time_us() - t5 << "(us)"; | 
| 1609 | 3 |     return Status::OK(); | 
| 1610 | 3 | } | 
| 1611 |  |  | 
| 1612 |  | void BaseTablet::calc_compaction_output_rowset_delete_bitmap( | 
| 1613 |  |         const std::vector<RowsetSharedPtr>& input_rowsets, const RowIdConversion& rowid_conversion, | 
| 1614 |  |         uint64_t start_version, uint64_t end_version, std::set<RowLocation>* missed_rows, | 
| 1615 |  |         std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map, | 
| 1616 | 0 |         const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap) { | 
| 1617 | 0 |     RowLocation src; | 
| 1618 | 0 |     RowLocation dst; | 
| 1619 | 0 |     for (auto& rowset : input_rowsets) { | 
| 1620 | 0 |         src.rowset_id = rowset->rowset_id(); | 
| 1621 | 0 |         for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { | 
| 1622 | 0 |             src.segment_id = seg_id; | 
| 1623 | 0 |             DeleteBitmap subset_map(tablet_id()); | 
| 1624 | 0 |             input_delete_bitmap.subset({rowset->rowset_id(), seg_id, start_version}, | 
| 1625 | 0 |                                        {rowset->rowset_id(), seg_id, end_version}, &subset_map); | 
| 1626 |  |             // traverse all versions and convert rowid | 
| 1627 | 0 |             for (auto iter = subset_map.delete_bitmap.begin(); | 
| 1628 | 0 |                  iter != subset_map.delete_bitmap.end(); ++iter) { | 
| 1629 | 0 |                 auto cur_version = std::get<2>(iter->first); | 
| 1630 | 0 |                 for (auto index = iter->second.begin(); index != iter->second.end(); ++index) { | 
| 1631 | 0 |                     src.row_id = *index; | 
| 1632 | 0 |                     if (rowid_conversion.get(src, &dst) != 0) { | 
| 1633 | 0 |                         VLOG_CRITICAL << "Can't find rowid, may be deleted by the delete_handler, " | 
| 1634 | 0 |                                       << " src loaction: |" << src.rowset_id << "|" | 
| 1635 | 0 |                                       << src.segment_id << "|" << src.row_id | 
| 1636 | 0 |                                       << " version: " << cur_version; | 
| 1637 | 0 |                         if (missed_rows) { | 
| 1638 | 0 |                             missed_rows->insert(src); | 
| 1639 | 0 |                         } | 
| 1640 | 0 |                         continue; | 
| 1641 | 0 |                     } | 
| 1642 | 0 |                     VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |" | 
| 1643 | 0 |                                << dst.rowset_id << "|" << dst.segment_id << "|" << dst.row_id | 
| 1644 | 0 |                                << " src location: |" << src.rowset_id << "|" << src.segment_id | 
| 1645 | 0 |                                << "|" << src.row_id << " start version: " << start_version | 
| 1646 | 0 |                                << "end version" << end_version; | 
| 1647 | 0 |                     if (location_map) { | 
| 1648 | 0 |                         (*location_map)[rowset].emplace_back(src, dst); | 
| 1649 | 0 |                     } | 
| 1650 | 0 |                     output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version}, | 
| 1651 | 0 |                                                      dst.row_id); | 
| 1652 | 0 |                 } | 
| 1653 | 0 |             } | 
| 1654 | 0 |         } | 
| 1655 | 0 |     } | 
| 1656 | 0 | } | 
| 1657 |  |  | 
| 1658 |  | Status BaseTablet::check_rowid_conversion( | 
| 1659 |  |         RowsetSharedPtr dst_rowset, | 
| 1660 |  |         const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>& | 
| 1661 | 0 |                 location_map) { | 
| 1662 | 0 |     if (location_map.empty()) { | 
| 1663 | 0 |         VLOG_DEBUG << "check_rowid_conversion, location_map is empty"; | 
| 1664 | 0 |         return Status::OK(); | 
| 1665 | 0 |     } | 
| 1666 | 0 |     std::vector<segment_v2::SegmentSharedPtr> dst_segments; | 
| 1667 |  | 
 | 
| 1668 | 0 |     RETURN_IF_ERROR( | 
| 1669 | 0 |             std::dynamic_pointer_cast<BetaRowset>(dst_rowset)->load_segments(&dst_segments)); | 
| 1670 | 0 |     std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>> input_rowsets_segment; | 
| 1671 |  | 
 | 
| 1672 | 0 |     VLOG_DEBUG << "check_rowid_conversion, dst_segments size: " << dst_segments.size(); | 
| 1673 | 0 |     for (auto [src_rowset, locations] : location_map) { | 
| 1674 | 0 |         std::vector<segment_v2::SegmentSharedPtr>& segments = | 
| 1675 | 0 |                 input_rowsets_segment[src_rowset->rowset_id()]; | 
| 1676 | 0 |         if (segments.empty()) { | 
| 1677 | 0 |             RETURN_IF_ERROR( | 
| 1678 | 0 |                     std::dynamic_pointer_cast<BetaRowset>(src_rowset)->load_segments(&segments)); | 
| 1679 | 0 |         } | 
| 1680 | 0 |         for (auto& [src, dst] : locations) { | 
| 1681 | 0 |             std::string src_key; | 
| 1682 | 0 |             std::string dst_key; | 
| 1683 | 0 |             Status s = segments[src.segment_id]->read_key_by_rowid(src.row_id, &src_key); | 
| 1684 | 0 |             if (UNLIKELY(s.is<NOT_IMPLEMENTED_ERROR>())) { | 
| 1685 | 0 |                 LOG(INFO) << "primary key index of old version does not " | 
| 1686 | 0 |                              "support reading key by rowid"; | 
| 1687 | 0 |                 break; | 
| 1688 | 0 |             } | 
| 1689 | 0 |             if (UNLIKELY(!s)) { | 
| 1690 | 0 |                 LOG(WARNING) << "failed to get src key: |" << src.rowset_id << "|" << src.segment_id | 
| 1691 | 0 |                              << "|" << src.row_id << " status: " << s; | 
| 1692 | 0 |                 DCHECK(false); | 
| 1693 | 0 |                 return s; | 
| 1694 | 0 |             } | 
| 1695 |  |  | 
| 1696 | 0 |             s = dst_segments[dst.segment_id]->read_key_by_rowid(dst.row_id, &dst_key); | 
| 1697 | 0 |             if (UNLIKELY(!s)) { | 
| 1698 | 0 |                 LOG(WARNING) << "failed to get dst key: |" << dst.rowset_id << "|" << dst.segment_id | 
| 1699 | 0 |                              << "|" << dst.row_id << " status: " << s; | 
| 1700 | 0 |                 DCHECK(false); | 
| 1701 | 0 |                 return s; | 
| 1702 | 0 |             } | 
| 1703 |  |  | 
| 1704 | 0 |             VLOG_DEBUG << "check_rowid_conversion, src: |" << src.rowset_id << "|" << src.segment_id | 
| 1705 | 0 |                        << "|" << src.row_id << "|" << src_key << " dst: |" << dst.rowset_id << "|" | 
| 1706 | 0 |                        << dst.segment_id << "|" << dst.row_id << "|" << dst_key; | 
| 1707 | 0 |             if (UNLIKELY(src_key.compare(dst_key) != 0)) { | 
| 1708 | 0 |                 LOG(WARNING) << "failed to check key, src key: |" << src.rowset_id << "|" | 
| 1709 | 0 |                              << src.segment_id << "|" << src.row_id << "|" << src_key | 
| 1710 | 0 |                              << " dst key: |" << dst.rowset_id << "|" << dst.segment_id << "|" | 
| 1711 | 0 |                              << dst.row_id << "|" << dst_key; | 
| 1712 | 0 |                 DCHECK(false); | 
| 1713 | 0 |                 return Status::InternalError("failed to check rowid conversion"); | 
| 1714 | 0 |             } | 
| 1715 | 0 |         } | 
| 1716 | 0 |     } | 
| 1717 | 0 |     return Status::OK(); | 
| 1718 | 0 | } | 
| 1719 |  |  | 
| 1720 |  | // The caller should hold _rowset_update_lock and _meta_lock lock. | 
| 1721 |  | Status BaseTablet::update_delete_bitmap_without_lock( | 
| 1722 |  |         const BaseTabletSPtr& self, const RowsetSharedPtr& rowset, | 
| 1723 | 0 |         const std::vector<RowsetSharedPtr>* specified_base_rowsets) { | 
| 1724 | 0 |     DBUG_EXECUTE_IF("BaseTablet.update_delete_bitmap_without_lock.random_failed", { | 
| 1725 | 0 |         auto rnd = rand() % 100; | 
| 1726 | 0 |         auto percent = dp->param("percent", 0.1); | 
| 1727 | 0 |         if (rnd < (100 * percent)) { | 
| 1728 | 0 |             LOG(WARNING) << "BaseTablet.update_delete_bitmap_without_lock.random_failed"; | 
| 1729 | 0 |             return Status::InternalError( | 
| 1730 | 0 |                     "debug tablet update delete bitmap without lock random failed"); | 
| 1731 | 0 |         } else { | 
| 1732 | 0 |             LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not " | 
| 1733 | 0 |                          "triggered" | 
| 1734 | 0 |                       << ", rnd:" << rnd << ", percent: " << percent; | 
| 1735 | 0 |         } | 
| 1736 | 0 |     }); | 
| 1737 | 0 |     int64_t cur_version = rowset->start_version(); | 
| 1738 | 0 |     std::vector<segment_v2::SegmentSharedPtr> segments; | 
| 1739 | 0 |     RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); | 
| 1740 |  |  | 
| 1741 |  |     // If this rowset does not have a segment, there is no need for an update. | 
| 1742 | 0 |     if (segments.empty()) { | 
| 1743 | 0 |         LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: " | 
| 1744 | 0 |                   << self->tablet_id() << " cur max_version: " << cur_version; | 
| 1745 | 0 |         return Status::OK(); | 
| 1746 | 0 |     } | 
| 1747 |  |  | 
| 1748 |  |     // calculate delete bitmap between segments if necessary. | 
| 1749 | 0 |     DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id()); | 
| 1750 | 0 |     RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments( | 
| 1751 | 0 |             rowset->tablet_schema(), rowset->rowset_id(), segments, delete_bitmap)); | 
| 1752 |  |  | 
| 1753 |  |     // get all base rowsets to calculate on | 
| 1754 | 0 |     std::vector<RowsetSharedPtr> specified_rowsets; | 
| 1755 | 0 |     RowsetIdUnorderedSet cur_rowset_ids; | 
| 1756 | 0 |     if (specified_base_rowsets == nullptr) { | 
| 1757 | 0 |         RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); | 
| 1758 | 0 |         specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids); | 
| 1759 | 0 |     } else { | 
| 1760 | 0 |         specified_rowsets = *specified_base_rowsets; | 
| 1761 | 0 |     } | 
| 1762 |  |  | 
| 1763 | 0 |     OlapStopWatch watch; | 
| 1764 | 0 |     auto token = self->calc_delete_bitmap_executor()->create_token(); | 
| 1765 | 0 |     RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, | 
| 1766 | 0 |                                        cur_version - 1, token.get())); | 
| 1767 | 0 |     RETURN_IF_ERROR(token->wait()); | 
| 1768 | 0 |     size_t total_rows = std::accumulate( | 
| 1769 | 0 |             segments.begin(), segments.end(), 0, | 
| 1770 | 0 |             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); | 
| 1771 | 0 |     LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id() | 
| 1772 | 0 |               << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version | 
| 1773 | 0 |               << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() | 
| 1774 | 0 |               << "(us), total rows: " << total_rows; | 
| 1775 | 0 |     if (config::enable_merge_on_write_correctness_check) { | 
| 1776 |  |         // check if all the rowset has ROWSET_SENTINEL_MARK | 
| 1777 | 0 |         auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, | 
| 1778 | 0 |                                                         cur_rowset_ids, &specified_rowsets); | 
| 1779 | 0 |         if (!st.ok()) { | 
| 1780 | 0 |             LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); | 
| 1781 | 0 |         } | 
| 1782 | 0 |         delete_bitmap->remove_sentinel_marks(); | 
| 1783 | 0 |     } | 
| 1784 | 0 |     for (auto& iter : delete_bitmap->delete_bitmap) { | 
| 1785 | 0 |         self->_tablet_meta->delete_bitmap().merge( | 
| 1786 | 0 |                 {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); | 
| 1787 | 0 |     } | 
| 1788 |  | 
 | 
| 1789 | 0 |     return Status::OK(); | 
| 1790 | 0 | } | 
| 1791 |  |  | 
| 1792 |  | void BaseTablet::agg_delete_bitmap_for_stale_rowsets( | 
| 1793 | 0 |         Version version, DeleteBitmapKeyRanges& remove_delete_bitmap_key_ranges) { | 
| 1794 | 0 |     if (!config::enable_agg_and_remove_pre_rowsets_delete_bitmap) { | 
| 1795 | 0 |         return; | 
| 1796 | 0 |     } | 
| 1797 | 0 |     if (!(keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write())) { | 
| 1798 | 0 |         return; | 
| 1799 | 0 |     } | 
| 1800 | 0 |     int64_t start_version = version.first; | 
| 1801 | 0 |     int64_t end_version = version.second; | 
| 1802 | 0 |     if (start_version == end_version) { | 
| 1803 | 0 |         return; | 
| 1804 | 0 |     } | 
| 1805 | 0 |     DCHECK(start_version < end_version) | 
| 1806 | 0 |             << ". start_version: " << start_version << ", end_version: " << end_version; | 
| 1807 |  |     // get pre rowsets | 
| 1808 | 0 |     std::vector<RowsetSharedPtr> pre_rowsets {}; | 
| 1809 | 0 |     { | 
| 1810 | 0 |         std::shared_lock rdlock(_meta_lock); | 
| 1811 | 0 |         for (const auto& it2 : _rs_version_map) { | 
| 1812 | 0 |             if (it2.first.second < start_version) { | 
| 1813 | 0 |                 pre_rowsets.emplace_back(it2.second); | 
| 1814 | 0 |             } | 
| 1815 | 0 |         } | 
| 1816 | 0 |     } | 
| 1817 | 0 |     std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); | 
| 1818 |  |     // do agg for pre rowsets | 
| 1819 | 0 |     DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); | 
| 1820 | 0 |     for (auto& rowset : pre_rowsets) { | 
| 1821 | 0 |         for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { | 
| 1822 | 0 |             auto d = tablet_meta()->delete_bitmap().get_agg_without_cache( | 
| 1823 | 0 |                     {rowset->rowset_id(), seg_id, end_version}, start_version); | 
| 1824 | 0 |             if (d->isEmpty()) { | 
| 1825 | 0 |                 continue; | 
| 1826 | 0 |             } | 
| 1827 | 0 |             VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id() | 
| 1828 | 0 |                        << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id | 
| 1829 | 0 |                        << ", rowset_version=" << rowset->version().to_string() | 
| 1830 | 0 |                        << ". compaction start_version=" << start_version | 
| 1831 | 0 |                        << ", end_version=" << end_version << ", delete_bitmap=" << d->cardinality(); | 
| 1832 | 0 |             DeleteBitmap::BitmapKey start_key {rowset->rowset_id(), seg_id, start_version}; | 
| 1833 | 0 |             DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version}; | 
| 1834 | 0 |             new_delete_bitmap->set(end_key, *d); | 
| 1835 | 0 |             remove_delete_bitmap_key_ranges.emplace_back(start_key, end_key); | 
| 1836 | 0 |         } | 
| 1837 | 0 |     } | 
| 1838 | 0 |     DBUG_EXECUTE_IF("BaseTablet.agg_delete_bitmap_for_stale_rowsets.merge_delete_bitmap.block", | 
| 1839 | 0 |                     DBUG_BLOCK); | 
| 1840 | 0 |     tablet_meta()->delete_bitmap().merge(*new_delete_bitmap); | 
| 1841 | 0 | } | 
| 1842 |  |  | 
| 1843 |  | void BaseTablet::check_agg_delete_bitmap_for_stale_rowsets(int64_t& useless_rowset_count, | 
| 1844 | 0 |                                                            int64_t& useless_rowset_version_count) { | 
| 1845 | 0 |     std::map<RowsetId, Version> rowset_ids; | 
| 1846 | 0 |     std::set<int64_t> end_versions; | 
| 1847 | 0 |     traverse_rowsets( | 
| 1848 | 0 |             [&rowset_ids, &end_versions](const RowsetSharedPtr& rs) { | 
| 1849 | 0 |                 rowset_ids[rs->rowset_id()] = rs->version(); | 
| 1850 | 0 |                 end_versions.emplace(rs->end_version()); | 
| 1851 | 0 |             }, | 
| 1852 | 0 |             true); | 
| 1853 |  | 
 | 
| 1854 | 0 |     std::set<RowsetId> useless_rowsets; | 
| 1855 | 0 |     std::map<RowsetId, std::vector<int64_t>> useless_rowset_versions; | 
| 1856 | 0 |     { | 
| 1857 | 0 |         _tablet_meta->delete_bitmap().traverse_rowset_and_version( | 
| 1858 |  |                 // 0: rowset and rowset with version exists | 
| 1859 |  |                 // -1: rowset does not exist | 
| 1860 |  |                 // -2: find next <rowset, version> | 
| 1861 |  |                 //     rowset exist, rowset with version does not exist | 
| 1862 |  |                 //     sequence table | 
| 1863 | 0 |                 [&](const RowsetId& rowset_id, int64_t version) { | 
| 1864 | 0 |                     auto rowset_it = rowset_ids.find(rowset_id); | 
| 1865 | 0 |                     if (rowset_it == rowset_ids.end()) { | 
| 1866 | 0 |                         useless_rowsets.emplace(rowset_id); | 
| 1867 | 0 |                         return -1; | 
| 1868 | 0 |                     } | 
| 1869 | 0 |                     if (end_versions.find(version) == end_versions.end()) { | 
| 1870 | 0 |                         if (tablet_schema()->has_sequence_col()) { | 
| 1871 | 0 |                             auto rowset_version = rowset_it->second; | 
| 1872 | 0 |                             if (version >= rowset_version.first && | 
| 1873 | 0 |                                 version <= rowset_version.second) { | 
| 1874 | 0 |                                 return -2; | 
| 1875 | 0 |                             } | 
| 1876 | 0 |                         } | 
| 1877 | 0 |                         if (useless_rowset_versions.find(rowset_id) == | 
| 1878 | 0 |                             useless_rowset_versions.end()) { | 
| 1879 | 0 |                             useless_rowset_versions[rowset_id] = {}; | 
| 1880 | 0 |                         } | 
| 1881 | 0 |                         useless_rowset_versions[rowset_id].emplace_back(version); | 
| 1882 | 0 |                         return -2; | 
| 1883 | 0 |                     } | 
| 1884 | 0 |                     return 0; | 
| 1885 | 0 |                 }); | 
| 1886 | 0 |     } | 
| 1887 | 0 |     useless_rowset_count = useless_rowsets.size(); | 
| 1888 | 0 |     useless_rowset_version_count = useless_rowset_versions.size(); | 
| 1889 | 0 |     if (!useless_rowsets.empty() || !useless_rowset_versions.empty()) { | 
| 1890 | 0 |         std::stringstream ss; | 
| 1891 | 0 |         if (!useless_rowsets.empty()) { | 
| 1892 | 0 |             ss << "useless rowsets: {"; | 
| 1893 | 0 |             for (auto it = useless_rowsets.begin(); it != useless_rowsets.end(); ++it) { | 
| 1894 | 0 |                 if (it != useless_rowsets.begin()) { | 
| 1895 | 0 |                     ss << ", "; | 
| 1896 | 0 |                 } | 
| 1897 | 0 |                 ss << it->to_string(); | 
| 1898 | 0 |             } | 
| 1899 | 0 |             ss << "}. "; | 
| 1900 | 0 |         } | 
| 1901 | 0 |         if (!useless_rowset_versions.empty()) { | 
| 1902 | 0 |             ss << "useless rowset versions: {"; | 
| 1903 | 0 |             for (auto iter = useless_rowset_versions.begin(); iter != useless_rowset_versions.end(); | 
| 1904 | 0 |                  ++iter) { | 
| 1905 | 0 |                 if (iter != useless_rowset_versions.begin()) { | 
| 1906 | 0 |                     ss << ", "; | 
| 1907 | 0 |                 } | 
| 1908 | 0 |                 ss << iter->first.to_string() << ": ["; | 
| 1909 |  |                 // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18] | 
| 1910 |  |                 // print as [8-11, 13, 17-18] | 
| 1911 | 0 |                 int64_t last_start_version = -1; | 
| 1912 | 0 |                 int64_t last_end_version = -1; | 
| 1913 | 0 |                 for (int64_t version : iter->second) { | 
| 1914 | 0 |                     if (last_start_version == -1) { | 
| 1915 | 0 |                         last_start_version = version; | 
| 1916 | 0 |                         last_end_version = version; | 
| 1917 | 0 |                         continue; | 
| 1918 | 0 |                     } | 
| 1919 | 0 |                     if (last_end_version + 1 == version) { | 
| 1920 | 0 |                         last_end_version = version; | 
| 1921 | 0 |                     } else { | 
| 1922 | 0 |                         if (last_start_version == last_end_version) { | 
| 1923 | 0 |                             ss << last_start_version << ", "; | 
| 1924 | 0 |                         } else { | 
| 1925 | 0 |                             ss << last_start_version << "-" << last_end_version << ", "; | 
| 1926 | 0 |                         } | 
| 1927 | 0 |                         last_start_version = version; | 
| 1928 | 0 |                         last_end_version = version; | 
| 1929 | 0 |                     } | 
| 1930 | 0 |                 } | 
| 1931 | 0 |                 if (last_start_version == last_end_version) { | 
| 1932 | 0 |                     ss << last_start_version; | 
| 1933 | 0 |                 } else { | 
| 1934 | 0 |                     ss << last_start_version << "-" << last_end_version; | 
| 1935 | 0 |                 } | 
| 1936 |  | 
 | 
| 1937 | 0 |                 ss << "]"; | 
| 1938 | 0 |             } | 
| 1939 | 0 |             ss << "}."; | 
| 1940 | 0 |         } | 
| 1941 | 0 |         LOG(WARNING) << "failed check_agg_delete_bitmap_for_stale_rowsets for tablet_id=" | 
| 1942 | 0 |                      << tablet_id() << ". " << ss.str(); | 
| 1943 | 0 |     } else { | 
| 1944 | 0 |         LOG(INFO) << "succeed check_agg_delete_bitmap_for_stale_rowsets for tablet_id=" | 
| 1945 | 0 |                   << tablet_id(); | 
| 1946 | 0 |     } | 
| 1947 | 0 | } | 
| 1948 |  |  | 
| 1949 | 0 | RowsetSharedPtr BaseTablet::get_rowset(const RowsetId& rowset_id) { | 
| 1950 | 0 |     std::shared_lock rdlock(_meta_lock); | 
| 1951 | 0 |     for (auto& version_rowset : _rs_version_map) { | 
| 1952 | 0 |         if (version_rowset.second->rowset_id() == rowset_id) { | 
| 1953 | 0 |             return version_rowset.second; | 
| 1954 | 0 |         } | 
| 1955 | 0 |     } | 
| 1956 | 0 |     for (auto& stale_version_rowset : _stale_rs_version_map) { | 
| 1957 | 0 |         if (stale_version_rowset.second->rowset_id() == rowset_id) { | 
| 1958 | 0 |             return stale_version_rowset.second; | 
| 1959 | 0 |         } | 
| 1960 | 0 |     } | 
| 1961 | 0 |     return nullptr; | 
| 1962 | 0 | } | 
| 1963 |  |  | 
| 1964 | 1 | std::vector<RowsetSharedPtr> BaseTablet::get_snapshot_rowset(bool include_stale_rowset) const { | 
| 1965 | 1 |     std::shared_lock rdlock(_meta_lock); | 
| 1966 | 1 |     std::vector<RowsetSharedPtr> rowsets; | 
| 1967 | 1 |     std::transform(_rs_version_map.cbegin(), _rs_version_map.cend(), std::back_inserter(rowsets), | 
| 1968 | 28 |                    [](auto& kv) { return kv.second; }); | 
| 1969 | 1 |     if (include_stale_rowset) { | 
| 1970 | 0 |         std::transform(_stale_rs_version_map.cbegin(), _stale_rs_version_map.cend(), | 
| 1971 | 0 |                        std::back_inserter(rowsets), [](auto& kv) { return kv.second; }); | 
| 1972 | 0 |     } | 
| 1973 | 1 |     return rowsets; | 
| 1974 | 1 | } | 
| 1975 |  |  | 
| 1976 |  | void BaseTablet::calc_consecutive_empty_rowsets( | 
| 1977 |  |         std::vector<RowsetSharedPtr>* empty_rowsets, | 
| 1978 | 4 |         const std::vector<RowsetSharedPtr>& candidate_rowsets, int64_t limit) { | 
| 1979 | 4 |     int len = cast_set<int>(candidate_rowsets.size()); | 
| 1980 | 12 |     for (int i = 0; i < len - 1; ++i) { | 
| 1981 | 9 |         auto rowset = candidate_rowsets[i]; | 
| 1982 | 9 |         auto next_rowset = candidate_rowsets[i + 1]; | 
| 1983 |  |  | 
| 1984 |  |         // identify two consecutive rowsets that are empty | 
| 1985 | 9 |         if (rowset->num_segments() == 0 && next_rowset->num_segments() == 0 && | 
| 1986 | 9 |             !rowset->rowset_meta()->has_delete_predicate() && | 
| 1987 | 9 |             !next_rowset->rowset_meta()->has_delete_predicate() && | 
| 1988 | 9 |             rowset->end_version() == next_rowset->start_version() - 1) { | 
| 1989 | 1 |             empty_rowsets->emplace_back(rowset); | 
| 1990 | 1 |             empty_rowsets->emplace_back(next_rowset); | 
| 1991 | 1 |             rowset = next_rowset; | 
| 1992 | 1 |             int next_index = i + 2; | 
| 1993 |  |  | 
| 1994 |  |             // keep searching for consecutive empty rowsets | 
| 1995 | 6 |             while (next_index < len && candidate_rowsets[next_index]->num_segments() == 0 && | 
| 1996 | 6 |                    !candidate_rowsets[next_index]->rowset_meta()->has_delete_predicate() && | 
| 1997 | 6 |                    rowset->end_version() == candidate_rowsets[next_index]->start_version() - 1) { | 
| 1998 | 5 |                 empty_rowsets->emplace_back(candidate_rowsets[next_index]); | 
| 1999 | 5 |                 rowset = candidate_rowsets[next_index++]; | 
| 2000 | 5 |             } | 
| 2001 |  |             // if the number of consecutive empty rowset reach the limit, | 
| 2002 |  |             // and there are still rowsets following them | 
| 2003 | 1 |             if (empty_rowsets->size() >= limit && next_index < len) { | 
| 2004 | 1 |                 return; | 
| 2005 | 1 |             } else { | 
| 2006 |  |                 // current rowset is not empty, start searching from that rowset in the next | 
| 2007 | 0 |                 i = next_index - 1; | 
| 2008 | 0 |                 empty_rowsets->clear(); | 
| 2009 | 0 |             } | 
| 2010 | 1 |         } | 
| 2011 | 9 |     } | 
| 2012 | 4 | } | 
| 2013 |  |  | 
| 2014 |  | Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version, | 
| 2015 | 0 |                                  uint32_t* rowset_count, int64_t* file_count) { | 
| 2016 | 0 |     Version v(start_version, end_version); | 
| 2017 | 0 |     std::vector<RowsetSharedPtr> rowsets; | 
| 2018 | 0 |     traverse_rowsets([&rowsets, &v](const auto& rs) { | 
| 2019 |  |         // get all rowsets | 
| 2020 | 0 |         if (v.contains(rs->version())) { | 
| 2021 | 0 |             rowsets.emplace_back(rs); | 
| 2022 | 0 |         } | 
| 2023 | 0 |     }); | 
| 2024 | 0 |     std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); | 
| 2025 | 0 |     *rowset_count = cast_set<uint32_t>(rowsets.size()); | 
| 2026 |  | 
 | 
| 2027 | 0 |     *crc_value = 0; | 
| 2028 | 0 |     *file_count = 0; | 
| 2029 | 0 |     for (const auto& rs : rowsets) { | 
| 2030 | 0 |         uint32_t rs_crc_value = 0; | 
| 2031 | 0 |         int64_t rs_file_count = 0; | 
| 2032 | 0 |         auto rowset = std::static_pointer_cast<BetaRowset>(rs); | 
| 2033 | 0 |         auto st = rowset->calc_file_crc(&rs_crc_value, &rs_file_count); | 
| 2034 | 0 |         if (!st.ok()) { | 
| 2035 | 0 |             return st; | 
| 2036 | 0 |         } | 
| 2037 |  |         // crc_value is calculated based on the crc_value of each rowset. | 
| 2038 | 0 |         *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const char*>(&rs_crc_value), | 
| 2039 | 0 |                                     sizeof(rs_crc_value)); | 
| 2040 | 0 |         *file_count += rs_file_count; | 
| 2041 | 0 |     } | 
| 2042 | 0 |     return Status::OK(); | 
| 2043 | 0 | } | 
| 2044 |  |  | 
| 2045 | 0 | Status BaseTablet::show_nested_index_file(std::string* json_meta) { | 
| 2046 | 0 |     Version v(0, max_version_unlocked()); | 
| 2047 | 0 |     std::vector<RowsetSharedPtr> rowsets; | 
| 2048 | 0 |     traverse_rowsets([&rowsets, &v](const auto& rs) { | 
| 2049 |  |         // get all rowsets | 
| 2050 | 0 |         if (v.contains(rs->version())) { | 
| 2051 | 0 |             rowsets.emplace_back(rs); | 
| 2052 | 0 |         } | 
| 2053 | 0 |     }); | 
| 2054 | 0 |     std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); | 
| 2055 |  | 
 | 
| 2056 | 0 |     rapidjson::Document doc; | 
| 2057 | 0 |     doc.SetObject(); | 
| 2058 | 0 |     rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); | 
| 2059 | 0 |     rapidjson::Value tabletIdValue(tablet_id()); | 
| 2060 | 0 |     doc.AddMember("tablet_id", tabletIdValue, allocator); | 
| 2061 |  | 
 | 
| 2062 | 0 |     rapidjson::Value rowsets_value(rapidjson::kArrayType); | 
| 2063 |  | 
 | 
| 2064 | 0 |     for (const auto& rs : rowsets) { | 
| 2065 | 0 |         rapidjson::Value rowset_value(rapidjson::kObjectType); | 
| 2066 |  | 
 | 
| 2067 | 0 |         auto rowset = std::static_pointer_cast<BetaRowset>(rs); | 
| 2068 | 0 |         RETURN_IF_ERROR(rowset->show_nested_index_file(&rowset_value, allocator)); | 
| 2069 | 0 |         rowsets_value.PushBack(rowset_value, allocator); | 
| 2070 | 0 |     } | 
| 2071 | 0 |     doc.AddMember("rowsets", rowsets_value, allocator); | 
| 2072 |  | 
 | 
| 2073 | 0 |     rapidjson::StringBuffer buffer; | 
| 2074 | 0 |     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer); | 
| 2075 | 0 |     doc.Accept(writer); | 
| 2076 | 0 |     *json_meta = std::string(buffer.GetString()); | 
| 2077 |  | 
 | 
| 2078 | 0 |     return Status::OK(); | 
| 2079 | 0 | } | 
| 2080 |  |  | 
| 2081 |  | void BaseTablet::get_base_rowset_delete_bitmap_count( | 
| 2082 |  |         uint64_t* max_base_rowset_delete_bitmap_score, | 
| 2083 | 0 |         int64_t* max_base_rowset_delete_bitmap_score_tablet_id) { | 
| 2084 | 0 |     std::vector<RowsetSharedPtr> rowsets_; | 
| 2085 | 0 |     std::string base_rowset_id_str; | 
| 2086 | 0 |     { | 
| 2087 | 0 |         std::shared_lock rowset_ldlock(this->get_header_lock()); | 
| 2088 | 0 |         for (const auto& it : _rs_version_map) { | 
| 2089 | 0 |             rowsets_.emplace_back(it.second); | 
| 2090 | 0 |         } | 
| 2091 | 0 |     } | 
| 2092 | 0 |     std::sort(rowsets_.begin(), rowsets_.end(), Rowset::comparator); | 
| 2093 | 0 |     if (!rowsets_.empty()) { | 
| 2094 | 0 |         bool base_found = false; | 
| 2095 | 0 |         for (auto& rowset : rowsets_) { | 
| 2096 | 0 |             if (rowset->start_version() > 2) { | 
| 2097 | 0 |                 break; | 
| 2098 | 0 |             } | 
| 2099 | 0 |             base_found = true; | 
| 2100 | 0 |             uint64_t base_rowset_delete_bitmap_count = | 
| 2101 | 0 |                     this->tablet_meta()->delete_bitmap().get_count_with_range( | 
| 2102 | 0 |                             {rowset->rowset_id(), 0, 0}, | 
| 2103 | 0 |                             {rowset->rowset_id(), UINT32_MAX, UINT64_MAX}); | 
| 2104 | 0 |             if (base_rowset_delete_bitmap_count > *max_base_rowset_delete_bitmap_score) { | 
| 2105 | 0 |                 *max_base_rowset_delete_bitmap_score = base_rowset_delete_bitmap_count; | 
| 2106 | 0 |                 *max_base_rowset_delete_bitmap_score_tablet_id = this->tablet_id(); | 
| 2107 | 0 |             } | 
| 2108 | 0 |         } | 
| 2109 | 0 |         if (!base_found) { | 
| 2110 | 0 |             LOG(WARNING) << "can not found base rowset for tablet " << tablet_id(); | 
| 2111 | 0 |         } | 
| 2112 | 0 |     } | 
| 2113 | 0 | } | 
| 2114 |  |  | 
| 2115 | 337 | void TabletReadSource::fill_delete_predicates() { | 
| 2116 | 337 |     DCHECK_EQ(delete_predicates.size(), 0); | 
| 2117 | 337 |     auto delete_pred_view = | 
| 2118 | 1.22k |             rs_splits | std::views::transform([](auto&& split) { | 
| 2119 | 1.22k |                 return split.rs_reader->rowset()->rowset_meta(); | 
| 2120 | 1.22k |             }) | | 
| 2121 | 1.07k |             std::views::filter([](const auto& rs_meta) { return rs_meta->has_delete_predicate(); }); | 
| 2122 | 337 |     delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()}; | 
| 2123 | 337 | } | 
| 2124 |  |  | 
| 2125 | 32 | int32_t BaseTablet::max_version_config() { | 
| 2126 | 32 |     int32_t max_version = tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY | 
| 2127 | 32 |                                   ? std::max(config::time_series_max_tablet_version_num, | 
| 2128 | 0 |                                              config::max_tablet_version_num) | 
| 2129 | 32 |                                   : config::max_tablet_version_num; | 
| 2130 | 32 |     return max_version; | 
| 2131 | 32 | } | 
| 2132 |  |  | 
| 2133 | 0 | void BaseTablet::prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version) { | 
| 2134 | 0 |     for (std::size_t i = 0; i < rowset->num_segments(); i++) { | 
| 2135 | 0 |         tablet_meta()->delete_bitmap().get_agg({rowset->rowset_id(), i, version}); | 
| 2136 | 0 |     } | 
| 2137 | 0 | } | 
| 2138 |  |  | 
| 2139 | 0 | void BaseTablet::prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr& output_rowset) { | 
| 2140 | 0 |     if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write() && | 
| 2141 | 0 |         (config::enable_prefill_output_dbm_agg_cache_after_compaction || | 
| 2142 | 0 |          config::enable_prefill_all_dbm_agg_cache_after_compaction)) { | 
| 2143 | 0 |         int64_t cur_max_version {-1}; | 
| 2144 | 0 |         { | 
| 2145 | 0 |             std::shared_lock rlock(get_header_lock()); | 
| 2146 | 0 |             cur_max_version = max_version_unlocked(); | 
| 2147 | 0 |         } | 
| 2148 | 0 |         if (config::enable_prefill_all_dbm_agg_cache_after_compaction) { | 
| 2149 | 0 |             traverse_rowsets( | 
| 2150 | 0 |                     [&](const RowsetSharedPtr& rs) { prefill_dbm_agg_cache(rs, cur_max_version); }, | 
| 2151 | 0 |                     false); | 
| 2152 | 0 |         } else if (config::enable_prefill_output_dbm_agg_cache_after_compaction) { | 
| 2153 | 0 |             prefill_dbm_agg_cache(output_rowset, cur_max_version); | 
| 2154 | 0 |         } | 
| 2155 | 0 |     } | 
| 2156 | 0 | } | 
| 2157 |  |  | 
| 2158 |  | } // namespace doris |