be/src/storage/tablet/base_tablet.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/tablet/base_tablet.h" |
19 | | |
20 | | #include <bthread/mutex.h> |
21 | | #include <crc32c/crc32c.h> |
22 | | #include <fmt/format.h> |
23 | | #include <rapidjson/prettywriter.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <cstdint> |
27 | | #include <iterator> |
28 | | #include <random> |
29 | | #include <shared_mutex> |
30 | | |
31 | | #include "cloud/cloud_tablet.h" |
32 | | #include "cloud/config.h" |
33 | | #include "common/cast_set.h" |
34 | | #include "common/logging.h" |
35 | | #include "common/metrics/doris_metrics.h" |
36 | | #include "common/status.h" |
37 | | #include "core/assert_cast.h" |
38 | | #include "core/data_type/data_type_factory.hpp" |
39 | | #include "exec/sink/autoinc_buffer.h" // GlobalAutoIncBuffers |
40 | | #include "load/memtable/memtable.h" |
41 | | #include "service/point_query_executor.h" |
42 | | #include "storage/binlog.h" |
43 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
44 | | #include "storage/delete/calc_delete_bitmap_executor.h" |
45 | | #include "storage/delete/delete_bitmap_calculator.h" |
46 | | #include "storage/index/primary_key_index.h" |
47 | | #include "storage/iterators.h" |
48 | | #include "storage/partial_update_info.h" |
49 | | #include "storage/rowid_conversion.h" |
50 | | #include "storage/rowset/beta_rowset.h" |
51 | | #include "storage/rowset/group_rowset_writer.h" |
52 | | #include "storage/rowset/rowset.h" |
53 | | #include "storage/rowset/rowset_factory.h" |
54 | | #include "storage/rowset/rowset_fwd.h" |
55 | | #include "storage/rowset/rowset_reader.h" |
56 | | #include "storage/rowset/rowset_writer_context.h" |
57 | | #include "storage/segment/column_reader.h" |
58 | | #include "storage/tablet/tablet_fwd.h" |
59 | | #include "storage/txn/txn_manager.h" |
60 | | #include "util/bvar_helper.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/jsonb/serialize.h" |
63 | | |
64 | | namespace doris { |
65 | | |
66 | | using namespace ErrorCode; |
67 | | |
68 | | namespace { |
69 | | |
70 | | bvar::LatencyRecorder g_tablet_commit_phase_update_delete_bitmap_latency( |
71 | | "doris_pk", "commit_phase_update_delete_bitmap"); |
72 | | bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk", "tablet_lookup_rowkey"); |
73 | | bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found"); |
74 | | bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second( |
75 | | "doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60); |
76 | | bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", "update_delete_bitmap"); |
77 | | |
78 | | static bvar::Adder<size_t> g_total_tablet_num("doris_total_tablet_num"); |
79 | | |
80 | | Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t segid, |
81 | | const TabletColumn& target_column, |
82 | | SegmentCacheHandle* segment_cache_handle, |
83 | | std::unique_ptr<segment_v2::ColumnIterator>* column_iterator, |
84 | 8.56k | OlapReaderStatistics* stats) { |
85 | 8.56k | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(rowset, segment_cache_handle, true)); |
86 | | // find segment |
87 | 8.56k | auto it = std::find_if( |
88 | 8.56k | segment_cache_handle->get_segments().begin(), |
89 | 8.56k | segment_cache_handle->get_segments().end(), |
90 | 8.59k | [&segid](const segment_v2::SegmentSharedPtr& seg) { return seg->id() == segid; }); |
91 | 8.56k | if (it == segment_cache_handle->get_segments().end()) { |
92 | 0 | return Status::NotFound(fmt::format("rowset {} 's segemnt not found, seg_id {}", |
93 | 0 | rowset->rowset_id().to_string(), segid)); |
94 | 0 | } |
95 | 8.56k | segment_v2::SegmentSharedPtr segment = *it; |
96 | 8.56k | StorageReadOptions opts; |
97 | 8.56k | opts.stats = stats; |
98 | 8.56k | RETURN_IF_ERROR(segment->new_column_iterator(target_column, column_iterator, &opts)); |
99 | 8.56k | segment_v2::ColumnIteratorOptions opt { |
100 | 8.56k | .use_page_cache = !config::disable_storage_page_cache, |
101 | 8.56k | .file_reader = segment->file_reader().get(), |
102 | 8.56k | .stats = stats, |
103 | 8.56k | .io_ctx = io::IOContext {.reader_type = ReaderType::READER_QUERY, |
104 | 8.56k | .file_cache_stats = &stats->file_cache_stats}, |
105 | 8.56k | }; |
106 | 8.56k | RETURN_IF_ERROR((*column_iterator)->init(opt)); |
107 | 8.56k | return Status::OK(); |
108 | 8.56k | } |
109 | | |
110 | | } // namespace |
111 | | |
112 | | extern MetricPrototype METRIC_query_scan_bytes; |
113 | | extern MetricPrototype METRIC_query_scan_rows; |
114 | | extern MetricPrototype METRIC_query_scan_count; |
115 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES); |
116 | | DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count, MetricUnit::OPERATIONS); |
117 | | |
118 | 409k | BaseTablet::BaseTablet(TabletMetaSharedPtr tablet_meta) : _tablet_meta(std::move(tablet_meta)) { |
119 | 409k | _metric_entity = DorisMetrics::instance()->metric_registry()->register_entity( |
120 | 409k | fmt::format("Tablet.{}", tablet_id()), {{"tablet_id", std::to_string(tablet_id())}}, |
121 | 409k | MetricEntityType::kTablet); |
122 | 409k | INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_bytes); |
123 | 409k | INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_rows); |
124 | 409k | INT_COUNTER_METRIC_REGISTER(_metric_entity, query_scan_count); |
125 | 409k | INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_bytes); |
126 | 409k | INT_COUNTER_METRIC_REGISTER(_metric_entity, flush_finish_count); |
127 | | |
128 | | // construct _timestamped_versioned_tracker from rs and stale rs meta |
129 | 409k | _timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas(), |
130 | 409k | _tablet_meta->all_stale_rs_metas()); |
131 | 409k | _row_binlog_version_tracker.construct_versioned_tracker( |
132 | 409k | _tablet_meta->all_row_binlog_rs_metas()); |
133 | | |
134 | | // if !_tablet_meta->all_rs_metas()[0]->tablet_schema(), |
135 | | // that mean the tablet_meta is still no upgrade to doris 1.2 versions. |
136 | | // Before doris 1.2 version, rowset metas don't have tablet schema. |
137 | | // And when upgrade to doris 1.2 version, |
138 | | // all rowset metas will be set the tablet schmea from tablet meta. |
139 | 409k | if (_tablet_meta->all_rs_metas().empty() || |
140 | 409k | !_tablet_meta->all_rs_metas().begin()->second->tablet_schema()) { |
141 | 127k | _max_version_schema = _tablet_meta->tablet_schema(); |
142 | 282k | } else { |
143 | 282k | std::vector<RowsetMetaSharedPtr> rowset_metas(_tablet_meta->all_rs_metas().size()); |
144 | 282k | std::transform(_tablet_meta->all_rs_metas().begin(), _tablet_meta->all_rs_metas().end(), |
145 | 485k | rowset_metas.begin(), [](const auto& it) { return it.second; }); |
146 | 282k | _max_version_schema = tablet_schema_with_merged_max_schema_version(rowset_metas); |
147 | 282k | } |
148 | 409k | DCHECK(_max_version_schema); |
149 | 409k | g_total_tablet_num << 1; |
150 | 409k | } |
151 | | |
152 | 170k | BaseTablet::~BaseTablet() { |
153 | 170k | DorisMetrics::instance()->metric_registry()->deregister_entity(_metric_entity); |
154 | 170k | g_total_tablet_num << -1; |
155 | 170k | } |
156 | | |
157 | | TabletSchemaSPtr BaseTablet::tablet_schema_with_merged_max_schema_version( |
158 | 292k | const std::vector<RowsetMetaSharedPtr>& rowset_metas) { |
159 | 292k | RowsetMetaSharedPtr max_schema_version_rs = *std::max_element( |
160 | 292k | rowset_metas.begin(), rowset_metas.end(), [](const auto& a, const auto& b) -> bool { |
161 | 270k | return !a->tablet_schema() |
162 | 270k | ? true |
163 | 270k | : (!b->tablet_schema() |
164 | 270k | ? false |
165 | 270k | : a->tablet_schema()->schema_version() < |
166 | 270k | b->tablet_schema()->schema_version()); |
167 | 270k | }); |
168 | 292k | return max_schema_version_rs->tablet_schema(); |
169 | 292k | } |
170 | | |
171 | 15.4k | Status BaseTablet::set_tablet_state(TabletState state) { |
172 | 15.4k | if (_tablet_meta->tablet_state() == TABLET_SHUTDOWN && state != TABLET_SHUTDOWN) { |
173 | 0 | return Status::Error<META_INVALID_ARGUMENT>( |
174 | 0 | "could not change tablet state from shutdown to {}", state); |
175 | 0 | } |
176 | 15.4k | _tablet_meta->set_tablet_state(state); |
177 | 15.4k | return Status::OK(); |
178 | 15.4k | } |
179 | | |
180 | 2.44k | void BaseTablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) { |
181 | 2.44k | std::lock_guard wrlock(_meta_lock); |
182 | | // Double Check for concurrent update |
183 | 2.44k | if (!_max_version_schema || |
184 | 2.44k | tablet_schema->schema_version() > _max_version_schema->schema_version()) { |
185 | 1.92k | _max_version_schema = tablet_schema; |
186 | 1.92k | } |
187 | 2.44k | } |
188 | | |
189 | 318k | uint32_t BaseTablet::get_real_compaction_score() const { |
190 | 318k | std::shared_lock l(_meta_lock); |
191 | 318k | const auto& rs_metas = _tablet_meta->all_rs_metas(); |
192 | 653k | return std::accumulate(rs_metas.begin(), rs_metas.end(), 0, [](uint32_t score, const auto& it) { |
193 | 653k | return score + it.second->get_compaction_score(); |
194 | 653k | }); |
195 | 318k | } |
196 | | |
197 | | Status BaseTablet::capture_rs_readers_unlocked(const Versions& version_path, |
198 | 0 | std::vector<RowSetSplits>* rs_splits) const { |
199 | 0 | DCHECK(rs_splits != nullptr && rs_splits->empty()); |
200 | 0 | for (auto version : version_path) { |
201 | 0 | auto it = _rs_version_map.find(version); |
202 | 0 | if (it == _rs_version_map.end()) { |
203 | 0 | VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id() |
204 | 0 | << ", version='" << version.first << "-" << version.second; |
205 | |
|
206 | 0 | it = _stale_rs_version_map.find(version); |
207 | 0 | if (it == _stale_rs_version_map.end()) { |
208 | 0 | return Status::Error<CAPTURE_ROWSET_READER_ERROR>( |
209 | 0 | "fail to find Rowset in stale_rs_version for version. tablet={}, " |
210 | 0 | "version={}-{}", |
211 | 0 | tablet_id(), version.first, version.second); |
212 | 0 | } |
213 | 0 | } |
214 | 0 | RowsetReaderSharedPtr rs_reader; |
215 | 0 | auto res = it->second->create_reader(&rs_reader); |
216 | 0 | if (!res.ok()) { |
217 | 0 | return Status::Error<CAPTURE_ROWSET_READER_ERROR>( |
218 | 0 | "failed to create reader for rowset:{}", it->second->rowset_id().to_string()); |
219 | 0 | } |
220 | 0 | rs_splits->emplace_back(std::move(rs_reader)); |
221 | 0 | } |
222 | 0 | return Status::OK(); |
223 | 0 | } |
224 | | |
225 | | // snapshot manager may call this api to check if version exists, so that |
226 | | // the version maybe not exist |
227 | | RowsetSharedPtr BaseTablet::get_rowset_by_version(const Version& version, |
228 | 135k | bool find_in_stale) const { |
229 | 135k | auto iter = _rs_version_map.find(version); |
230 | 135k | if (iter == _rs_version_map.end()) { |
231 | 134k | if (find_in_stale) { |
232 | 0 | return get_stale_rowset_by_version(version); |
233 | 0 | } |
234 | 134k | return nullptr; |
235 | 134k | } |
236 | 1.47k | return iter->second; |
237 | 135k | } |
238 | | |
239 | 0 | RowsetSharedPtr BaseTablet::get_stale_rowset_by_version(const Version& version) const { |
240 | 0 | auto iter = _stale_rs_version_map.find(version); |
241 | 0 | if (iter == _stale_rs_version_map.end()) { |
242 | 0 | VLOG_NOTICE << "no rowset for version:" << version << ", tablet: " << tablet_id(); |
243 | 0 | return nullptr; |
244 | 0 | } |
245 | 0 | return iter->second; |
246 | 0 | } |
247 | | |
248 | | // Already under _meta_lock |
249 | 14.6k | RowsetSharedPtr BaseTablet::get_rowset_with_max_version() const { |
250 | 14.6k | Version max_version = _tablet_meta->max_version(); |
251 | 14.6k | if (max_version.first == -1) { |
252 | 0 | return nullptr; |
253 | 0 | } |
254 | | |
255 | 14.6k | auto iter = _rs_version_map.find(max_version); |
256 | 14.6k | if (iter == _rs_version_map.end()) { |
257 | 0 | DCHECK(false) << "invalid version:" << max_version; |
258 | 0 | return nullptr; |
259 | 0 | } |
260 | 14.6k | return iter->second; |
261 | 14.6k | } |
262 | | |
263 | 13 | Status BaseTablet::get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const { |
264 | 13 | std::shared_lock rlock(_meta_lock); |
265 | 13 | return get_all_rs_id_unlocked(max_version, rowset_ids); |
266 | 13 | } |
267 | | |
268 | | Status BaseTablet::get_all_rs_id_unlocked(int64_t max_version, |
269 | 156k | RowsetIdUnorderedSet* rowset_ids) const { |
270 | | // Ensure that the obtained versions of rowsets are continuous |
271 | 156k | Version spec_version(0, max_version); |
272 | 156k | Versions version_path; |
273 | 156k | auto st = _timestamped_version_tracker.capture_consistent_versions(spec_version, &version_path); |
274 | 156k | if (!st.ok()) [[unlikely]] { |
275 | 0 | return st; |
276 | 0 | } |
277 | | |
278 | 1.28M | for (auto& ver : version_path) { |
279 | 1.28M | if (ver.second == 1) { |
280 | | // [0-1] rowset is empty for each tablet, skip it |
281 | 156k | continue; |
282 | 156k | } |
283 | 1.13M | auto it = _rs_version_map.find(ver); |
284 | 1.13M | if (it == _rs_version_map.end()) { |
285 | 0 | return Status::Error<CAPTURE_ROWSET_ERROR, false>( |
286 | 0 | "fail to find Rowset for version. tablet={}, version={}", tablet_id(), |
287 | 0 | ver.to_string()); |
288 | 0 | } |
289 | 1.13M | rowset_ids->emplace(it->second->rowset_id()); |
290 | 1.13M | } |
291 | 156k | return Status::OK(); |
292 | 156k | } |
293 | | |
294 | 0 | Versions BaseTablet::get_missed_versions(int64_t spec_version) const { |
295 | 0 | DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; |
296 | |
|
297 | 0 | Versions existing_versions; |
298 | 0 | { |
299 | 0 | std::shared_lock rdlock(_meta_lock); |
300 | 0 | for (const auto& [ver, _] : _tablet_meta->all_rs_metas()) { |
301 | 0 | existing_versions.emplace_back(ver); |
302 | 0 | } |
303 | 0 | } |
304 | 0 | return calc_missed_versions(spec_version, std::move(existing_versions)); |
305 | 0 | } |
306 | | |
307 | | Versions BaseTablet::get_missed_versions_unlocked(int64_t spec_version, |
308 | 1.61k | bool capture_row_binlog) const { |
309 | 1.61k | DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; |
310 | | |
311 | 1.61k | Versions existing_versions; |
312 | 1.61k | const auto& rs_metas = capture_row_binlog ? _tablet_meta->all_row_binlog_rs_metas() |
313 | 1.61k | : _tablet_meta->all_rs_metas(); |
314 | 4.85k | for (const auto& [ver, _] : rs_metas) { |
315 | 4.85k | existing_versions.emplace_back(ver); |
316 | 4.85k | } |
317 | 1.61k | return calc_missed_versions(spec_version, std::move(existing_versions)); |
318 | 1.61k | } |
319 | | |
320 | 1 | void BaseTablet::_print_missed_versions(const Versions& missed_versions) const { |
321 | 1 | std::stringstream ss; |
322 | 1 | ss << tablet_id() << " has " << missed_versions.size() << " missed version:"; |
323 | | // print at most 10 version |
324 | 3 | for (int i = 0; i < 10 && i < missed_versions.size(); ++i) { |
325 | 2 | ss << missed_versions[i] << ","; |
326 | 2 | } |
327 | 1 | LOG(WARNING) << ss.str(); |
328 | 1 | } |
329 | | |
330 | 3.89k | bool BaseTablet::_reconstruct_version_tracker_if_necessary() { |
331 | 3.89k | double data_orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio(); |
332 | 3.89k | double row_binlog_orphan_vertex_ratio = _row_binlog_version_tracker.get_orphan_vertex_ratio(); |
333 | 3.89k | if (data_orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) { |
334 | 3.86k | _timestamped_version_tracker.construct_versioned_tracker( |
335 | 3.86k | _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas()); |
336 | 3.86k | return true; |
337 | 3.86k | } else if (row_binlog_orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) { |
338 | 0 | _row_binlog_version_tracker.construct_versioned_tracker( |
339 | 0 | _tablet_meta->all_row_binlog_rs_metas()); |
340 | 0 | return true; |
341 | 0 | } |
342 | 31 | return false; |
343 | 3.89k | } |
344 | | |
345 | | // should use this method to get a copy of current tablet meta |
346 | | // there are some rowset meta in local meta store and in in-memory tablet meta |
347 | | // but not in tablet meta in local meta store |
348 | | void BaseTablet::generate_tablet_meta_copy(TabletMeta& new_tablet_meta, |
349 | 1 | bool cloud_get_rowset_meta) const { |
350 | 1 | std::shared_lock rdlock(_meta_lock); |
351 | 1 | generate_tablet_meta_copy_unlocked(new_tablet_meta, cloud_get_rowset_meta); |
352 | 1 | } |
353 | | |
354 | | // this is a unlocked version of generate_tablet_meta_copy() |
355 | | // some method already hold the _meta_lock before calling this, |
356 | | // such as EngineCloneTask::_finish_clone -> tablet->revise_tablet_meta |
357 | | void BaseTablet::generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta, |
358 | 325 | bool cloud_get_rowset_meta) const { |
359 | 325 | TabletMetaPB tablet_meta_pb; |
360 | 325 | _tablet_meta->to_meta_pb(&tablet_meta_pb, cloud_get_rowset_meta); |
361 | 325 | new_tablet_meta.init_from_pb(tablet_meta_pb); |
362 | 325 | } |
363 | | |
364 | | Status BaseTablet::calc_delete_bitmap_between_segments( |
365 | | TabletSchemaSPtr schema, RowsetId rowset_id, |
366 | 15 | const std::vector<segment_v2::SegmentSharedPtr>& segments, DeleteBitmapPtr delete_bitmap) { |
367 | 15 | size_t const num_segments = segments.size(); |
368 | 15 | if (num_segments < 2) { |
369 | 15 | return Status::OK(); |
370 | 15 | } |
371 | | |
372 | 0 | OlapStopWatch watch; |
373 | 0 | size_t seq_col_length = 0; |
374 | 0 | if (schema->has_sequence_col()) { |
375 | 0 | auto seq_col_idx = schema->sequence_col_idx(); |
376 | 0 | seq_col_length = schema->column(seq_col_idx).length() + 1; |
377 | 0 | } |
378 | 0 | size_t rowid_length = 0; |
379 | 0 | if (!schema->cluster_key_uids().empty()) { |
380 | 0 | rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; |
381 | 0 | } |
382 | |
|
383 | 0 | MergeIndexDeleteBitmapCalculator calculator; |
384 | 0 | RETURN_IF_ERROR(calculator.init(rowset_id, segments, seq_col_length, rowid_length)); |
385 | | |
386 | 0 | RETURN_IF_ERROR(calculator.calculate_all(delete_bitmap)); |
387 | | |
388 | 0 | delete_bitmap->add( |
389 | 0 | {rowset_id, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON}, |
390 | 0 | DeleteBitmap::ROWSET_SENTINEL_MARK); |
391 | 0 | LOG(INFO) << fmt::format( |
392 | 0 | "construct delete bitmap between segments, " |
393 | 0 | "tablet: {}, rowset: {}, number of segments: {}, bitmap count: {}, bitmap cardinality: " |
394 | 0 | "{}, cost {} (us)", |
395 | 0 | tablet_id(), rowset_id.to_string(), num_segments, |
396 | 0 | delete_bitmap->get_delete_bitmap_count(), delete_bitmap->cardinality(), |
397 | 0 | watch.get_elapse_time_us()); |
398 | 0 | return Status::OK(); |
399 | 0 | } |
400 | | |
401 | | std::vector<RowsetSharedPtr> BaseTablet::get_rowset_by_ids( |
402 | 183k | const RowsetIdUnorderedSet* specified_rowset_ids) { |
403 | 183k | std::vector<RowsetSharedPtr> rowsets; |
404 | 1.53M | for (auto& rs : _rs_version_map) { |
405 | 1.53M | if (!specified_rowset_ids || |
406 | 1.53M | specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) { |
407 | 697k | rowsets.push_back(rs.second); |
408 | 697k | } |
409 | 1.53M | } |
410 | | |
411 | 2.36M | std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { |
412 | 2.36M | return lhs->end_version() > rhs->end_version(); |
413 | 2.36M | }); |
414 | 183k | return rowsets; |
415 | 183k | } |
416 | | |
417 | | Status BaseTablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location, |
418 | | RowsetSharedPtr input_rowset, OlapReaderStatistics& stats, |
419 | 4.61k | std::string& values, bool write_to_cache) { |
420 | 4.61k | MonotonicStopWatch watch; |
421 | 4.61k | size_t row_size = 1; |
422 | 4.61k | watch.start(); |
423 | 4.61k | Defer _defer([&]() { |
424 | 4.61k | LOG_EVERY_N(INFO, 500) << "get a single_row, cost(us):" << watch.elapsed_time() / 1000 |
425 | 10 | << ", row_size:" << row_size; |
426 | 4.61k | }); |
427 | | |
428 | 4.61k | BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); |
429 | 4.61k | CHECK(rowset); |
430 | 4.61k | const TabletSchemaSPtr tablet_schema = rowset->tablet_schema(); |
431 | 4.61k | SegmentCacheHandle segment_cache_handle; |
432 | 4.61k | std::unique_ptr<segment_v2::ColumnIterator> column_iterator; |
433 | 4.61k | const auto& column = *DORIS_TRY(tablet_schema->column(BeConsts::ROW_STORE_COL)); |
434 | 4.61k | RETURN_IF_ERROR(_get_segment_column_iterator(rowset, row_location.segment_id, column, |
435 | 4.61k | &segment_cache_handle, &column_iterator, &stats)); |
436 | | // get and parse tuple row |
437 | 4.61k | MutableColumnPtr column_ptr = ColumnString::create(); |
438 | 4.61k | std::vector<segment_v2::rowid_t> rowids {static_cast<segment_v2::rowid_t>(row_location.row_id)}; |
439 | 4.61k | RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column_ptr)); |
440 | 4.61k | assert(column_ptr->size() == 1); |
441 | 4.61k | auto* string_column = static_cast<ColumnString*>(column_ptr.get()); |
442 | 4.61k | StringRef value = string_column->get_data_at(0); |
443 | 4.61k | values = value.to_string(); |
444 | 4.61k | if (write_to_cache) { |
445 | 65 | RowCache::instance()->insert({tablet_id(), encoded_key}, Slice {value.data, value.size}); |
446 | 65 | } |
447 | 4.61k | return Status::OK(); |
448 | 4.61k | } |
449 | | |
450 | | Status BaseTablet::lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, |
451 | | bool with_seq_col, |
452 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
453 | | RowLocation* row_location, int64_t version, |
454 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches, |
455 | | RowsetSharedPtr* rowset, bool with_rowid, |
456 | | std::string* encoded_seq_value, OlapReaderStatistics* stats, |
457 | 3.78M | DeleteBitmapPtr delete_bitmap) { |
458 | 3.78M | SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency); |
459 | 3.78M | size_t seq_col_length = 0; |
460 | | // use the latest tablet schema to decide if the tablet has sequence column currently |
461 | 3.78M | const TabletSchema* schema = |
462 | 3.78M | (latest_schema == nullptr ? _tablet_meta->tablet_schema().get() : latest_schema); |
463 | 3.78M | if (schema->has_sequence_col() && with_seq_col) { |
464 | 23.3k | seq_col_length = schema->column(schema->sequence_col_idx()).length() + 1; |
465 | 23.3k | } |
466 | 3.78M | size_t rowid_length = 0; |
467 | 3.80M | if (with_rowid && !schema->cluster_key_uids().empty()) { |
468 | 66.2k | rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; |
469 | 66.2k | } |
470 | 3.78M | Slice key_without_seq = |
471 | 3.78M | Slice(encoded_key.get_data(), encoded_key.get_size() - seq_col_length - rowid_length); |
472 | 3.78M | RowLocation loc; |
473 | | |
474 | 3.78M | auto tablet_delete_bitmap = |
475 | 3.78M | delete_bitmap == nullptr ? _tablet_meta->delete_bitmap_ptr() : delete_bitmap; |
476 | 4.19M | for (size_t i = 0; i < specified_rowsets.size(); i++) { |
477 | 4.18M | const auto& rs = specified_rowsets[i]; |
478 | 4.18M | std::vector<KeyBoundsPB> segments_key_bounds; |
479 | 4.18M | rs->rowset_meta()->get_segments_key_bounds(&segments_key_bounds); |
480 | 4.18M | int num_segments = cast_set<int>(rs->num_segments()); |
481 | | // MOW lookup requires per-segment bounds. Aggregation must be disabled |
482 | | // for MOW writers, but enforce at runtime too — indexing segments_key_bounds[j] |
483 | | // below would be out-of-bounds otherwise. |
484 | 4.18M | if (UNLIKELY(rs->rowset_meta()->is_segments_key_bounds_aggregated() || |
485 | 4.18M | static_cast<int>(segments_key_bounds.size()) != num_segments)) { |
486 | 0 | return Status::InternalError( |
487 | 0 | "MOW lookup got rowset with inconsistent segments_key_bounds, rowset_id={}, " |
488 | 0 | "aggregated={}, bounds_size={}, num_segments={}", |
489 | 0 | rs->rowset_id().to_string(), |
490 | 0 | rs->rowset_meta()->is_segments_key_bounds_aggregated(), |
491 | 0 | segments_key_bounds.size(), num_segments); |
492 | 0 | } |
493 | 4.18M | std::vector<uint32_t> picked_segments; |
494 | 8.15M | for (int j = num_segments - 1; j >= 0; j--) { |
495 | 3.97M | if (_key_is_not_in_segment(key_without_seq, segments_key_bounds[j], |
496 | 3.97M | rs->rowset_meta()->is_segments_key_bounds_truncated())) { |
497 | 17.5k | continue; |
498 | 17.5k | } |
499 | 3.95M | picked_segments.emplace_back(j); |
500 | 3.95M | } |
501 | 4.18M | if (picked_segments.empty()) { |
502 | 214k | continue; |
503 | 214k | } |
504 | | |
505 | 3.97M | if (UNLIKELY(segment_caches[i] == nullptr)) { |
506 | 81.7k | segment_caches[i] = std::make_unique<SegmentCacheHandle>(); |
507 | 81.7k | RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( |
508 | 81.7k | std::static_pointer_cast<BetaRowset>(rs), segment_caches[i].get(), true, true)); |
509 | 81.7k | } |
510 | 3.97M | auto& segments = segment_caches[i]->get_segments(); |
511 | 3.97M | DCHECK_EQ(segments.size(), num_segments); |
512 | | |
513 | 3.97M | for (auto id : picked_segments) { |
514 | 3.96M | Status s = segments[id]->lookup_row_key(encoded_key, schema, with_seq_col, with_rowid, |
515 | 3.96M | &loc, stats, encoded_seq_value); |
516 | 3.96M | if (s.is<KEY_NOT_FOUND>()) { |
517 | 187k | continue; |
518 | 187k | } |
519 | 3.78M | if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) { |
520 | 0 | return s; |
521 | 0 | } |
522 | 3.78M | if (s.ok() && tablet_delete_bitmap->contains_agg_with_cache_if_eligible( |
523 | 3.77M | {loc.rowset_id, loc.segment_id, version}, loc.row_id)) { |
524 | | // if has sequence col, we continue to compare the sequence_id of |
525 | | // all rowsets, util we find an existing key. |
526 | 339 | if (schema->has_sequence_col()) { |
527 | 189 | continue; |
528 | 189 | } |
529 | | // The key is deleted, we don't need to search for it any more. |
530 | 150 | break; |
531 | 339 | } |
532 | | // `st` is either OK or KEY_ALREADY_EXISTS now. |
533 | | // for partial update, even if the key is already exists, we still need to |
534 | | // read it's original values to keep all columns align. |
535 | 3.78M | *row_location = loc; |
536 | 3.78M | if (rowset) { |
537 | | // return it's rowset |
538 | 3.78M | *rowset = rs; |
539 | 3.78M | } |
540 | | // find it and return |
541 | 3.78M | return s; |
542 | 3.78M | } |
543 | 3.97M | } |
544 | 4.04k | g_tablet_pk_not_found << 1; |
545 | 4.04k | return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets"); |
546 | 3.78M | } |
547 | | |
548 | | // if user pass a token, then all calculation works will submit to a threadpool, |
549 | | // user can get all delete bitmaps from that token. |
550 | | // if `token` is nullptr, the calculation will run in local, and user can get the result |
551 | | // delete bitmap from `delete_bitmap` directly. |
552 | | Status BaseTablet::calc_delete_bitmap(const BaseTabletSPtr& tablet, RowsetSharedPtr rowset, |
553 | | const std::vector<segment_v2::SegmentSharedPtr>& segments, |
554 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
555 | | DeleteBitmapPtr delete_bitmap, int64_t end_version, |
556 | | CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer, |
557 | 116k | DeleteBitmapPtr tablet_delete_bitmap) { |
558 | 116k | if (specified_rowsets.empty() || segments.empty()) { |
559 | 88.0k | return Status::OK(); |
560 | 88.0k | } |
561 | | |
562 | 28.2k | OlapStopWatch watch; |
563 | 28.9k | for (const auto& segment : segments) { |
564 | 28.9k | const auto& seg = segment; |
565 | 28.9k | if (token != nullptr) { |
566 | 6.15k | RETURN_IF_ERROR(token->submit(tablet, rowset, seg, specified_rowsets, end_version, |
567 | 6.15k | delete_bitmap, rowset_writer, tablet_delete_bitmap)); |
568 | 22.7k | } else { |
569 | 22.7k | RETURN_IF_ERROR(tablet->calc_segment_delete_bitmap( |
570 | 22.7k | rowset, segment, specified_rowsets, delete_bitmap, end_version, rowset_writer, |
571 | 22.7k | tablet_delete_bitmap)); |
572 | 22.7k | } |
573 | 28.9k | } |
574 | | |
575 | 28.2k | return Status::OK(); |
576 | 28.2k | } |
577 | | |
578 | | Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, |
579 | | const segment_v2::SegmentSharedPtr& seg, |
580 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
581 | | DeleteBitmapPtr delete_bitmap, int64_t end_version, |
582 | | RowsetWriter* rowset_writer, |
583 | 28.9k | DeleteBitmapPtr tablet_delete_bitmap) { |
584 | 28.9k | OlapStopWatch watch; |
585 | 28.9k | auto rowset_id = rowset->rowset_id(); |
586 | 28.9k | Version dummy_version(end_version + 1, end_version + 1); |
587 | 28.9k | auto rowset_schema = rowset->tablet_schema(); |
588 | | |
589 | 28.9k | PartialUpdateInfo* partial_update_info = |
590 | 28.9k | rowset_writer != nullptr ? rowset_writer->get_partial_update_info().get() : nullptr; |
591 | 28.9k | bool is_partial_update = partial_update_info && partial_update_info->is_partial_update(); |
592 | 28.9k | bool need_rewrite_conflict = partial_update_info != nullptr; |
593 | | // `have_input_seq_column` is for fixed partial update only. For flexible partial update, we should use |
594 | | // the skip bitmap to determine wheather a row has specified the sequence column |
595 | 28.9k | bool have_input_seq_column = false; |
596 | | // `rids_be_overwritten` is for flexible partial update only, it records row ids that is overwritten by |
597 | | // another row with higher seqeucne value |
598 | 28.9k | std::set<uint32_t> rids_be_overwritten; |
599 | 28.9k | if (is_partial_update) { |
600 | 103 | if (partial_update_info->is_fixed_partial_update() && rowset_schema->has_sequence_col()) { |
601 | 4 | std::vector<uint32_t> including_cids = |
602 | 4 | rowset_writer->get_partial_update_info()->update_cids; |
603 | 4 | have_input_seq_column = |
604 | 4 | rowset_schema->has_sequence_col() && |
605 | 4 | (std::find(including_cids.cbegin(), including_cids.cend(), |
606 | 4 | rowset_schema->sequence_col_idx()) != including_cids.cend()); |
607 | 4 | } |
608 | 102 | } |
609 | | |
610 | 28.9k | if (rowset_schema->num_variant_columns() > 0) { |
611 | | // During partial updates, the extracted columns of a variant should not be included in the rowset schema. |
612 | | // This is because the partial update for a variant needs to ignore the extracted columns. |
613 | | // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, |
614 | | // the complete variant is constructed by reading all the sub-columns of the variant. |
615 | 4.34k | rowset_schema = rowset_schema->copy_without_variant_extracted_columns(); |
616 | 4.34k | } |
617 | | // use for partial update |
618 | 28.9k | FixedReadPlan read_plan_ori; |
619 | 28.9k | FixedReadPlan read_plan_update; |
620 | 28.9k | int64_t conflict_rows = 0; |
621 | 28.9k | int64_t new_generated_rows = 0; |
622 | | |
623 | 28.9k | std::map<RowsetId, RowsetSharedPtr> rsid_to_rowset; |
624 | 28.9k | rsid_to_rowset[rowset_id] = rowset; |
625 | 28.9k | Block block = rowset_schema->create_block(); |
626 | 28.9k | Block ordered_block = block.clone_empty(); |
627 | 28.9k | uint32_t pos = 0; |
628 | | |
629 | 28.9k | RETURN_IF_ERROR(seg->load_pk_index_and_bf(nullptr)); // We need index blocks to iterate |
630 | 28.9k | const auto* pk_idx = seg->get_primary_key_index(); |
631 | 28.9k | int64_t total = pk_idx->num_rows(); |
632 | 28.9k | uint32_t row_id = 0; |
633 | 28.9k | int64_t remaining = total; |
634 | 28.9k | bool exact_match = false; |
635 | 28.9k | std::string last_key; |
636 | 28.9k | int batch_size = 1024; |
637 | | // The data for each segment may be lookup multiple times. Creating a SegmentCacheHandle |
638 | | // will update the lru cache, and there will be obvious lock competition in multithreading |
639 | | // scenarios, so using a segment_caches to cache SegmentCacheHandle. |
640 | 28.9k | std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size()); |
641 | 60.8k | while (remaining > 0) { |
642 | 31.9k | std::unique_ptr<segment_v2::IndexedColumnIterator> iter; |
643 | 31.9k | RETURN_IF_ERROR(pk_idx->new_iterator(&iter, nullptr)); |
644 | | |
645 | 31.9k | size_t num_to_read = std::min<int64_t>(batch_size, remaining); |
646 | 31.9k | auto index_type = DataTypeFactory::instance().create_data_type(pk_idx->type(), 1, 0); |
647 | 31.9k | auto index_column = index_type->create_column(); |
648 | 31.9k | Slice last_key_slice(last_key); |
649 | 31.9k | RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match)); |
650 | 31.9k | auto current_ordinal = iter->get_current_ordinal(); |
651 | 18.4E | DCHECK(total == remaining + current_ordinal) |
652 | 18.4E | << "total: " << total << ", remaining: " << remaining |
653 | 18.4E | << ", current_ordinal: " << current_ordinal; |
654 | | |
655 | 31.9k | size_t num_read = num_to_read; |
656 | 31.9k | RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); |
657 | 18.4E | DCHECK(num_to_read == num_read) |
658 | 18.4E | << "num_to_read: " << num_to_read << ", num_read: " << num_read; |
659 | 31.9k | last_key = index_column->get_data_at(num_read - 1).to_string(); |
660 | | |
661 | | // exclude last_key, last_key will be read in next batch. |
662 | 31.9k | if (num_read == batch_size && num_read != remaining) { |
663 | 3.50k | num_read -= 1; |
664 | 3.50k | } |
665 | 3.82M | for (size_t i = 0; i < num_read; i++, row_id++) { |
666 | 3.78M | Slice key = Slice(index_column->get_data_at(i).data, index_column->get_data_at(i).size); |
667 | 3.78M | RowLocation loc; |
668 | | // calculate row id |
669 | 3.78M | if (!_tablet_meta->tablet_schema()->cluster_key_uids().empty()) { |
670 | 66.2k | size_t seq_col_length = 0; |
671 | 66.2k | if (_tablet_meta->tablet_schema()->has_sequence_col()) { |
672 | 11.9k | seq_col_length = |
673 | 11.9k | _tablet_meta->tablet_schema() |
674 | 11.9k | ->column(_tablet_meta->tablet_schema()->sequence_col_idx()) |
675 | 11.9k | .length() + |
676 | 11.9k | 1; |
677 | 11.9k | } |
678 | 66.2k | size_t rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH; |
679 | 66.2k | Slice key_without_seq = |
680 | 66.2k | Slice(key.get_data(), key.get_size() - seq_col_length - rowid_length); |
681 | 66.2k | Slice rowid_slice = |
682 | 66.2k | Slice(key.get_data() + key_without_seq.get_size() + seq_col_length + 1, |
683 | 66.2k | rowid_length - 1); |
684 | 66.2k | const auto* rowid_coder = get_key_coder(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT); |
685 | 66.2k | RETURN_IF_ERROR(rowid_coder->decode_ascending(&rowid_slice, rowid_length, |
686 | 66.2k | (uint8_t*)&row_id)); |
687 | 66.2k | } |
688 | | // same row in segments should be filtered |
689 | 3.78M | if (delete_bitmap->contains({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, |
690 | 3.78M | row_id)) { |
691 | 4 | continue; |
692 | 4 | } |
693 | | |
694 | 3.78M | DBUG_EXECUTE_IF("BaseTablet::calc_segment_delete_bitmap.inject_err", { |
695 | 3.78M | auto p = dp->param("percent", 0.01); |
696 | 3.78M | std::mt19937 gen {std::random_device {}()}; |
697 | 3.78M | std::bernoulli_distribution inject_fault {p}; |
698 | 3.78M | if (inject_fault(gen)) { |
699 | 3.78M | return Status::InternalError( |
700 | 3.78M | "injection error in calc_segment_delete_bitmap, " |
701 | 3.78M | "tablet_id={}, rowset_id={}", |
702 | 3.78M | tablet_id(), rowset_id.to_string()); |
703 | 3.78M | } |
704 | 3.78M | }); |
705 | | |
706 | 3.78M | RowsetSharedPtr rowset_find; |
707 | 3.78M | Status st = Status::OK(); |
708 | 3.78M | if (tablet_delete_bitmap == nullptr) { |
709 | 2.30M | st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc, |
710 | 2.30M | dummy_version.first - 1, segment_caches, &rowset_find); |
711 | 2.30M | } else { |
712 | 1.47M | st = lookup_row_key(key, rowset_schema.get(), true, specified_rowsets, &loc, |
713 | 1.47M | dummy_version.first - 1, segment_caches, &rowset_find, true, |
714 | 1.47M | nullptr, nullptr, tablet_delete_bitmap); |
715 | 1.47M | } |
716 | 3.78M | bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || st.is<KEY_ALREADY_EXISTS>(); |
717 | | // It's a defensive DCHECK, we need to exclude some common errors to avoid core-dump |
718 | | // while stress test |
719 | 3.78M | DCHECK(expected_st || st.is<MEM_LIMIT_EXCEEDED>()) |
720 | 3.18k | << "unexpected error status while lookup_row_key:" << st; |
721 | 3.78M | if (!expected_st) { |
722 | 0 | return st; |
723 | 0 | } |
724 | 3.78M | if (st.is<KEY_NOT_FOUND>()) { |
725 | 36.0k | continue; |
726 | 36.0k | } |
727 | | |
728 | 3.75M | ++conflict_rows; |
729 | 3.75M | if (st.is<KEY_ALREADY_EXISTS>() && |
730 | 3.75M | (!is_partial_update || |
731 | 291 | (partial_update_info->is_fixed_partial_update() && have_input_seq_column))) { |
732 | | // `st.is<KEY_ALREADY_EXISTS>()` means that there exists a row with the same key and larger value |
733 | | // in seqeunce column. |
734 | | // - If the current load is not a partial update, we just delete current row. |
735 | | // - Otherwise, it means that we are doing the alignment process in publish phase due to conflicts |
736 | | // during concurrent partial updates. And there exists another load which introduces a row with |
737 | | // the same keys and larger sequence column value published successfully after the commit phase |
738 | | // of the current load. |
739 | | // - If the columns we update include sequence column, we should delete the current row becase the |
740 | | // partial update on the current row has been `overwritten` by the previous one with larger sequence |
741 | | // column value. |
742 | | // - Otherwise, we should combine the values of the missing columns in the previous row and the values |
743 | | // of the including columns in the current row into a new row. |
744 | 286 | delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, |
745 | 286 | row_id); |
746 | 286 | continue; |
747 | | // NOTE: for partial update which doesn't specify the sequence column, we can't use the sequence column value filled in flush phase |
748 | | // as its final value. Otherwise it may cause inconsistency between replicas. |
749 | 286 | } |
750 | 3.75M | if (need_rewrite_conflict) { |
751 | | // In publish version, record rows to be deleted for concurrent update |
752 | | // For example, if version 5 and 6 update a row, but version 6 only see |
753 | | // version 4 when write, and when publish version, version 5's value will |
754 | | // be marked as deleted and it's update is losed. |
755 | | // So here we should read version 5's columns and build a new row, which is |
756 | | // consists of version 6's update columns and version 5's origin columns |
757 | | // here we build 2 read plan for ori values and update values |
758 | | |
759 | | // - for fixed partial update, we should read update columns from current load's rowset |
760 | | // and read missing columns from previous rowsets to create the final block |
761 | | // - for flexible partial update, we should read all columns from current load's rowset |
762 | | // and read non sort key columns from previous rowsets to create the final block |
763 | | // - for upsert rewrite, we should read all columns from current load's rowset |
764 | | // So we only need to record rows to read for both mode partial update and upsert rewrite |
765 | 83 | if (is_partial_update) { |
766 | 83 | read_plan_ori.prepare_to_read(loc, pos); |
767 | 83 | } |
768 | 83 | read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos); |
769 | | |
770 | | // For flexible partial update, we should use skip bitmap to determine wheather |
771 | | // a row has specified the sequence column. But skip bitmap should be read from the segment. |
772 | | // So we record these row ids and process and filter them in `generate_new_block_for_flexible_partial_update()` |
773 | 83 | if (st.is<KEY_ALREADY_EXISTS>() && |
774 | 83 | partial_update_info->is_flexible_partial_update()) { |
775 | 0 | rids_be_overwritten.insert(pos); |
776 | 0 | } |
777 | | |
778 | 83 | rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; |
779 | 83 | ++pos; |
780 | | |
781 | | // delete bitmap will be calculate when memtable flush and |
782 | | // publish. The two stages may see different versions. |
783 | | // When there is sequence column, the currently imported data |
784 | | // of rowset may be marked for deletion at memtablet flush or |
785 | | // publish because the seq column is smaller than the previous |
786 | | // rowset. |
787 | | // just set 0 as a unified temporary version number, and update to |
788 | | // the real version number later. |
789 | 83 | delete_bitmap->add( |
790 | 83 | {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, |
791 | 83 | loc.row_id); |
792 | 83 | delete_bitmap->add({rowset_id, seg->id(), DeleteBitmap::TEMP_VERSION_COMMON}, |
793 | 83 | row_id); |
794 | 83 | ++new_generated_rows; |
795 | 83 | continue; |
796 | 83 | } |
797 | | // when st = ok |
798 | 3.75M | delete_bitmap->add({loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, |
799 | 3.75M | loc.row_id); |
800 | 3.75M | } |
801 | 31.9k | remaining -= num_read; |
802 | 31.9k | } |
803 | | // DCHECK_EQ(total, row_id) << "segment total rows: " << total << " row_id:" << row_id; |
804 | | |
805 | 28.9k | if (config::enable_merge_on_write_correctness_check) { |
806 | 28.8k | RowsetIdUnorderedSet rowsetids; |
807 | 227k | for (const auto& specified_rowset : specified_rowsets) { |
808 | 227k | rowsetids.emplace(specified_rowset->rowset_id()); |
809 | 227k | VLOG_NOTICE << "[tabletID:" << tablet_id() << "]" |
810 | 65 | << "[add_sentinel_mark_to_delete_bitmap][end_version:" << end_version << "]" |
811 | 65 | << "add:" << specified_rowset->rowset_id(); |
812 | 227k | } |
813 | 28.8k | add_sentinel_mark_to_delete_bitmap(delete_bitmap.get(), rowsetids); |
814 | 28.8k | } |
815 | | |
816 | 28.9k | if (pos > 0) { |
817 | 19 | DCHECK(partial_update_info != nullptr); |
818 | 19 | if (partial_update_info->is_flexible_partial_update()) { |
819 | 0 | RETURN_IF_ERROR(generate_new_block_for_flexible_partial_update( |
820 | 0 | rowset_schema, partial_update_info, rids_be_overwritten, read_plan_ori, |
821 | 0 | read_plan_update, rsid_to_rowset, &block)); |
822 | 19 | } else { |
823 | 19 | RETURN_IF_ERROR(generate_new_block_for_partial_update( |
824 | 19 | rowset_schema, partial_update_info, read_plan_ori, read_plan_update, |
825 | 19 | rsid_to_rowset, &block)); |
826 | 19 | } |
827 | 19 | RETURN_IF_ERROR(sort_block(block, ordered_block)); |
828 | | |
829 | | // Publish-phase partial update may flush transient segments to a GroupRowsetWriter. |
830 | | // For row-binlog writing, RowBinlogSegmentWriter requires `seg_id -> lsn_ids` to be |
831 | | // registered before the segment writer is constructed. |
832 | 19 | if (auto* group_writer = typeid_cast<GroupRowsetWriter*>(rowset_writer); |
833 | 19 | group_writer != nullptr) { |
834 | 0 | auto seg_id = group_writer->get_allocated_segment_id(); |
835 | 0 | auto binlog_writer = group_writer->row_binlog_writer(); |
836 | 0 | auto& binlog_ctx = const_cast<RowsetWriterContext&>(binlog_writer->context()); |
837 | 0 | if (binlog_ctx.write_binlog_opt().enable) { |
838 | 0 | auto db_id = binlog_writer->rowset_meta()->db_id(); |
839 | 0 | auto table_id = binlog_writer->rowset_meta()->table_id(); |
840 | 0 | DCHECK_GT(db_id, 0); |
841 | 0 | DCHECK_GT(table_id, 0); |
842 | 0 | auto lsn_buffer = GlobalAutoIncBuffers::GetInstance()->get_auto_inc_buffer( |
843 | 0 | db_id, table_id, kBinlogLsnAutoIncId); |
844 | 0 | std::shared_ptr<std::vector<int64_t>> lsn_ids; |
845 | 0 | RETURN_IF_ERROR(allocate_binlog_lsn(lsn_buffer, ordered_block.rows(), &lsn_ids)); |
846 | 0 | binlog_ctx.write_binlog_opt().write_binlog_config().insert_seg_lsn( |
847 | 0 | seg_id, std::move(lsn_ids)); |
848 | 0 | } |
849 | 0 | } |
850 | 19 | RETURN_IF_ERROR(rowset_writer->flush_single_block(&ordered_block)); |
851 | 19 | auto cost_us = watch.get_elapse_time_us(); |
852 | 19 | if (config::enable_mow_verbose_log || cost_us > 10 * 1000) { |
853 | 18 | LOG(INFO) << "calc segment delete bitmap for " |
854 | 18 | << partial_update_info->partial_update_mode_str() |
855 | 18 | << ", tablet: " << tablet_id() << " rowset: " << rowset_id |
856 | 18 | << " seg_id: " << seg->id() << " dummy_version: " << end_version + 1 |
857 | 18 | << " rows: " << seg->num_rows() << " conflict rows: " << conflict_rows |
858 | 18 | << " new generated rows: " << new_generated_rows |
859 | 18 | << " bitmap num: " << delete_bitmap->get_delete_bitmap_count() |
860 | 18 | << " bitmap cardinality: " << delete_bitmap->cardinality() |
861 | 18 | << " cost: " << cost_us << "(us)"; |
862 | 18 | } |
863 | 19 | return Status::OK(); |
864 | 19 | } |
865 | 28.9k | auto cost_us = watch.get_elapse_time_us(); |
866 | 28.9k | if (config::enable_mow_verbose_log || cost_us > 10 * 1000) { |
867 | 3.83k | LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() |
868 | 3.83k | << " rowset: " << rowset_id << " seg_id: " << seg->id() |
869 | 3.83k | << " dummy_version: " << end_version + 1 << " rows: " << seg->num_rows() |
870 | 3.83k | << " conflict rows: " << conflict_rows |
871 | 3.83k | << " bitmap num: " << delete_bitmap->get_delete_bitmap_count() |
872 | 3.83k | << " bitmap cardinality: " << delete_bitmap->cardinality() << " cost: " << cost_us |
873 | 3.83k | << "(us)"; |
874 | 3.83k | } |
875 | 28.9k | return Status::OK(); |
876 | 28.9k | } |
877 | | |
878 | 19 | Status BaseTablet::sort_block(Block& in_block, Block& output_block) { |
879 | 19 | MutableBlock mutable_input_block = MutableBlock::build_mutable_block(&in_block); |
880 | 19 | MutableBlock mutable_output_block = MutableBlock::build_mutable_block(&output_block); |
881 | | |
882 | 19 | std::shared_ptr<RowInBlockComparator> vec_row_comparator = |
883 | 19 | std::make_shared<RowInBlockComparator>(_tablet_meta->tablet_schema()); |
884 | 19 | vec_row_comparator->set_block(&mutable_input_block); |
885 | | |
886 | 19 | std::vector<std::unique_ptr<RowInBlock>> row_in_blocks; |
887 | 19 | DCHECK(in_block.rows() <= std::numeric_limits<int>::max()); |
888 | 19 | row_in_blocks.reserve(in_block.rows()); |
889 | 102 | for (size_t i = 0; i < in_block.rows(); ++i) { |
890 | 83 | row_in_blocks.emplace_back(std::make_unique<RowInBlock>(i)); |
891 | 83 | } |
892 | 19 | std::sort(row_in_blocks.begin(), row_in_blocks.end(), |
893 | 19 | [&](const std::unique_ptr<RowInBlock>& l, |
894 | 128 | const std::unique_ptr<RowInBlock>& r) -> bool { |
895 | 128 | auto value = (*vec_row_comparator)(l.get(), r.get()); |
896 | 128 | DCHECK(value != 0) << "value equel when sort block, l_pos: " << l->_row_pos |
897 | 0 | << " r_pos: " << r->_row_pos; |
898 | 128 | return value < 0; |
899 | 128 | }); |
900 | 19 | std::vector<uint32_t> row_pos_vec; |
901 | 19 | row_pos_vec.reserve(in_block.rows()); |
902 | 83 | for (auto& block : row_in_blocks) { |
903 | 83 | row_pos_vec.emplace_back(block->_row_pos); |
904 | 83 | } |
905 | 19 | return mutable_output_block.add_rows(&in_block, row_pos_vec.data(), |
906 | 19 | row_pos_vec.data() + in_block.rows()); |
907 | 19 | } |
908 | | |
909 | | // fetch value by row column |
910 | | Status BaseTablet::fetch_value_through_row_column(RowsetSharedPtr input_rowset, |
911 | | const TabletSchema& tablet_schema, uint32_t segid, |
912 | | const std::vector<uint32_t>& rowids, |
913 | 337 | const std::vector<uint32_t>& cids, Block& block) { |
914 | 337 | MonotonicStopWatch watch; |
915 | 337 | watch.start(); |
916 | 337 | Defer _defer([&]() { |
917 | 337 | LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" << watch.elapsed_time() / 1000 |
918 | 1 | << ", row_batch_size:" << rowids.size(); |
919 | 337 | }); |
920 | | |
921 | 337 | BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); |
922 | 337 | CHECK(rowset); |
923 | 337 | CHECK(tablet_schema.has_row_store_for_all_columns()); |
924 | 337 | SegmentCacheHandle segment_cache_handle; |
925 | 337 | std::unique_ptr<segment_v2::ColumnIterator> column_iterator; |
926 | 337 | OlapReaderStatistics stats; |
927 | 337 | const auto& column = *DORIS_TRY(tablet_schema.column(BeConsts::ROW_STORE_COL)); |
928 | 337 | RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, column, &segment_cache_handle, |
929 | 337 | &column_iterator, &stats)); |
930 | | // get and parse tuple row |
931 | 337 | MutableColumnPtr column_ptr = ColumnString::create(); |
932 | 337 | RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), column_ptr)); |
933 | 337 | assert(column_ptr->size() == rowids.size()); |
934 | 337 | auto* string_column = static_cast<ColumnString*>(column_ptr.get()); |
935 | 337 | DataTypeSerDeSPtrs serdes; |
936 | 337 | serdes.resize(cids.size()); |
937 | 337 | std::unordered_map<uint32_t, uint32_t> col_uid_to_idx; |
938 | 337 | std::vector<std::string> default_values; |
939 | 337 | default_values.resize(cids.size()); |
940 | 3.73k | for (int i = 0; i < cids.size(); ++i) { |
941 | 3.39k | const TabletColumn& tablet_column = tablet_schema.column(cids[i]); |
942 | 3.39k | DataTypePtr type = DataTypeFactory::instance().create_data_type(tablet_column); |
943 | 3.39k | col_uid_to_idx[tablet_column.unique_id()] = i; |
944 | 3.39k | default_values[i] = tablet_column.default_value(); |
945 | 3.39k | serdes[i] = type->get_serde(); |
946 | 3.39k | } |
947 | 337 | RETURN_IF_ERROR(JsonbSerializeUtil::jsonb_to_block(serdes, *string_column, col_uid_to_idx, |
948 | 337 | block, default_values, {})); |
949 | 337 | return Status::OK(); |
950 | 337 | } |
951 | | |
952 | | Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid, |
953 | | const std::vector<uint32_t>& rowids, |
954 | 3.61k | const TabletColumn& tablet_column, MutableColumnPtr& dst) { |
955 | 3.61k | MonotonicStopWatch watch; |
956 | 3.61k | watch.start(); |
957 | 3.61k | Defer _defer([&]() { |
958 | 3.61k | LOG_EVERY_N(INFO, 500) << "fetch_value_by_rowids, cost(us):" << watch.elapsed_time() / 1000 |
959 | 10 | << ", row_batch_size:" << rowids.size(); |
960 | 3.61k | }); |
961 | | |
962 | | // read row data |
963 | 3.61k | BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset); |
964 | 3.61k | CHECK(rowset); |
965 | 3.61k | SegmentCacheHandle segment_cache_handle; |
966 | 3.61k | std::unique_ptr<segment_v2::ColumnIterator> column_iterator; |
967 | 3.61k | OlapReaderStatistics stats; |
968 | 3.61k | RETURN_IF_ERROR(_get_segment_column_iterator(rowset, segid, tablet_column, |
969 | 3.61k | &segment_cache_handle, &column_iterator, &stats)); |
970 | 3.61k | RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), rowids.size(), dst)); |
971 | 3.61k | return Status::OK(); |
972 | 3.61k | } |
973 | | |
974 | | const signed char* BaseTablet::get_delete_sign_column_data(const Block& block, |
975 | 3.63k | size_t rows_at_least) { |
976 | 3.63k | if (int pos = block.get_position_by_name(DELETE_SIGN); pos != -1) { |
977 | 3.62k | const ColumnWithTypeAndName& delete_sign_column = block.get_by_position(pos); |
978 | 3.62k | const auto& delete_sign_col = assert_cast<const ColumnInt8&>(*(delete_sign_column.column)); |
979 | 3.62k | if (delete_sign_col.size() >= rows_at_least) { |
980 | 3.06k | return delete_sign_col.get_data().data(); |
981 | 3.06k | } |
982 | 3.62k | } |
983 | 574 | return nullptr; |
984 | 3.63k | }; |
985 | | |
986 | | Status BaseTablet::generate_default_value_block(const TabletSchema& schema, |
987 | | const std::vector<uint32_t>& cids, |
988 | | const std::vector<std::string>& default_values, |
989 | | const Block& ref_block, |
990 | 1.65k | Block& default_value_block) { |
991 | 1.65k | auto mutable_default_value_columns = default_value_block.mutate_columns(); |
992 | 12.9k | for (auto i = 0; i < cids.size(); ++i) { |
993 | 11.3k | const auto& column = schema.column(cids[i]); |
994 | 11.3k | if (column.has_default_value()) { |
995 | 3.82k | const auto& default_value = default_values[i]; |
996 | 3.82k | StringRef str(default_value); |
997 | 3.82k | RETURN_IF_ERROR(ref_block.get_by_position(i).type->get_serde()->default_from_string( |
998 | 3.82k | str, *mutable_default_value_columns[i])); |
999 | 3.82k | } |
1000 | 11.3k | } |
1001 | 1.65k | default_value_block.set_columns(std::move(mutable_default_value_columns)); |
1002 | 1.65k | return Status::OK(); |
1003 | 1.65k | } |
1004 | | |
1005 | | Status BaseTablet::generate_new_block_for_partial_update( |
1006 | | TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, |
1007 | | const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update, |
1008 | 19 | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block* output_block) { |
1009 | | // do partial update related works |
1010 | | // 1. read columns by read plan |
1011 | | // 2. generate new block |
1012 | | // 3. write a new segment and modify rowset meta |
1013 | | // 4. mark current keys deleted |
1014 | 19 | CHECK(output_block); |
1015 | 19 | auto full_mutable_columns = output_block->mutate_columns(); |
1016 | 19 | const auto& missing_cids = partial_update_info->missing_cids; |
1017 | 19 | const auto& update_cids = partial_update_info->update_cids; |
1018 | 19 | auto old_block = rowset_schema->create_block_by_cids(missing_cids); |
1019 | 19 | auto update_block = rowset_schema->create_block_by_cids(update_cids); |
1020 | | |
1021 | 19 | bool have_input_seq_column = false; |
1022 | 19 | if (rowset_schema->has_sequence_col()) { |
1023 | 4 | have_input_seq_column = |
1024 | 4 | (std::find(update_cids.cbegin(), update_cids.cend(), |
1025 | 4 | rowset_schema->sequence_col_idx()) != update_cids.cend()); |
1026 | 4 | } |
1027 | | |
1028 | | // rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block |
1029 | 19 | std::map<uint32_t, uint32_t> read_index_update; |
1030 | | |
1031 | | // read current rowset first, if a row in the current rowset has delete sign mark |
1032 | | // we don't need to read values from old block |
1033 | 19 | RETURN_IF_ERROR(read_plan_update.read_columns_by_plan( |
1034 | 19 | *rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update, false)); |
1035 | 19 | size_t update_rows = read_index_update.size(); |
1036 | 65 | for (auto i = 0; i < update_cids.size(); ++i) { |
1037 | 252 | for (auto idx = 0; idx < update_rows; ++idx) { |
1038 | 206 | full_mutable_columns[update_cids[i]]->insert_from( |
1039 | 206 | *update_block.get_by_position(i).column, read_index_update[idx]); |
1040 | 206 | } |
1041 | 46 | } |
1042 | | |
1043 | | // if there is sequence column in the table, we need to read the sequence column, |
1044 | | // otherwise it may cause the merge-on-read based compaction policy to produce incorrect results |
1045 | 19 | const auto* __restrict new_block_delete_signs = |
1046 | 19 | rowset_schema->has_sequence_col() |
1047 | 19 | ? nullptr |
1048 | 19 | : get_delete_sign_column_data(update_block, update_rows); |
1049 | | |
1050 | | // rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block |
1051 | 19 | std::map<uint32_t, uint32_t> read_index_old; |
1052 | 19 | RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset, |
1053 | 19 | old_block, &read_index_old, true, |
1054 | 19 | new_block_delete_signs)); |
1055 | 19 | size_t old_rows = read_index_old.size(); |
1056 | 19 | const auto* __restrict old_block_delete_signs = |
1057 | 19 | get_delete_sign_column_data(old_block, old_rows); |
1058 | 19 | DCHECK(old_block_delete_signs != nullptr); |
1059 | | // build default value block |
1060 | 19 | auto default_value_block = old_block.clone_empty(); |
1061 | 19 | RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, missing_cids, |
1062 | 19 | partial_update_info->default_values, |
1063 | 19 | old_block, default_value_block)); |
1064 | | |
1065 | 19 | CHECK(update_rows >= old_rows); |
1066 | | |
1067 | | // build full block |
1068 | 120 | for (auto i = 0; i < missing_cids.size(); ++i) { |
1069 | 101 | const auto& rs_column = rowset_schema->column(missing_cids[i]); |
1070 | 101 | auto& mutable_column = full_mutable_columns[missing_cids[i]]; |
1071 | 566 | for (auto idx = 0; idx < update_rows; ++idx) { |
1072 | | // There are two cases we don't need to read values from old data: |
1073 | | // 1. if the conflicting new row's delete sign is marked, which means the value columns |
1074 | | // of the row will not be read. So we don't need to read the missing values from the previous rows. |
1075 | | // 2. if the conflicting old row's delete sign is marked, which means that the key is not exist now, |
1076 | | // we should not read old values from the deleted data, and should use default value instead. |
1077 | | // NOTE: since now we are in the publishing phase, all data is commited |
1078 | | // before, even the `strict_mode` is true (which requires partial update |
1079 | | // load job can't insert new keys), this "new" key MUST be written into |
1080 | | // the new generated segment file. |
1081 | 465 | bool new_row_delete_sign = |
1082 | 465 | (new_block_delete_signs != nullptr && new_block_delete_signs[idx]); |
1083 | 465 | if (new_row_delete_sign) { |
1084 | 18 | mutable_column->insert_default(); |
1085 | 447 | } else { |
1086 | 447 | bool use_default = false; |
1087 | 447 | bool old_row_delete_sign = (old_block_delete_signs != nullptr && |
1088 | 447 | old_block_delete_signs[read_index_old.at(idx)] != 0); |
1089 | 447 | if (old_row_delete_sign) { |
1090 | 0 | if (!rowset_schema->has_sequence_col()) { |
1091 | 0 | use_default = true; |
1092 | 0 | } else if (have_input_seq_column || !rs_column.is_seqeunce_col()) { |
1093 | | // to keep the sequence column value not decreasing, we should read values of seq column |
1094 | | // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise |
1095 | | // it may cause the merge-on-read based compaction to produce incorrect results |
1096 | 0 | use_default = true; |
1097 | 0 | } |
1098 | 0 | } |
1099 | | |
1100 | 447 | if (use_default) { |
1101 | 0 | if (rs_column.has_default_value()) { |
1102 | 0 | mutable_column->insert_from(*default_value_block.get_by_position(i).column, |
1103 | 0 | 0); |
1104 | 0 | } else if (rs_column.is_nullable()) { |
1105 | 0 | assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>( |
1106 | 0 | mutable_column.get()) |
1107 | 0 | ->insert_default(); |
1108 | 0 | } else { |
1109 | 0 | mutable_column->insert_default(); |
1110 | 0 | } |
1111 | 447 | } else { |
1112 | 447 | mutable_column->insert_from(*old_block.get_by_position(i).column, |
1113 | 447 | read_index_old[idx]); |
1114 | 447 | } |
1115 | 447 | } |
1116 | 465 | } |
1117 | 101 | } |
1118 | 19 | output_block->set_columns(std::move(full_mutable_columns)); |
1119 | 19 | VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); |
1120 | 19 | return Status::OK(); |
1121 | 19 | } |
1122 | | |
1123 | | static void fill_cell_for_flexible_partial_update( |
1124 | | std::map<uint32_t, uint32_t>& read_index_old, |
1125 | | std::map<uint32_t, uint32_t>& read_index_update, const TabletSchemaSPtr& rowset_schema, |
1126 | | const PartialUpdateInfo* partial_update_info, const TabletColumn& tablet_column, |
1127 | | std::size_t idx, MutableColumnPtr& new_col, const IColumn& default_value_col, |
1128 | | const IColumn& old_value_col, const IColumn& cur_col, bool skipped, |
1129 | 0 | bool row_has_sequence_col, const signed char* delete_sign_column_data) { |
1130 | 0 | if (skipped) { |
1131 | 0 | bool use_default = false; |
1132 | 0 | bool old_row_delete_sign = |
1133 | 0 | (delete_sign_column_data != nullptr && |
1134 | 0 | delete_sign_column_data[read_index_old[cast_set<uint32_t>(idx)]] != 0); |
1135 | 0 | if (old_row_delete_sign) { |
1136 | 0 | if (!rowset_schema->has_sequence_col()) { |
1137 | 0 | use_default = true; |
1138 | 0 | } else if (row_has_sequence_col || (!tablet_column.is_seqeunce_col() && |
1139 | 0 | (tablet_column.unique_id() != |
1140 | 0 | partial_update_info->sequence_map_col_uid()))) { |
1141 | | // to keep the sequence column value not decreasing, we should read values of seq column(and seq map column) |
1142 | | // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise |
1143 | | // it may cause the merge-on-read based compaction to produce incorrect results |
1144 | 0 | use_default = true; |
1145 | 0 | } |
1146 | 0 | } |
1147 | 0 | if (use_default) { |
1148 | 0 | if (tablet_column.has_default_value()) { |
1149 | 0 | new_col->insert_from(default_value_col, 0); |
1150 | 0 | } else if (tablet_column.is_nullable()) { |
1151 | 0 | assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get()) |
1152 | 0 | ->insert_many_defaults(1); |
1153 | 0 | } else if (tablet_column.is_auto_increment()) { |
1154 | | // For auto-increment column, its default value(generated value) is filled in current block in flush phase |
1155 | | // when the load doesn't specify the auto-increment column |
1156 | | // - if the previous conflicting row is deleted, we should use the value in current block as its final value |
1157 | | // - if the previous conflicting row is an insert, we should use the value in old block as its final value to |
1158 | | // keep consistency between replicas |
1159 | 0 | new_col->insert_from(cur_col, read_index_update[cast_set<uint32_t>(idx)]); |
1160 | 0 | } else { |
1161 | 0 | new_col->insert_default(); |
1162 | 0 | } |
1163 | 0 | } else { |
1164 | 0 | new_col->insert_from(old_value_col, read_index_old[cast_set<uint32_t>(idx)]); |
1165 | 0 | } |
1166 | 0 | } else { |
1167 | 0 | new_col->insert_from(cur_col, read_index_update[cast_set<uint32_t>(idx)]); |
1168 | 0 | } |
1169 | 0 | } |
1170 | | |
1171 | | Status BaseTablet::generate_new_block_for_flexible_partial_update( |
1172 | | TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, |
1173 | | std::set<uint32_t>& rids_be_overwritten, const FixedReadPlan& read_plan_ori, |
1174 | | const FixedReadPlan& read_plan_update, |
1175 | 0 | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block* output_block) { |
1176 | 0 | CHECK(output_block); |
1177 | |
|
1178 | 0 | int32_t seq_col_unique_id = -1; |
1179 | 0 | if (rowset_schema->has_sequence_col()) { |
1180 | 0 | seq_col_unique_id = rowset_schema->column(rowset_schema->sequence_col_idx()).unique_id(); |
1181 | 0 | } |
1182 | 0 | const auto& non_sort_key_cids = partial_update_info->missing_cids; |
1183 | 0 | std::vector<uint32_t> all_cids(rowset_schema->num_columns()); |
1184 | 0 | std::iota(all_cids.begin(), all_cids.end(), 0); |
1185 | 0 | auto old_block = rowset_schema->create_block_by_cids(non_sort_key_cids); |
1186 | 0 | auto update_block = rowset_schema->create_block_by_cids(all_cids); |
1187 | | |
1188 | | // rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block |
1189 | 0 | std::map<uint32_t, uint32_t> read_index_update; |
1190 | | |
1191 | | // 1. read the current rowset first, if a row in the current rowset has delete sign mark |
1192 | | // we don't need to read values from old block for that row |
1193 | 0 | RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(*rowset_schema, all_cids, rsid_to_rowset, |
1194 | 0 | update_block, &read_index_update, true)); |
1195 | 0 | size_t update_rows = read_index_update.size(); |
1196 | | |
1197 | | // TODO(bobhan1): add the delete sign optimazation here |
1198 | | // // if there is sequence column in the table, we need to read the sequence column, |
1199 | | // // otherwise it may cause the merge-on-read based compaction policy to produce incorrect results |
1200 | | // const auto* __restrict new_block_delete_signs = |
1201 | | // rowset_schema->has_sequence_col() |
1202 | | // ? nullptr |
1203 | | // : get_delete_sign_column_data(update_block, update_rows); |
1204 | | |
1205 | | // 2. read previous rowsets |
1206 | | // rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block |
1207 | 0 | std::map<uint32_t, uint32_t> read_index_old; |
1208 | 0 | RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan( |
1209 | 0 | *rowset_schema, non_sort_key_cids, rsid_to_rowset, old_block, &read_index_old, true)); |
1210 | 0 | size_t old_rows = read_index_old.size(); |
1211 | 0 | DCHECK(update_rows == old_rows); |
1212 | 0 | const auto* __restrict old_block_delete_signs = |
1213 | 0 | get_delete_sign_column_data(old_block, old_rows); |
1214 | 0 | DCHECK(old_block_delete_signs != nullptr); |
1215 | | |
1216 | | // 3. build default value block |
1217 | 0 | auto default_value_block = old_block.clone_empty(); |
1218 | 0 | RETURN_IF_ERROR(BaseTablet::generate_default_value_block(*rowset_schema, non_sort_key_cids, |
1219 | 0 | partial_update_info->default_values, |
1220 | 0 | old_block, default_value_block)); |
1221 | | |
1222 | | // 4. build the final block |
1223 | 0 | auto full_mutable_columns = output_block->mutate_columns(); |
1224 | 0 | DCHECK(rowset_schema->has_skip_bitmap_col()); |
1225 | 0 | auto skip_bitmap_col_idx = rowset_schema->skip_bitmap_col_idx(); |
1226 | 0 | const std::vector<BitmapValue>* skip_bitmaps = |
1227 | 0 | &(assert_cast<const ColumnBitmap*, TypeCheckOnRelease::DISABLE>( |
1228 | 0 | update_block.get_by_position(skip_bitmap_col_idx).column->get_ptr().get()) |
1229 | 0 | ->get_data()); |
1230 | |
|
1231 | 0 | if (rowset_schema->has_sequence_col() && !rids_be_overwritten.empty()) { |
1232 | | // If the row specifies the sequence column, we should delete the current row becase the |
1233 | | // flexible partial update on the current row has been `overwritten` by the previous one with larger sequence |
1234 | | // column value. |
1235 | 0 | for (auto it = rids_be_overwritten.begin(); it != rids_be_overwritten.end();) { |
1236 | 0 | auto rid = *it; |
1237 | 0 | if (!skip_bitmaps->at(rid).contains(seq_col_unique_id)) { |
1238 | 0 | ++it; |
1239 | 0 | } else { |
1240 | 0 | it = rids_be_overwritten.erase(it); |
1241 | 0 | } |
1242 | 0 | } |
1243 | 0 | } |
1244 | |
|
1245 | 0 | for (std::size_t cid {0}; cid < rowset_schema->num_columns(); cid++) { |
1246 | 0 | MutableColumnPtr& new_col = full_mutable_columns[cid]; |
1247 | 0 | const IColumn& cur_col = *update_block.get_by_position(cid).column; |
1248 | 0 | const auto& rs_column = rowset_schema->column(cid); |
1249 | 0 | auto col_uid = rs_column.unique_id(); |
1250 | 0 | for (auto idx = 0; idx < update_rows; ++idx) { |
1251 | 0 | if (cid < rowset_schema->num_key_columns()) { |
1252 | 0 | new_col->insert_from(cur_col, read_index_update[idx]); |
1253 | 0 | } else { |
1254 | 0 | const IColumn& default_value_col = |
1255 | 0 | *default_value_block.get_by_position(cid - rowset_schema->num_key_columns()) |
1256 | 0 | .column; |
1257 | 0 | const IColumn& old_value_col = |
1258 | 0 | *old_block.get_by_position(cid - rowset_schema->num_key_columns()).column; |
1259 | 0 | if (rids_be_overwritten.contains(idx)) { |
1260 | 0 | new_col->insert_from(old_value_col, read_index_old[idx]); |
1261 | 0 | } else { |
1262 | 0 | fill_cell_for_flexible_partial_update( |
1263 | 0 | read_index_old, read_index_update, rowset_schema, partial_update_info, |
1264 | 0 | rs_column, idx, new_col, default_value_col, old_value_col, cur_col, |
1265 | 0 | skip_bitmaps->at(idx).contains(col_uid), |
1266 | 0 | rowset_schema->has_sequence_col() |
1267 | 0 | ? !skip_bitmaps->at(idx).contains(seq_col_unique_id) |
1268 | 0 | : false, |
1269 | 0 | old_block_delete_signs); |
1270 | 0 | } |
1271 | 0 | } |
1272 | 0 | } |
1273 | 0 | DCHECK_EQ(full_mutable_columns[cid]->size(), update_rows); |
1274 | 0 | } |
1275 | |
|
1276 | 0 | output_block->set_columns(std::move(full_mutable_columns)); |
1277 | 0 | VLOG_DEBUG << "full block when publish: " << output_block->dump_data(); |
1278 | 0 | return Status::OK(); |
1279 | 0 | } |
1280 | | |
1281 | | Status BaseTablet::commit_phase_update_delete_bitmap( |
1282 | | const BaseTabletSPtr& tablet, const RowsetSharedPtr& rowset, |
1283 | | RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap, |
1284 | | const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id, |
1285 | 26.2k | CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer) { |
1286 | 26.2k | DBUG_EXECUTE_IF("BaseTablet::commit_phase_update_delete_bitmap.enable_spin_wait", { |
1287 | 26.2k | auto tok = dp->param<std::string>("token", "invalid_token"); |
1288 | 26.2k | while (DebugPoints::instance()->is_enable( |
1289 | 26.2k | "BaseTablet::commit_phase_update_delete_bitmap.block")) { |
1290 | 26.2k | auto block_dp = DebugPoints::instance()->get_debug_point( |
1291 | 26.2k | "BaseTablet::commit_phase_update_delete_bitmap.block"); |
1292 | 26.2k | if (block_dp) { |
1293 | 26.2k | auto pass_token = block_dp->param<std::string>("pass_token", ""); |
1294 | 26.2k | if (pass_token == tok) { |
1295 | 26.2k | break; |
1296 | 26.2k | } |
1297 | 26.2k | } |
1298 | 26.2k | std::this_thread::sleep_for(std::chrono::milliseconds(50)); |
1299 | 26.2k | } |
1300 | 26.2k | }); |
1301 | 26.2k | SCOPED_BVAR_LATENCY(g_tablet_commit_phase_update_delete_bitmap_latency); |
1302 | 26.2k | RowsetIdUnorderedSet cur_rowset_ids; |
1303 | 26.2k | RowsetIdUnorderedSet rowset_ids_to_add; |
1304 | 26.2k | RowsetIdUnorderedSet rowset_ids_to_del; |
1305 | 26.2k | int64_t cur_version; |
1306 | | |
1307 | 26.2k | std::vector<RowsetSharedPtr> specified_rowsets; |
1308 | 26.2k | { |
1309 | | // to prevent seeing intermediate state of a tablet |
1310 | 26.2k | std::unique_lock<bthread::Mutex> sync_lock; |
1311 | 26.2k | if (config::is_cloud_mode()) { |
1312 | 19.2k | sync_lock = std::unique_lock<bthread::Mutex>( |
1313 | 19.2k | std::static_pointer_cast<CloudTablet>(tablet)->get_sync_meta_lock()); |
1314 | 19.2k | } |
1315 | 26.2k | std::shared_lock meta_rlock(tablet->_meta_lock); |
1316 | 26.2k | if (tablet->tablet_state() == TABLET_NOTREADY) { |
1317 | | // tablet is under alter process. The delete bitmap will be calculated after conversion. |
1318 | 20 | LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " |
1319 | 20 | "tablet_id: " |
1320 | 20 | << tablet->tablet_id() << " txn_id: " << txn_id; |
1321 | 20 | return Status::OK(); |
1322 | 20 | } |
1323 | 26.2k | cur_version = tablet->max_version_unlocked(); |
1324 | 26.2k | RETURN_IF_ERROR(tablet->get_all_rs_id_unlocked(cur_version, &cur_rowset_ids)); |
1325 | 26.2k | _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, |
1326 | 26.2k | &rowset_ids_to_del); |
1327 | 26.2k | specified_rowsets = tablet->get_rowset_by_ids(&rowset_ids_to_add); |
1328 | 26.2k | } |
1329 | 10.2k | for (const auto& to_del : rowset_ids_to_del) { |
1330 | 10.2k | delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); |
1331 | 10.2k | } |
1332 | | |
1333 | 26.2k | RETURN_IF_ERROR(calc_delete_bitmap(tablet, rowset, segments, specified_rowsets, delete_bitmap, |
1334 | 26.2k | cur_version, token, rowset_writer)); |
1335 | 26.2k | size_t total_rows = std::accumulate( |
1336 | 26.2k | segments.begin(), segments.end(), 0, |
1337 | 26.2k | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); |
1338 | 26.2k | LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " << tablet->tablet_id() |
1339 | 26.2k | << ", rowset_ids to add: " << rowset_ids_to_add.size() |
1340 | 26.2k | << ", rowset_ids to del: " << rowset_ids_to_del.size() |
1341 | 26.2k | << ", cur max_version: " << cur_version << ", transaction_id: " << txn_id |
1342 | 26.2k | << ", total rows: " << total_rows; |
1343 | 26.2k | pre_rowset_ids = cur_rowset_ids; |
1344 | 26.2k | return Status::OK(); |
1345 | 26.2k | } |
1346 | | |
1347 | | void BaseTablet::add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap, |
1348 | 30.3k | const RowsetIdUnorderedSet& rowsetids) { |
1349 | 232k | for (const auto& rowsetid : rowsetids) { |
1350 | 232k | delete_bitmap->add( |
1351 | 232k | {rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON}, |
1352 | 232k | DeleteBitmap::ROWSET_SENTINEL_MARK); |
1353 | 232k | } |
1354 | 30.3k | } |
1355 | | |
1356 | | void BaseTablet::_rowset_ids_difference(const RowsetIdUnorderedSet& cur, |
1357 | | const RowsetIdUnorderedSet& pre, |
1358 | | RowsetIdUnorderedSet* to_add, |
1359 | 90.6k | RowsetIdUnorderedSet* to_del) { |
1360 | 672k | for (const auto& id : cur) { |
1361 | 672k | if (pre.find(id) == pre.end()) { |
1362 | 28.5k | to_add->insert(id); |
1363 | 28.5k | } |
1364 | 672k | } |
1365 | 672k | for (const auto& id : pre) { |
1366 | 672k | if (cur.find(id) == cur.end()) { |
1367 | 28.8k | to_del->insert(id); |
1368 | 28.8k | } |
1369 | 672k | } |
1370 | 90.6k | } |
1371 | | |
1372 | | Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, |
1373 | | int64_t max_version, int64_t txn_id, |
1374 | | const RowsetIdUnorderedSet& rowset_ids, |
1375 | 55.3k | std::vector<RowsetSharedPtr>* rowsets) { |
1376 | 55.3k | RowsetIdUnorderedSet missing_ids; |
1377 | 444k | for (const auto& rowsetid : rowset_ids) { |
1378 | 444k | if (!delete_bitmap->delete_bitmap.contains({rowsetid, DeleteBitmap::INVALID_SEGMENT_ID, |
1379 | 444k | DeleteBitmap::TEMP_VERSION_COMMON})) { |
1380 | 0 | missing_ids.insert(rowsetid); |
1381 | 0 | } |
1382 | 444k | } |
1383 | | |
1384 | 55.3k | if (!missing_ids.empty()) { |
1385 | 0 | LOG(WARNING) << "[txn_id:" << txn_id << "][tablet_id:" << tablet_id() |
1386 | 0 | << "][max_version: " << max_version |
1387 | 0 | << "] check delete bitmap correctness failed!"; |
1388 | 0 | rapidjson::Document root; |
1389 | 0 | root.SetObject(); |
1390 | 0 | rapidjson::Document required_rowsets_arr; |
1391 | 0 | required_rowsets_arr.SetArray(); |
1392 | 0 | rapidjson::Document missing_rowsets_arr; |
1393 | 0 | missing_rowsets_arr.SetArray(); |
1394 | |
|
1395 | 0 | if (rowsets != nullptr) { |
1396 | 0 | for (const auto& rowset : *rowsets) { |
1397 | 0 | rapidjson::Value value; |
1398 | 0 | std::string version_str = rowset->get_rowset_info_str(); |
1399 | 0 | value.SetString(version_str.c_str(), |
1400 | 0 | cast_set<rapidjson::SizeType>(version_str.length()), |
1401 | 0 | required_rowsets_arr.GetAllocator()); |
1402 | 0 | required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator()); |
1403 | 0 | } |
1404 | 0 | } else { |
1405 | 0 | std::vector<RowsetSharedPtr> tablet_rowsets; |
1406 | 0 | { |
1407 | 0 | std::shared_lock meta_rlock(_meta_lock); |
1408 | 0 | tablet_rowsets = get_rowset_by_ids(&rowset_ids); |
1409 | 0 | } |
1410 | 0 | for (const auto& rowset : tablet_rowsets) { |
1411 | 0 | rapidjson::Value value; |
1412 | 0 | std::string version_str = rowset->get_rowset_info_str(); |
1413 | 0 | value.SetString(version_str.c_str(), |
1414 | 0 | cast_set<rapidjson::SizeType>(version_str.length()), |
1415 | 0 | required_rowsets_arr.GetAllocator()); |
1416 | 0 | required_rowsets_arr.PushBack(value, required_rowsets_arr.GetAllocator()); |
1417 | 0 | } |
1418 | 0 | } |
1419 | 0 | for (const auto& missing_rowset_id : missing_ids) { |
1420 | 0 | rapidjson::Value miss_value; |
1421 | 0 | std::string rowset_id_str = missing_rowset_id.to_string(); |
1422 | 0 | miss_value.SetString(rowset_id_str.c_str(), |
1423 | 0 | cast_set<rapidjson::SizeType>(rowset_id_str.length()), |
1424 | 0 | missing_rowsets_arr.GetAllocator()); |
1425 | 0 | missing_rowsets_arr.PushBack(miss_value, missing_rowsets_arr.GetAllocator()); |
1426 | 0 | } |
1427 | |
|
1428 | 0 | root.AddMember("required_rowsets", required_rowsets_arr, root.GetAllocator()); |
1429 | 0 | root.AddMember("missing_rowsets", missing_rowsets_arr, root.GetAllocator()); |
1430 | 0 | rapidjson::StringBuffer strbuf; |
1431 | 0 | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); |
1432 | 0 | root.Accept(writer); |
1433 | 0 | std::string rowset_status_string = std::string(strbuf.GetString()); |
1434 | 0 | LOG_EVERY_SECOND(WARNING) << rowset_status_string; |
1435 | | // let it crash if correctness check failed in Debug mode |
1436 | 0 | DCHECK(false) << "delete bitmap correctness check failed in publish phase!"; |
1437 | 0 | return Status::InternalError("check delete bitmap failed!"); |
1438 | 0 | } |
1439 | 55.3k | return Status::OK(); |
1440 | 55.3k | } |
1441 | | |
1442 | | Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info, |
1443 | | int64_t txn_id, int64_t txn_expiration, |
1444 | 65.6k | DeleteBitmapPtr tablet_delete_bitmap) { |
1445 | 65.6k | SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency); |
1446 | 65.6k | RowsetIdUnorderedSet cur_rowset_ids; |
1447 | 65.6k | RowsetIdUnorderedSet rowset_ids_to_add; |
1448 | 65.6k | RowsetIdUnorderedSet rowset_ids_to_del; |
1449 | 65.6k | RowsetSharedPtr rowset = txn_info->rowset; |
1450 | 65.6k | RowsetSharedPtr row_binlog_rowset; |
1451 | 65.6k | bool build_row_binlog = false; |
1452 | 65.6k | int64_t cur_version = rowset->start_version(); |
1453 | 65.6k | std::unique_ptr<RowsetWriter> transient_rs_writer; |
1454 | 65.6k | DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap; |
1455 | 65.6k | bool is_partial_update = |
1456 | 65.6k | txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update(); |
1457 | 65.6k | for (const auto& rs : txn_info->attach_rowsets) { |
1458 | 0 | if (rs != nullptr && rs->rowset_meta() != nullptr && rs->rowset_meta()->is_row_binlog()) { |
1459 | 0 | row_binlog_rowset = rs; |
1460 | 0 | build_row_binlog = is_partial_update || |
1461 | 0 | self->tablet_meta()->binlog_config().need_historical_value(); |
1462 | 0 | break; |
1463 | 0 | } |
1464 | 0 | } |
1465 | | |
1466 | | // rewrite conflict only when partial update or need before |
1467 | 65.6k | if (is_partial_update || build_row_binlog) { |
1468 | 3.55k | if (txn_info->partial_update_info == nullptr) { |
1469 | 0 | txn_info->partial_update_info = std::make_shared<PartialUpdateInfo>(); |
1470 | 0 | } |
1471 | 3.55k | if (txn_info->partial_update_info->partial_update_mode == UniqueKeyUpdateModePB::UPSERT) { |
1472 | 0 | txn_info->partial_update_info->partial_update_input_columns.clear(); |
1473 | 0 | txn_info->partial_update_info->missing_cids.clear(); |
1474 | 0 | txn_info->partial_update_info->default_values.clear(); |
1475 | 0 | auto& update_cids = txn_info->partial_update_info->update_cids; |
1476 | 0 | update_cids.resize(rowset->tablet_schema()->num_columns()); |
1477 | 0 | std::iota(update_cids.begin(), update_cids.end(), 0); |
1478 | 0 | } |
1479 | | |
1480 | 3.55k | DCHECK(txn_info->partial_update_info != nullptr); |
1481 | | |
1482 | 3.55k | transient_rs_writer = DORIS_TRY(self->create_transient_rowset_writer( |
1483 | 3.55k | *rowset, txn_info->partial_update_info, txn_expiration)); |
1484 | 3.55k | DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.after.create_transient_rs_writer", |
1485 | 3.55k | DBUG_BLOCK); |
1486 | | // Partial update or upsert rewrite might generate new segments when there is conflicts while publish, and mark |
1487 | | // the same key in original segments as delete. |
1488 | | // When the new segment flush fails or the rowset build fails, the deletion marker for the |
1489 | | // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`, |
1490 | | // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it. |
1491 | 3.55k | delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap)); |
1492 | 3.55k | } |
1493 | | |
1494 | 65.6k | OlapStopWatch watch; |
1495 | 65.6k | std::vector<segment_v2::SegmentSharedPtr> segments; |
1496 | 65.6k | RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); |
1497 | 65.6k | auto t1 = watch.get_elapse_time_us(); |
1498 | | |
1499 | 65.6k | int64_t next_visible_version = txn_info->is_txn_load ? txn_info->next_visible_version |
1500 | 65.6k | : txn_info->rowset->start_version(); |
1501 | 65.6k | { |
1502 | 65.6k | std::shared_lock meta_rlock(self->_meta_lock); |
1503 | | // tablet is under alter process. The delete bitmap will be calculated after conversion. |
1504 | 65.6k | if (self->tablet_state() == TABLET_NOTREADY) { |
1505 | 2 | LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" |
1506 | 2 | << self->tablet_id(); |
1507 | 2 | return Status::OK(); |
1508 | 2 | } |
1509 | 65.6k | RETURN_IF_ERROR(self->get_all_rs_id_unlocked(next_visible_version - 1, &cur_rowset_ids)); |
1510 | 65.6k | } |
1511 | 65.6k | auto t2 = watch.get_elapse_time_us(); |
1512 | | |
1513 | 65.6k | _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, &rowset_ids_to_add, |
1514 | 65.6k | &rowset_ids_to_del); |
1515 | 65.6k | for (const auto& to_del : rowset_ids_to_del) { |
1516 | 18.5k | delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX}); |
1517 | 18.5k | } |
1518 | | |
1519 | 65.6k | std::vector<RowsetSharedPtr> specified_rowsets; |
1520 | 65.6k | { |
1521 | 65.6k | std::shared_lock meta_rlock(self->_meta_lock); |
1522 | 65.6k | specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add); |
1523 | 65.6k | } |
1524 | 65.6k | if (txn_info->is_txn_load) { |
1525 | 154 | for (auto invisible_rowset : txn_info->invisible_rowsets) { |
1526 | 38 | specified_rowsets.emplace_back(invisible_rowset); |
1527 | 38 | } |
1528 | 154 | std::sort(specified_rowsets.begin(), specified_rowsets.end(), |
1529 | 489 | [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { |
1530 | 489 | return lhs->end_version() > rhs->end_version(); |
1531 | 489 | }); |
1532 | 154 | } |
1533 | 65.6k | auto t3 = watch.get_elapse_time_us(); |
1534 | | |
1535 | | // If a rowset is produced by compaction before the commit phase of the partial update load |
1536 | | // and is not included in txn_info->rowset_ids, we can skip the alignment process of that rowset |
1537 | | // because data remains the same before and after compaction. But we still need to calculate the |
1538 | | // the delete bitmap for that rowset. |
1539 | 65.6k | std::vector<RowsetSharedPtr> rowsets_skip_alignment; |
1540 | 65.6k | if (is_partial_update) { |
1541 | 3.65k | int64_t max_version_in_flush_phase = |
1542 | 3.65k | txn_info->partial_update_info->max_version_in_flush_phase; |
1543 | 3.65k | DCHECK(max_version_in_flush_phase != -1); |
1544 | 3.65k | std::vector<RowsetSharedPtr> remained_rowsets; |
1545 | 3.65k | for (const auto& specified_rowset : specified_rowsets) { |
1546 | 288 | if (specified_rowset->end_version() <= max_version_in_flush_phase && |
1547 | 288 | specified_rowset->produced_by_compaction()) { |
1548 | 41 | rowsets_skip_alignment.emplace_back(specified_rowset); |
1549 | 247 | } else { |
1550 | 247 | remained_rowsets.emplace_back(specified_rowset); |
1551 | 247 | } |
1552 | 288 | } |
1553 | 3.65k | if (!rowsets_skip_alignment.empty()) { |
1554 | 41 | specified_rowsets = std::move(remained_rowsets); |
1555 | 41 | } |
1556 | 3.65k | } |
1557 | | |
1558 | 65.6k | DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", { |
1559 | 65.6k | auto token = dp->param<std::string>("token", "invalid_token"); |
1560 | 65.6k | while (DebugPoints::instance()->is_enable("BaseTablet::update_delete_bitmap.block")) { |
1561 | 65.6k | auto block_dp = DebugPoints::instance()->get_debug_point( |
1562 | 65.6k | "BaseTablet::update_delete_bitmap.block"); |
1563 | 65.6k | if (block_dp) { |
1564 | 65.6k | auto wait_token = block_dp->param<std::string>("wait_token", ""); |
1565 | 65.6k | LOG(INFO) << "BaseTablet::update_delete_bitmap.enable_spin_wait, wait_token: " |
1566 | 65.6k | << wait_token << ", token: " << token; |
1567 | 65.6k | if (wait_token != token) { |
1568 | 65.6k | break; |
1569 | 65.6k | } |
1570 | 65.6k | } |
1571 | 65.6k | std::this_thread::sleep_for(std::chrono::milliseconds(50)); |
1572 | 65.6k | } |
1573 | 65.6k | }); |
1574 | | |
1575 | 65.6k | if (!rowsets_skip_alignment.empty()) { |
1576 | 41 | auto token = self->calc_delete_bitmap_executor()->create_token(); |
1577 | | // set rowset_writer to nullptr to skip the alignment process |
1578 | 41 | RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, rowsets_skip_alignment, |
1579 | 41 | delete_bitmap, cur_version - 1, token.get(), nullptr, |
1580 | 41 | tablet_delete_bitmap)); |
1581 | 41 | RETURN_IF_ERROR(token->wait()); |
1582 | 41 | } |
1583 | | |
1584 | | // Publish-phase partial update or upsert rewrite may generate transient segments. If row binlog is enabled for |
1585 | | // this txn (row binlog rowset attached), we need to build row binlog segments together with |
1586 | | // transient data segments: |
1587 | | // 1) create a transient row binlog writer that appends to the same version row binlog rowset |
1588 | | // 2) pre-allocate per-row LSNs for each transient segment and register them in binlog options |
1589 | | // 3) wrap both writers into a GroupRowsetWriter so calc_delete_bitmap writes to both. |
1590 | 65.6k | if (build_row_binlog) { |
1591 | 0 | DCHECK(transient_rs_writer != nullptr); |
1592 | | |
1593 | | // Create transient row binlog writer for publish-phase segment appending. |
1594 | 0 | auto transient_row_binlog_writer = DORIS_TRY(self->create_transient_rowset_writer( |
1595 | 0 | *row_binlog_rowset, txn_info->partial_update_info, txn_expiration)); |
1596 | | |
1597 | | // Prepare source MOW context for historical row retrieval in binlog writer. |
1598 | 0 | auto& data_ctx = const_cast<RowsetWriterContext&>(transient_rs_writer->context()); |
1599 | 0 | data_ctx.mow_context = std::make_shared<MowContext>( |
1600 | 0 | cur_version - 1, txn_id, std::make_shared<RowsetIdUnorderedSet>(), |
1601 | 0 | specified_rowsets, nullptr); |
1602 | |
|
1603 | 0 | auto& binlog_ctx = const_cast<RowsetWriterContext&>(transient_row_binlog_writer->context()); |
1604 | 0 | auto& cfg = binlog_ctx.write_binlog_opt().write_binlog_config(); |
1605 | 0 | cfg.source.tablet_schema = data_ctx.tablet_schema; |
1606 | 0 | cfg.source.partial_update_info = data_ctx.partial_update_info; |
1607 | 0 | cfg.source.mow_context = data_ctx.mow_context; |
1608 | 0 | cfg.source.is_transient_rowset_writer = data_ctx.is_transient_rowset_writer; |
1609 | 0 | cfg.source.source_write_type = data_ctx.write_type; |
1610 | | |
1611 | | // Wrap two transient writers into a group writer for dual flush/build. |
1612 | 0 | RowsetWriterSharedPtr data_writer_sp(std::move(transient_rs_writer)); |
1613 | 0 | RowsetWriterSharedPtr row_binlog_writer_sp(std::move(transient_row_binlog_writer)); |
1614 | 0 | std::unique_ptr<GroupRowsetWriter> group_writer; |
1615 | 0 | RETURN_IF_ERROR(RowsetFactory::create_empty_group_rowset_writer(&group_writer)); |
1616 | 0 | group_writer->set_data_writer(data_writer_sp); |
1617 | 0 | group_writer->set_row_binlog_writer(row_binlog_writer_sp); |
1618 | 0 | transient_rs_writer = std::move(group_writer); |
1619 | 0 | } |
1620 | | |
1621 | | // When there is only one segment, it will be calculated in the current thread. |
1622 | | // Otherwise, it will be submitted to the thread pool for calculation. |
1623 | 65.6k | if (segments.size() <= 1) { |
1624 | 64.8k | RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, |
1625 | 64.8k | cur_version - 1, nullptr, transient_rs_writer.get(), |
1626 | 64.8k | tablet_delete_bitmap)); |
1627 | | |
1628 | 64.8k | } else { |
1629 | 762 | auto token = self->calc_delete_bitmap_executor()->create_token(); |
1630 | 762 | RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, |
1631 | 762 | cur_version - 1, token.get(), transient_rs_writer.get(), |
1632 | 762 | tablet_delete_bitmap)); |
1633 | 762 | RETURN_IF_ERROR(token->wait()); |
1634 | 762 | } |
1635 | | |
1636 | 65.6k | std::stringstream ss; |
1637 | 65.6k | ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - t1 |
1638 | 65.6k | << ", get rowsets: " << t3 - t2 |
1639 | 65.6k | << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")"; |
1640 | | |
1641 | 65.6k | if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) { |
1642 | | // only do correctness check if the rowset has at least one row written |
1643 | | // check if all the rowset has ROWSET_SENTINEL_MARK |
1644 | 27.4k | auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, |
1645 | 27.4k | cur_rowset_ids, &specified_rowsets); |
1646 | 27.4k | if (!st.ok()) { |
1647 | 0 | LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); |
1648 | 0 | } |
1649 | 27.4k | } |
1650 | | |
1651 | 65.6k | if (transient_rs_writer) { |
1652 | 3.50k | auto t4 = watch.get_elapse_time_us(); |
1653 | 3.50k | DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", { |
1654 | 3.50k | if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
1655 | 3.50k | LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random failed") |
1656 | 3.50k | .tag("txn_id", txn_id); |
1657 | 3.50k | return Status::InternalError( |
1658 | 3.50k | "debug update_delete_bitmap partial update write rowset random failed"); |
1659 | 3.50k | } |
1660 | 3.50k | }); |
1661 | | // build rowset writer and merge transient rowset |
1662 | 3.50k | RETURN_IF_ERROR(transient_rs_writer->flush()); |
1663 | 3.50k | RowsetSharedPtr transient_rowset; |
1664 | 3.50k | RowsetSharedPtr transient_row_binlog; |
1665 | 3.50k | if (build_row_binlog) { |
1666 | 0 | auto* group_rowset_writer = typeid_cast<GroupRowsetWriter*>(transient_rs_writer.get()); |
1667 | 0 | DCHECK(group_rowset_writer != nullptr); |
1668 | 0 | std::vector<RowsetSharedPtr> waited_build_rowsets; |
1669 | 0 | RETURN_IF_ERROR(group_rowset_writer->build_rowsets(waited_build_rowsets)); |
1670 | 0 | transient_rowset = waited_build_rowsets.at(0); |
1671 | 0 | transient_row_binlog = waited_build_rowsets.at(1); |
1672 | 3.50k | } else { |
1673 | 3.50k | RETURN_IF_ERROR(transient_rs_writer->build(transient_rowset)); |
1674 | 3.50k | } |
1675 | 3.50k | auto old_segments = rowset->num_segments(); |
1676 | 3.50k | rowset->merge_rowset_meta(*transient_rowset->rowset_meta()); |
1677 | 3.50k | auto new_segments = rowset->num_segments(); |
1678 | 3.50k | ss << ", " << txn_info->partial_update_info->partial_update_mode_str() |
1679 | 3.50k | << " flush rowset (old segment num: " << old_segments |
1680 | 3.50k | << ", new segment num: " << new_segments << ")" |
1681 | 3.50k | << ", cost:" << watch.get_elapse_time_us() - t4 << "(us)"; |
1682 | | |
1683 | 3.50k | if (build_row_binlog) { |
1684 | 0 | DCHECK(row_binlog_rowset != nullptr); |
1685 | 0 | old_segments = row_binlog_rowset->num_segments(); |
1686 | 0 | row_binlog_rowset->merge_rowset_meta(*transient_row_binlog->rowset_meta()); |
1687 | 0 | new_segments = row_binlog_rowset->num_segments(); |
1688 | 0 | ss << ", " << txn_info->partial_update_info->partial_update_mode_str() |
1689 | 0 | << " flush binlog<row> (old segment num: " << old_segments |
1690 | 0 | << ", new segment num: " << new_segments << ")"; |
1691 | |
|
1692 | 0 | SegmentLoader::instance()->erase_segments(row_binlog_rowset->rowset_id(), |
1693 | 0 | row_binlog_rowset->num_segments()); |
1694 | 0 | } |
1695 | | |
1696 | | // update the shared_ptr to new bitmap, which is consistent with current rowset. |
1697 | 3.50k | txn_info->delete_bitmap = delete_bitmap; |
1698 | | // erase segment cache cause we will add a segment to rowset |
1699 | 3.50k | SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments()); |
1700 | 3.50k | } |
1701 | | |
1702 | 65.6k | size_t total_rows = std::accumulate( |
1703 | 65.6k | segments.begin(), segments.end(), 0, |
1704 | 65.6k | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); |
1705 | 65.6k | auto t5 = watch.get_elapse_time_us(); |
1706 | 65.6k | int64_t lock_id = txn_info->is_txn_load ? txn_info->lock_id : -1; |
1707 | 65.6k | RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap, |
1708 | 65.6k | transient_rs_writer.get(), cur_rowset_ids, lock_id, |
1709 | 65.6k | next_visible_version)); |
1710 | | |
1711 | | // defensive check, check that the delete bitmap cache we wrote is correct |
1712 | 65.6k | RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id, delete_bitmap.get())); |
1713 | | |
1714 | 65.6k | LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id() |
1715 | 65.6k | << ", rowset_ids to add: " |
1716 | 65.6k | << (specified_rowsets.size() + rowsets_skip_alignment.size()) |
1717 | 65.6k | << ", rowset_ids to del: " << rowset_ids_to_del.size() |
1718 | 65.6k | << ", cur version: " << cur_version << ", transaction_id: " << txn_id << "," |
1719 | 65.6k | << ss.str() << " , total rows: " << total_rows |
1720 | 65.6k | << ", update delete_bitmap cost: " << watch.get_elapse_time_us() - t5 << "(us)"; |
1721 | 65.6k | return Status::OK(); |
1722 | 65.6k | } |
1723 | | |
1724 | | void BaseTablet::calc_compaction_output_rowset_delete_bitmap( |
1725 | | const std::vector<RowsetSharedPtr>& input_rowsets, const RowIdConversion& rowid_conversion, |
1726 | | uint64_t start_version, uint64_t end_version, std::set<RowLocation>* missed_rows, |
1727 | | std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map, |
1728 | 10.1k | const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap) { |
1729 | 10.1k | RowLocation src; |
1730 | 10.1k | RowLocation dst; |
1731 | 90.8k | for (auto& rowset : input_rowsets) { |
1732 | 90.8k | src.rowset_id = rowset->rowset_id(); |
1733 | 134k | for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { |
1734 | 43.5k | src.segment_id = seg_id; |
1735 | 43.5k | DeleteBitmap subset_map(tablet_id()); |
1736 | 43.5k | input_delete_bitmap.subset({rowset->rowset_id(), seg_id, start_version}, |
1737 | 43.5k | {rowset->rowset_id(), seg_id, end_version}, &subset_map); |
1738 | | // traverse all versions and convert rowid |
1739 | 43.5k | for (auto iter = subset_map.delete_bitmap.begin(); |
1740 | 50.3k | iter != subset_map.delete_bitmap.end(); ++iter) { |
1741 | 6.79k | auto cur_version = std::get<2>(iter->first); |
1742 | 3.02M | for (auto index = iter->second.begin(); index != iter->second.end(); ++index) { |
1743 | 3.01M | src.row_id = *index; |
1744 | 3.01M | if (rowid_conversion.get(src, &dst) != 0) { |
1745 | 2.66M | VLOG_CRITICAL << "Can't find rowid, may be deleted by the delete_handler, " |
1746 | 1.29k | << " src loaction: |" << src.rowset_id << "|" |
1747 | 1.29k | << src.segment_id << "|" << src.row_id |
1748 | 1.29k | << " version: " << cur_version; |
1749 | 2.66M | if (missed_rows) { |
1750 | 1.29k | missed_rows->insert(src); |
1751 | 1.29k | } |
1752 | 2.66M | continue; |
1753 | 2.66M | } |
1754 | 18.4E | VLOG_DEBUG << "calc_compaction_output_rowset_delete_bitmap dst location: |" |
1755 | 18.4E | << dst.rowset_id << "|" << dst.segment_id << "|" << dst.row_id |
1756 | 18.4E | << " src location: |" << src.rowset_id << "|" << src.segment_id |
1757 | 18.4E | << "|" << src.row_id << " start version: " << start_version |
1758 | 18.4E | << "end version" << end_version; |
1759 | 354k | if (location_map) { |
1760 | 0 | (*location_map)[rowset].emplace_back(src, dst); |
1761 | 0 | } |
1762 | 354k | output_rowset_delete_bitmap->add({dst.rowset_id, dst.segment_id, cur_version}, |
1763 | 354k | dst.row_id); |
1764 | 354k | } |
1765 | 6.79k | } |
1766 | 43.5k | } |
1767 | 90.8k | } |
1768 | 10.1k | } |
1769 | | |
1770 | | Status BaseTablet::check_rowid_conversion( |
1771 | | RowsetSharedPtr dst_rowset, |
1772 | | const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>& |
1773 | 0 | location_map) { |
1774 | 0 | if (location_map.empty()) { |
1775 | 0 | VLOG_DEBUG << "check_rowid_conversion, location_map is empty"; |
1776 | 0 | return Status::OK(); |
1777 | 0 | } |
1778 | 0 | std::vector<segment_v2::SegmentSharedPtr> dst_segments; |
1779 | |
|
1780 | 0 | RETURN_IF_ERROR( |
1781 | 0 | std::dynamic_pointer_cast<BetaRowset>(dst_rowset)->load_segments(&dst_segments)); |
1782 | 0 | std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>> input_rowsets_segment; |
1783 | |
|
1784 | 0 | VLOG_DEBUG << "check_rowid_conversion, dst_segments size: " << dst_segments.size(); |
1785 | 0 | for (auto [src_rowset, locations] : location_map) { |
1786 | 0 | std::vector<segment_v2::SegmentSharedPtr>& segments = |
1787 | 0 | input_rowsets_segment[src_rowset->rowset_id()]; |
1788 | 0 | if (segments.empty()) { |
1789 | 0 | RETURN_IF_ERROR( |
1790 | 0 | std::dynamic_pointer_cast<BetaRowset>(src_rowset)->load_segments(&segments)); |
1791 | 0 | } |
1792 | 0 | for (auto& [src, dst] : locations) { |
1793 | 0 | std::string src_key; |
1794 | 0 | std::string dst_key; |
1795 | 0 | Status s = segments[src.segment_id]->read_key_by_rowid(src.row_id, &src_key); |
1796 | 0 | if (UNLIKELY(s.is<NOT_IMPLEMENTED_ERROR>())) { |
1797 | 0 | LOG(INFO) << "primary key index of old version does not " |
1798 | 0 | "support reading key by rowid"; |
1799 | 0 | break; |
1800 | 0 | } |
1801 | 0 | if (UNLIKELY(!s)) { |
1802 | 0 | LOG(WARNING) << "failed to get src key: |" << src.rowset_id << "|" << src.segment_id |
1803 | 0 | << "|" << src.row_id << " status: " << s; |
1804 | 0 | DCHECK(false); |
1805 | 0 | return s; |
1806 | 0 | } |
1807 | | |
1808 | 0 | s = dst_segments[dst.segment_id]->read_key_by_rowid(dst.row_id, &dst_key); |
1809 | 0 | if (UNLIKELY(!s)) { |
1810 | 0 | LOG(WARNING) << "failed to get dst key: |" << dst.rowset_id << "|" << dst.segment_id |
1811 | 0 | << "|" << dst.row_id << " status: " << s; |
1812 | 0 | DCHECK(false); |
1813 | 0 | return s; |
1814 | 0 | } |
1815 | | |
1816 | 0 | VLOG_DEBUG << "check_rowid_conversion, src: |" << src.rowset_id << "|" << src.segment_id |
1817 | 0 | << "|" << src.row_id << "|" << src_key << " dst: |" << dst.rowset_id << "|" |
1818 | 0 | << dst.segment_id << "|" << dst.row_id << "|" << dst_key; |
1819 | 0 | if (UNLIKELY(src_key.compare(dst_key) != 0)) { |
1820 | 0 | LOG(WARNING) << "failed to check key, src key: |" << src.rowset_id << "|" |
1821 | 0 | << src.segment_id << "|" << src.row_id << "|" << src_key |
1822 | 0 | << " dst key: |" << dst.rowset_id << "|" << dst.segment_id << "|" |
1823 | 0 | << dst.row_id << "|" << dst_key; |
1824 | 0 | DCHECK(false); |
1825 | 0 | return Status::InternalError("failed to check rowid conversion"); |
1826 | 0 | } |
1827 | 0 | } |
1828 | 0 | } |
1829 | 0 | return Status::OK(); |
1830 | 0 | } |
1831 | | |
1832 | | // The caller should hold _rowset_update_lock and _meta_lock lock. |
1833 | | Status BaseTablet::update_delete_bitmap_without_lock( |
1834 | | const BaseTabletSPtr& self, const RowsetSharedPtr& rowset, |
1835 | 24 | const std::vector<RowsetSharedPtr>* specified_base_rowsets) { |
1836 | 24 | DBUG_EXECUTE_IF("BaseTablet.update_delete_bitmap_without_lock.random_failed", { |
1837 | 24 | auto rnd = rand() % 100; |
1838 | 24 | auto percent = dp->param("percent", 0.1); |
1839 | 24 | if (rnd < (100 * percent)) { |
1840 | 24 | LOG(WARNING) << "BaseTablet.update_delete_bitmap_without_lock.random_failed"; |
1841 | 24 | return Status::InternalError( |
1842 | 24 | "debug tablet update delete bitmap without lock random failed"); |
1843 | 24 | } else { |
1844 | 24 | LOG(INFO) << "BaseTablet.update_delete_bitmap_without_lock.random_failed not " |
1845 | 24 | "triggered" |
1846 | 24 | << ", rnd:" << rnd << ", percent: " << percent; |
1847 | 24 | } |
1848 | 24 | }); |
1849 | 24 | int64_t cur_version = rowset->start_version(); |
1850 | 24 | std::vector<segment_v2::SegmentSharedPtr> segments; |
1851 | 24 | RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); |
1852 | | |
1853 | | // If this rowset does not have a segment, there is no need for an update. |
1854 | 24 | if (segments.empty()) { |
1855 | 9 | LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: " |
1856 | 9 | << self->tablet_id() << " cur max_version: " << cur_version; |
1857 | 9 | return Status::OK(); |
1858 | 9 | } |
1859 | | |
1860 | | // calculate delete bitmap between segments if necessary. |
1861 | 15 | DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id()); |
1862 | 15 | RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments( |
1863 | 15 | rowset->tablet_schema(), rowset->rowset_id(), segments, delete_bitmap)); |
1864 | | |
1865 | | // get all base rowsets to calculate on |
1866 | 15 | std::vector<RowsetSharedPtr> specified_rowsets; |
1867 | 15 | RowsetIdUnorderedSet cur_rowset_ids; |
1868 | 15 | if (specified_base_rowsets == nullptr) { |
1869 | 15 | RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids)); |
1870 | 15 | specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids); |
1871 | 15 | } else { |
1872 | 0 | specified_rowsets = *specified_base_rowsets; |
1873 | 0 | } |
1874 | | |
1875 | 15 | OlapStopWatch watch; |
1876 | 15 | auto token = self->calc_delete_bitmap_executor()->create_token(); |
1877 | 15 | RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap, |
1878 | 15 | cur_version - 1, token.get())); |
1879 | 15 | RETURN_IF_ERROR(token->wait()); |
1880 | 15 | size_t total_rows = std::accumulate( |
1881 | 15 | segments.begin(), segments.end(), 0, |
1882 | 15 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); }); |
1883 | 15 | LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id() |
1884 | 15 | << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version |
1885 | 15 | << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us() |
1886 | 15 | << "(us), total rows: " << total_rows; |
1887 | 15 | if (config::enable_merge_on_write_correctness_check) { |
1888 | | // check if all the rowset has ROWSET_SENTINEL_MARK |
1889 | 15 | auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1, |
1890 | 15 | cur_rowset_ids, &specified_rowsets); |
1891 | 15 | if (!st.ok()) { |
1892 | 0 | LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!"); |
1893 | 0 | } |
1894 | 15 | delete_bitmap->remove_sentinel_marks(); |
1895 | 15 | } |
1896 | 15 | for (auto& iter : delete_bitmap->delete_bitmap) { |
1897 | 9 | self->_tablet_meta->delete_bitmap().merge( |
1898 | 9 | {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second); |
1899 | 9 | } |
1900 | | |
1901 | 15 | return Status::OK(); |
1902 | 15 | } |
1903 | | |
1904 | | void BaseTablet::agg_delete_bitmap_for_stale_rowsets( |
1905 | 4.82k | Version version, DeleteBitmapKeyRanges& remove_delete_bitmap_key_ranges) { |
1906 | 4.82k | if (!config::enable_agg_and_remove_pre_rowsets_delete_bitmap) { |
1907 | 0 | return; |
1908 | 0 | } |
1909 | 4.82k | if (!(keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write())) { |
1910 | 0 | return; |
1911 | 0 | } |
1912 | 4.82k | int64_t start_version = version.first; |
1913 | 4.82k | int64_t end_version = version.second; |
1914 | 4.82k | if (start_version == end_version) { |
1915 | 0 | return; |
1916 | 0 | } |
1917 | 4.82k | DCHECK(start_version < end_version) |
1918 | 0 | << ". start_version: " << start_version << ", end_version: " << end_version; |
1919 | | // get pre rowsets |
1920 | 4.82k | std::vector<RowsetSharedPtr> pre_rowsets {}; |
1921 | 4.82k | { |
1922 | 4.82k | std::shared_lock rdlock(_meta_lock); |
1923 | 31.1k | for (const auto& it2 : _rs_version_map) { |
1924 | 31.1k | if (it2.first.second < start_version) { |
1925 | 7.15k | pre_rowsets.emplace_back(it2.second); |
1926 | 7.15k | } |
1927 | 31.1k | } |
1928 | 4.82k | } |
1929 | 4.82k | std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); |
1930 | | // do agg for pre rowsets |
1931 | 4.82k | DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); |
1932 | 7.15k | for (auto& rowset : pre_rowsets) { |
1933 | 9.47k | for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { |
1934 | 2.31k | auto d = tablet_meta()->delete_bitmap().get_agg_without_cache( |
1935 | 2.31k | {rowset->rowset_id(), seg_id, end_version}, start_version); |
1936 | 2.31k | if (d->isEmpty()) { |
1937 | 1.53k | continue; |
1938 | 1.53k | } |
1939 | 781 | VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id() |
1940 | 0 | << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id |
1941 | 0 | << ", rowset_version=" << rowset->version().to_string() |
1942 | 0 | << ". compaction start_version=" << start_version |
1943 | 0 | << ", end_version=" << end_version << ", delete_bitmap=" << d->cardinality(); |
1944 | 781 | DeleteBitmap::BitmapKey start_key {rowset->rowset_id(), seg_id, start_version}; |
1945 | 781 | DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version}; |
1946 | 781 | new_delete_bitmap->set(end_key, *d); |
1947 | 781 | remove_delete_bitmap_key_ranges.emplace_back(start_key, end_key); |
1948 | 781 | } |
1949 | 7.15k | } |
1950 | 4.82k | DBUG_EXECUTE_IF("BaseTablet.agg_delete_bitmap_for_stale_rowsets.merge_delete_bitmap.block", |
1951 | 4.82k | DBUG_BLOCK); |
1952 | 4.82k | tablet_meta()->delete_bitmap().merge(*new_delete_bitmap); |
1953 | 4.82k | } |
1954 | | |
1955 | | void BaseTablet::check_agg_delete_bitmap_for_stale_rowsets(int64_t& useless_rowset_count, |
1956 | 0 | int64_t& useless_rowset_version_count) { |
1957 | 0 | std::map<RowsetId, Version> rowset_ids; |
1958 | 0 | std::set<int64_t> end_versions; |
1959 | 0 | traverse_rowsets( |
1960 | 0 | [&rowset_ids, &end_versions](const RowsetSharedPtr& rs) { |
1961 | 0 | rowset_ids[rs->rowset_id()] = rs->version(); |
1962 | 0 | end_versions.emplace(rs->end_version()); |
1963 | 0 | }, |
1964 | 0 | true); |
1965 | |
|
1966 | 0 | std::set<RowsetId> useless_rowsets; |
1967 | 0 | std::map<RowsetId, std::vector<int64_t>> useless_rowset_versions; |
1968 | 0 | { |
1969 | 0 | _tablet_meta->delete_bitmap().traverse_rowset_and_version( |
1970 | | // 0: rowset and rowset with version exists |
1971 | | // -1: rowset does not exist |
1972 | | // -2: find next <rowset, version> |
1973 | | // rowset exist, rowset with version does not exist |
1974 | | // sequence table |
1975 | 0 | [&](const RowsetId& rowset_id, int64_t version) { |
1976 | 0 | auto rowset_it = rowset_ids.find(rowset_id); |
1977 | 0 | if (rowset_it == rowset_ids.end()) { |
1978 | 0 | useless_rowsets.emplace(rowset_id); |
1979 | 0 | return -1; |
1980 | 0 | } |
1981 | 0 | if (end_versions.find(version) == end_versions.end()) { |
1982 | 0 | if (tablet_schema()->has_sequence_col()) { |
1983 | 0 | auto rowset_version = rowset_it->second; |
1984 | 0 | if (version >= rowset_version.first && |
1985 | 0 | version <= rowset_version.second) { |
1986 | 0 | return -2; |
1987 | 0 | } |
1988 | 0 | } |
1989 | 0 | if (useless_rowset_versions.find(rowset_id) == |
1990 | 0 | useless_rowset_versions.end()) { |
1991 | 0 | useless_rowset_versions[rowset_id] = {}; |
1992 | 0 | } |
1993 | 0 | useless_rowset_versions[rowset_id].emplace_back(version); |
1994 | 0 | return -2; |
1995 | 0 | } |
1996 | 0 | return 0; |
1997 | 0 | }); |
1998 | 0 | } |
1999 | 0 | useless_rowset_count = useless_rowsets.size(); |
2000 | 0 | useless_rowset_version_count = useless_rowset_versions.size(); |
2001 | 0 | if (!useless_rowsets.empty() || !useless_rowset_versions.empty()) { |
2002 | 0 | std::stringstream ss; |
2003 | 0 | if (!useless_rowsets.empty()) { |
2004 | 0 | ss << "useless rowsets: {"; |
2005 | 0 | for (auto it = useless_rowsets.begin(); it != useless_rowsets.end(); ++it) { |
2006 | 0 | if (it != useless_rowsets.begin()) { |
2007 | 0 | ss << ", "; |
2008 | 0 | } |
2009 | 0 | ss << it->to_string(); |
2010 | 0 | } |
2011 | 0 | ss << "}. "; |
2012 | 0 | } |
2013 | 0 | if (!useless_rowset_versions.empty()) { |
2014 | 0 | ss << "useless rowset versions: {"; |
2015 | 0 | for (auto iter = useless_rowset_versions.begin(); iter != useless_rowset_versions.end(); |
2016 | 0 | ++iter) { |
2017 | 0 | if (iter != useless_rowset_versions.begin()) { |
2018 | 0 | ss << ", "; |
2019 | 0 | } |
2020 | 0 | ss << iter->first.to_string() << ": ["; |
2021 | | // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18] |
2022 | | // print as [8-11, 13, 17-18] |
2023 | 0 | int64_t last_start_version = -1; |
2024 | 0 | int64_t last_end_version = -1; |
2025 | 0 | for (int64_t version : iter->second) { |
2026 | 0 | if (last_start_version == -1) { |
2027 | 0 | last_start_version = version; |
2028 | 0 | last_end_version = version; |
2029 | 0 | continue; |
2030 | 0 | } |
2031 | 0 | if (last_end_version + 1 == version) { |
2032 | 0 | last_end_version = version; |
2033 | 0 | } else { |
2034 | 0 | if (last_start_version == last_end_version) { |
2035 | 0 | ss << last_start_version << ", "; |
2036 | 0 | } else { |
2037 | 0 | ss << last_start_version << "-" << last_end_version << ", "; |
2038 | 0 | } |
2039 | 0 | last_start_version = version; |
2040 | 0 | last_end_version = version; |
2041 | 0 | } |
2042 | 0 | } |
2043 | 0 | if (last_start_version == last_end_version) { |
2044 | 0 | ss << last_start_version; |
2045 | 0 | } else { |
2046 | 0 | ss << last_start_version << "-" << last_end_version; |
2047 | 0 | } |
2048 | |
|
2049 | 0 | ss << "]"; |
2050 | 0 | } |
2051 | 0 | ss << "}."; |
2052 | 0 | } |
2053 | 0 | LOG(WARNING) << "failed check_agg_delete_bitmap_for_stale_rowsets for tablet_id=" |
2054 | 0 | << tablet_id() << ". " << ss.str(); |
2055 | 0 | } else { |
2056 | 0 | LOG(INFO) << "succeed check_agg_delete_bitmap_for_stale_rowsets for tablet_id=" |
2057 | 0 | << tablet_id(); |
2058 | 0 | } |
2059 | 0 | } |
2060 | | |
2061 | 7 | RowsetSharedPtr BaseTablet::get_rowset(const RowsetId& rowset_id) { |
2062 | 7 | std::shared_lock rdlock(_meta_lock); |
2063 | 11 | for (auto& version_rowset : _rs_version_map) { |
2064 | 11 | if (version_rowset.second->rowset_id() == rowset_id) { |
2065 | 7 | return version_rowset.second; |
2066 | 7 | } |
2067 | 11 | } |
2068 | 0 | for (auto& stale_version_rowset : _stale_rs_version_map) { |
2069 | 0 | if (stale_version_rowset.second->rowset_id() == rowset_id) { |
2070 | 0 | return stale_version_rowset.second; |
2071 | 0 | } |
2072 | 0 | } |
2073 | 0 | return nullptr; |
2074 | 0 | } |
2075 | | |
2076 | 1.69k | std::vector<RowsetSharedPtr> BaseTablet::get_snapshot_rowset(bool include_stale_rowset) const { |
2077 | 1.69k | std::shared_lock rdlock(_meta_lock); |
2078 | 1.69k | std::vector<RowsetSharedPtr> rowsets; |
2079 | 1.69k | std::transform(_rs_version_map.cbegin(), _rs_version_map.cend(), std::back_inserter(rowsets), |
2080 | 6.08k | [](auto& kv) { return kv.second; }); |
2081 | 1.69k | if (include_stale_rowset) { |
2082 | 282 | std::transform(_stale_rs_version_map.cbegin(), _stale_rs_version_map.cend(), |
2083 | 282 | std::back_inserter(rowsets), [](auto& kv) { return kv.second; }); |
2084 | 282 | } |
2085 | 1.69k | return rowsets; |
2086 | 1.69k | } |
2087 | | |
2088 | | void BaseTablet::calc_consecutive_empty_rowsets( |
2089 | | std::vector<RowsetSharedPtr>* empty_rowsets, |
2090 | 405 | const std::vector<RowsetSharedPtr>& candidate_rowsets, int64_t limit) { |
2091 | 405 | int len = cast_set<int>(candidate_rowsets.size()); |
2092 | 449 | for (int i = 0; i < len - 1; ++i) { |
2093 | 48 | auto rowset = candidate_rowsets[i]; |
2094 | 48 | auto next_rowset = candidate_rowsets[i + 1]; |
2095 | | |
2096 | | // identify two consecutive rowsets that are empty |
2097 | 48 | if (rowset->num_segments() == 0 && next_rowset->num_segments() == 0 && |
2098 | 48 | !rowset->rowset_meta()->has_delete_predicate() && |
2099 | 48 | !next_rowset->rowset_meta()->has_delete_predicate() && |
2100 | 48 | rowset->end_version() == next_rowset->start_version() - 1) { |
2101 | 4 | empty_rowsets->emplace_back(rowset); |
2102 | 4 | empty_rowsets->emplace_back(next_rowset); |
2103 | 4 | rowset = next_rowset; |
2104 | 4 | int next_index = i + 2; |
2105 | | |
2106 | | // keep searching for consecutive empty rowsets |
2107 | 18 | while (next_index < len && candidate_rowsets[next_index]->num_segments() == 0 && |
2108 | 18 | !candidate_rowsets[next_index]->rowset_meta()->has_delete_predicate() && |
2109 | 18 | rowset->end_version() == candidate_rowsets[next_index]->start_version() - 1) { |
2110 | 14 | empty_rowsets->emplace_back(candidate_rowsets[next_index]); |
2111 | 14 | rowset = candidate_rowsets[next_index++]; |
2112 | 14 | } |
2113 | | // if the number of consecutive empty rowset reach the limit, |
2114 | | // and there are still rowsets following them |
2115 | 4 | if (empty_rowsets->size() >= limit && next_index < len) { |
2116 | 4 | return; |
2117 | 4 | } else { |
2118 | | // current rowset is not empty, start searching from that rowset in the next |
2119 | 0 | i = next_index - 1; |
2120 | 0 | empty_rowsets->clear(); |
2121 | 0 | } |
2122 | 4 | } |
2123 | 48 | } |
2124 | 405 | } |
2125 | | |
2126 | | Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version, |
2127 | 2 | uint32_t* rowset_count, int64_t* file_count) { |
2128 | 2 | Version v(start_version, end_version); |
2129 | 2 | std::vector<RowsetSharedPtr> rowsets; |
2130 | 10 | traverse_rowsets([&rowsets, &v](const auto& rs) { |
2131 | | // get all rowsets |
2132 | 10 | if (v.contains(rs->version())) { |
2133 | 10 | rowsets.emplace_back(rs); |
2134 | 10 | } |
2135 | 10 | }); |
2136 | 2 | std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); |
2137 | 2 | *rowset_count = cast_set<uint32_t>(rowsets.size()); |
2138 | | |
2139 | 2 | *crc_value = 0; |
2140 | 2 | *file_count = 0; |
2141 | 10 | for (const auto& rs : rowsets) { |
2142 | 10 | uint32_t rs_crc_value = 0; |
2143 | 10 | int64_t rs_file_count = 0; |
2144 | 10 | auto rowset = std::static_pointer_cast<BetaRowset>(rs); |
2145 | 10 | auto st = rowset->calc_file_crc(&rs_crc_value, &rs_file_count); |
2146 | 10 | if (!st.ok()) { |
2147 | 0 | return st; |
2148 | 0 | } |
2149 | | // crc_value is calculated based on the crc_value of each rowset. |
2150 | 10 | *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const uint8_t*>(&rs_crc_value), |
2151 | 10 | sizeof(rs_crc_value)); |
2152 | 10 | *file_count += rs_file_count; |
2153 | 10 | } |
2154 | 2 | return Status::OK(); |
2155 | 2 | } |
2156 | | |
2157 | 48 | Status BaseTablet::show_nested_index_file(std::string* json_meta) { |
2158 | 48 | Version v(0, max_version_unlocked()); |
2159 | 48 | std::vector<RowsetSharedPtr> rowsets; |
2160 | 147 | traverse_rowsets([&rowsets, &v](const auto& rs) { |
2161 | | // get all rowsets |
2162 | 147 | if (v.contains(rs->version())) { |
2163 | 147 | rowsets.emplace_back(rs); |
2164 | 147 | } |
2165 | 147 | }); |
2166 | 48 | std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); |
2167 | | |
2168 | 48 | rapidjson::Document doc; |
2169 | 48 | doc.SetObject(); |
2170 | 48 | rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); |
2171 | 48 | rapidjson::Value tabletIdValue(tablet_id()); |
2172 | 48 | doc.AddMember("tablet_id", tabletIdValue, allocator); |
2173 | | |
2174 | 48 | rapidjson::Value rowsets_value(rapidjson::kArrayType); |
2175 | | |
2176 | 147 | for (const auto& rs : rowsets) { |
2177 | 147 | rapidjson::Value rowset_value(rapidjson::kObjectType); |
2178 | | |
2179 | 147 | auto rowset = std::static_pointer_cast<BetaRowset>(rs); |
2180 | 147 | RETURN_IF_ERROR(rowset->show_nested_index_file(&rowset_value, allocator)); |
2181 | 146 | rowsets_value.PushBack(rowset_value, allocator); |
2182 | 146 | } |
2183 | 47 | doc.AddMember("rowsets", rowsets_value, allocator); |
2184 | | |
2185 | 47 | rapidjson::StringBuffer buffer; |
2186 | 47 | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer); |
2187 | 47 | doc.Accept(writer); |
2188 | 47 | *json_meta = std::string(buffer.GetString()); |
2189 | | |
2190 | 47 | return Status::OK(); |
2191 | 48 | } |
2192 | | |
2193 | | void BaseTablet::get_base_rowset_delete_bitmap_count( |
2194 | | uint64_t* max_base_rowset_delete_bitmap_score, |
2195 | 350 | int64_t* max_base_rowset_delete_bitmap_score_tablet_id) { |
2196 | 350 | std::vector<RowsetSharedPtr> rowsets_; |
2197 | 350 | std::string base_rowset_id_str; |
2198 | 350 | { |
2199 | 350 | std::shared_lock rowset_ldlock(this->get_header_lock()); |
2200 | 5.60k | for (const auto& it : _rs_version_map) { |
2201 | 5.60k | rowsets_.emplace_back(it.second); |
2202 | 5.60k | } |
2203 | 350 | } |
2204 | 350 | std::sort(rowsets_.begin(), rowsets_.end(), Rowset::comparator); |
2205 | 350 | if (!rowsets_.empty()) { |
2206 | 350 | bool base_found = false; |
2207 | 888 | for (auto& rowset : rowsets_) { |
2208 | 888 | if (rowset->start_version() > 2) { |
2209 | 249 | break; |
2210 | 249 | } |
2211 | 639 | base_found = true; |
2212 | 639 | uint64_t base_rowset_delete_bitmap_count = |
2213 | 639 | this->tablet_meta()->delete_bitmap().get_count_with_range( |
2214 | 639 | {rowset->rowset_id(), 0, 0}, |
2215 | 639 | {rowset->rowset_id(), UINT32_MAX, UINT64_MAX}); |
2216 | 639 | if (base_rowset_delete_bitmap_count > *max_base_rowset_delete_bitmap_score) { |
2217 | 37 | *max_base_rowset_delete_bitmap_score = base_rowset_delete_bitmap_count; |
2218 | 37 | *max_base_rowset_delete_bitmap_score_tablet_id = this->tablet_id(); |
2219 | 37 | } |
2220 | 639 | } |
2221 | 350 | if (!base_found) { |
2222 | 0 | LOG(WARNING) << "can not found base rowset for tablet " << tablet_id(); |
2223 | 0 | } |
2224 | 350 | } |
2225 | 350 | } |
2226 | | |
2227 | 1.40M | void TabletReadSource::fill_delete_predicates() { |
2228 | 1.40M | DCHECK_EQ(delete_predicates.size(), 0); |
2229 | 1.40M | auto delete_pred_view = |
2230 | 5.78M | rs_splits | std::views::transform([](auto&& split) { |
2231 | 5.78M | return split.rs_reader->rowset()->rowset_meta(); |
2232 | 5.78M | }) | |
2233 | 5.80M | std::views::filter([](const auto& rs_meta) { return rs_meta->has_delete_predicate(); }); |
2234 | 1.40M | delete_predicates = {delete_pred_view.begin(), delete_pred_view.end()}; |
2235 | 1.40M | } |
2236 | | |
2237 | 267M | int32_t BaseTablet::max_version_config() { |
2238 | 267M | int32_t max_version = tablet_meta()->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY |
2239 | 267M | ? std::max(config::time_series_max_tablet_version_num, |
2240 | 4.10k | config::max_tablet_version_num) |
2241 | 267M | : config::max_tablet_version_num; |
2242 | 267M | return max_version; |
2243 | 267M | } |
2244 | | |
2245 | 23.8k | void BaseTablet::prefill_dbm_agg_cache(const RowsetSharedPtr& rowset, int64_t version) { |
2246 | 36.4k | for (std::size_t i = 0; i < rowset->num_segments(); i++) { |
2247 | 12.5k | tablet_meta()->delete_bitmap().get_agg({rowset->rowset_id(), i, version}); |
2248 | 12.5k | } |
2249 | 23.8k | } |
2250 | | |
2251 | 9.07k | void BaseTablet::prefill_dbm_agg_cache_after_compaction(const RowsetSharedPtr& output_rowset) { |
2252 | 9.07k | if (keys_type() == KeysType::UNIQUE_KEYS && enable_unique_key_merge_on_write() && |
2253 | 9.07k | (config::enable_prefill_output_dbm_agg_cache_after_compaction || |
2254 | 5.01k | config::enable_prefill_all_dbm_agg_cache_after_compaction)) { |
2255 | 5.01k | int64_t cur_max_version {-1}; |
2256 | 5.01k | { |
2257 | 5.01k | std::shared_lock rlock(get_header_lock()); |
2258 | 5.01k | cur_max_version = max_version_unlocked(); |
2259 | 5.01k | } |
2260 | 5.01k | if (config::enable_prefill_all_dbm_agg_cache_after_compaction) { |
2261 | 5.01k | traverse_rowsets( |
2262 | 23.8k | [&](const RowsetSharedPtr& rs) { prefill_dbm_agg_cache(rs, cur_max_version); }, |
2263 | 5.01k | false); |
2264 | 5.01k | } else if (config::enable_prefill_output_dbm_agg_cache_after_compaction) { |
2265 | 0 | prefill_dbm_agg_cache(output_rowset, cur_max_version); |
2266 | 0 | } |
2267 | 5.01k | } |
2268 | 9.07k | } |
2269 | | |
2270 | | bool BaseTablet::_key_is_not_in_segment(Slice key, const KeyBoundsPB& segment_key_bounds, |
2271 | 3.96M | bool is_segments_key_bounds_truncated) { |
2272 | 3.96M | Slice maybe_truncated_min_key {segment_key_bounds.min_key()}; |
2273 | 3.96M | Slice maybe_truncated_max_key {segment_key_bounds.max_key()}; |
2274 | 3.96M | bool res1 = Slice::lhs_is_strictly_less_than_rhs(key, false, maybe_truncated_min_key, |
2275 | 3.96M | is_segments_key_bounds_truncated); |
2276 | 3.96M | bool res2 = Slice::lhs_is_strictly_less_than_rhs(maybe_truncated_max_key, |
2277 | 3.96M | is_segments_key_bounds_truncated, key, false); |
2278 | 3.96M | return res1 || res2; |
2279 | 3.96M | } |
2280 | | |
2281 | | } // namespace doris |