/root/doris/be/src/olap/rowset/segcompaction.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "segcompaction.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <limits.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <atomic> |
26 | | #include <condition_variable> |
27 | | #include <filesystem> |
28 | | #include <map> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <string> |
33 | | #include <utility> |
34 | | |
35 | | #include "beta_rowset_writer.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/logging.h" |
38 | | #include "gutil/stringprintf.h" |
39 | | #include "gutil/strings/substitute.h" |
40 | | #include "io/fs/file_system.h" |
41 | | #include "io/fs/file_writer.h" |
42 | | #include "io/io_common.h" |
43 | | #include "olap/data_dir.h" |
44 | | #include "olap/iterators.h" |
45 | | #include "olap/merger.h" |
46 | | #include "olap/olap_common.h" |
47 | | #include "olap/olap_define.h" |
48 | | #include "olap/rowset/beta_rowset.h" |
49 | | #include "olap/rowset/rowset_meta.h" |
50 | | #include "olap/rowset/rowset_writer_context.h" |
51 | | #include "olap/rowset/segment_v2/inverted_index_cache.h" |
52 | | #include "olap/rowset/segment_v2/inverted_index_desc.h" |
53 | | #include "olap/rowset/segment_v2/segment.h" |
54 | | #include "olap/rowset/segment_v2/segment_writer.h" |
55 | | #include "olap/schema.h" |
56 | | #include "olap/storage_engine.h" |
57 | | #include "olap/tablet_reader.h" |
58 | | #include "olap/tablet_schema.h" |
59 | | #include "runtime/memory/global_memory_arbitrator.h" |
60 | | #include "runtime/thread_context.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/mem_info.h" |
63 | | #include "util/time.h" |
64 | | #include "vec/olap/vertical_block_reader.h" |
65 | | #include "vec/olap/vertical_merge_iterator.h" |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
70 | 472 | SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {} |
71 | | |
72 | 472 | void SegcompactionWorker::init_mem_tracker(const RowsetWriterContext& rowset_writer_context) { |
73 | 472 | _seg_compact_mem_tracker = MemTrackerLimiter::create_shared( |
74 | 472 | MemTrackerLimiter::Type::COMPACTION, |
75 | 472 | fmt::format("segcompaction-txnID_{}-loadID_{}-tabletID_{}-indexID_{}-" |
76 | 472 | "partitionID_{}-version_{}", |
77 | 472 | std::to_string(rowset_writer_context.txn_id), |
78 | 472 | print_id(rowset_writer_context.load_id), |
79 | 472 | std::to_string(rowset_writer_context.tablet_id), |
80 | 472 | std::to_string(rowset_writer_context.index_id), |
81 | 472 | std::to_string(rowset_writer_context.partition_id), |
82 | 472 | rowset_writer_context.version.to_string())); |
83 | 472 | } |
84 | | |
85 | | Status SegcompactionWorker::_get_segcompaction_reader( |
86 | | SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet, |
87 | | std::shared_ptr<Schema> schema, OlapReaderStatistics* stat, |
88 | | vectorized::RowSourcesBuffer& row_sources_buf, bool is_key, |
89 | | std::vector<uint32_t>& return_columns, |
90 | 22 | std::unique_ptr<vectorized::VerticalBlockReader>* reader) { |
91 | 22 | const auto& ctx = _writer->_context; |
92 | 22 | bool record_rowids = need_convert_delete_bitmap() && is_key; |
93 | 22 | StorageReadOptions read_options; |
94 | 22 | read_options.stats = stat; |
95 | 22 | read_options.use_page_cache = false; |
96 | 22 | read_options.tablet_schema = ctx.tablet_schema; |
97 | 22 | read_options.record_rowids = record_rowids; |
98 | 22 | std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; |
99 | 22 | std::map<uint32_t, uint32_t> segment_rows; |
100 | 118 | for (auto& seg_ptr : *segments) { |
101 | 118 | std::unique_ptr<RowwiseIterator> iter; |
102 | 118 | auto s = seg_ptr->new_iterator(schema, read_options, &iter); |
103 | 118 | if (!s.ok()) { |
104 | 0 | return Status::Error<INIT_FAILED>("failed to create iterator[{}]: {}", seg_ptr->id(), |
105 | 0 | s.to_string()); |
106 | 0 | } |
107 | 118 | seg_iterators.push_back(std::move(iter)); |
108 | 118 | segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows()); |
109 | 118 | } |
110 | 22 | if (record_rowids && _rowid_conversion != nullptr) { |
111 | 4 | _rowid_conversion->reset_segment_map(segment_rows); |
112 | 4 | } |
113 | | |
114 | 22 | *reader = std::make_unique<vectorized::VerticalBlockReader>(&row_sources_buf); |
115 | | |
116 | 22 | TabletReader::ReaderParams reader_params; |
117 | 22 | reader_params.is_segcompaction = true; |
118 | 22 | reader_params.segment_iters_ptr = &seg_iterators; |
119 | | // no reader_params.version shouldn't break segcompaction |
120 | 22 | reader_params.tablet_schema = ctx.tablet_schema; |
121 | 22 | reader_params.tablet = tablet; |
122 | 22 | reader_params.return_columns = return_columns; |
123 | 22 | reader_params.is_key_column_group = is_key; |
124 | 22 | reader_params.use_page_cache = false; |
125 | 22 | reader_params.record_rowids = record_rowids; |
126 | 22 | return (*reader)->init(reader_params, nullptr); |
127 | 22 | } |
128 | | |
129 | | std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer( |
130 | 11 | uint32_t begin, uint32_t end) { |
131 | 11 | Status status; |
132 | 11 | std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; |
133 | 11 | status = _create_segment_writer_for_segcompaction(&writer, begin, end); |
134 | 11 | if (!status.ok() || writer == nullptr) { |
135 | 0 | LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end |
136 | 0 | << " status:" << status; |
137 | 0 | return nullptr; |
138 | 11 | } else { |
139 | 11 | return writer; |
140 | 11 | } |
141 | 11 | } |
142 | | |
143 | 11 | Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t end) { |
144 | 11 | DCHECK(_writer->rowset_meta()->is_local()); |
145 | | |
146 | 11 | const auto& fs = io::global_local_filesystem(); |
147 | 11 | auto ctx = _writer->_context; |
148 | 11 | auto schema = ctx.tablet_schema; |
149 | | |
150 | 70 | for (uint32_t i = begin; i <= end; ++i) { |
151 | 59 | auto seg_path = local_segment_path(ctx.tablet_path, ctx.rowset_id.to_string(), i); |
152 | | // Even if an error is encountered, these files that have not been cleaned up |
153 | | // will be cleaned up by the GC background. So here we only print the error |
154 | | // message when we encounter an error. |
155 | 59 | RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(seg_path), |
156 | 59 | strings::Substitute("Failed to delete file=$0", seg_path)); |
157 | 59 | if (schema->has_inverted_index() && |
158 | 59 | schema->get_inverted_index_storage_format() >= InvertedIndexStorageFormatPB::V2) { |
159 | 29 | auto idx_path = InvertedIndexDescriptor::get_index_file_path_v2( |
160 | 29 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
161 | 29 | VLOG_DEBUG << "segcompaction index. delete file " << idx_path; |
162 | 29 | RETURN_NOT_OK_STATUS_WITH_WARN( |
163 | 29 | fs->delete_file(idx_path), |
164 | 29 | strings::Substitute("Failed to delete file=$0", idx_path)); |
165 | 29 | } |
166 | | // Delete inverted index files |
167 | 207 | for (auto&& column : schema->columns()) { |
168 | 207 | if (const auto* index_info = schema->inverted_index(*column); index_info != nullptr) { |
169 | 58 | auto index_id = index_info->index_id(); |
170 | 58 | if (schema->get_inverted_index_storage_format() == |
171 | 58 | InvertedIndexStorageFormatPB::V1) { |
172 | 0 | auto idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
173 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id, |
174 | 0 | index_info->get_index_suffix()); |
175 | 0 | VLOG_DEBUG << "segcompaction index. delete file " << idx_path; |
176 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN( |
177 | 0 | fs->delete_file(idx_path), |
178 | 0 | strings::Substitute("Failed to delete file=$0", idx_path)); |
179 | 0 | } |
180 | | // Erase the origin index file cache |
181 | 58 | auto idx_file_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
182 | 58 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id, |
183 | 58 | index_info->get_index_suffix()); |
184 | 58 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(idx_file_cache_key)); |
185 | 58 | } |
186 | 207 | } |
187 | 59 | } |
188 | 11 | return Status::OK(); |
189 | 11 | } |
190 | | |
191 | | Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat, |
192 | | Merger::Statistics& merger_stat, uint32_t begin, |
193 | 11 | uint32_t end) { |
194 | 11 | uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */ |
195 | 11 | uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */ |
196 | 11 | uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */ |
197 | 11 | uint64_t output_rows = merger_stat.output_rows; /* rows after merge */ |
198 | 11 | uint64_t merged_rows = merger_stat.merged_rows; /* dup key merged by unique/agg */ |
199 | | |
200 | 11 | { |
201 | 11 | std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex); |
202 | 70 | for (int i = begin; i <= end; ++i) { |
203 | 59 | sum_src_row += _writer->_segid_statistics_map[i].row_num; |
204 | 59 | } |
205 | 11 | } |
206 | | |
207 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; }); |
208 | 11 | if (raw_rows_read != sum_src_row) { |
209 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
210 | 0 | "segcompaction read row num does not match source. expect read row:{}, actual read " |
211 | 0 | "row:{}", |
212 | 0 | sum_src_row, raw_rows_read); |
213 | 0 | } |
214 | | |
215 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; }); |
216 | 11 | if ((output_rows + merged_rows) != raw_rows_read) { |
217 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
218 | 0 | "segcompaction total row num does not match after merge. expect total row:{}, " |
219 | 0 | "actual total row:{}, (output_rows:{},merged_rows:{})", |
220 | 0 | raw_rows_read, output_rows + merged_rows, output_rows, merged_rows); |
221 | 0 | } |
222 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows", |
223 | 11 | { filtered_rows++; }); |
224 | 11 | if (filtered_rows != 0) { |
225 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
226 | 0 | "segcompaction should not have filtered rows but actual filtered rows:{}", |
227 | 0 | filtered_rows); |
228 | 0 | } |
229 | 11 | return Status::OK(); |
230 | 11 | } |
231 | | |
232 | | Status SegcompactionWorker::_create_segment_writer_for_segcompaction( |
233 | 11 | std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) { |
234 | 11 | return _writer->create_segment_writer_for_segcompaction(writer, begin, end); |
235 | 11 | } |
236 | | |
237 | 11 | Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { |
238 | 11 | DCHECK(_seg_compact_mem_tracker != nullptr); |
239 | 11 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker); |
240 | | /* throttle segcompaction task if memory depleted */ |
241 | 11 | if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { |
242 | 0 | return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage"); |
243 | 0 | } |
244 | | |
245 | 11 | uint32_t begin = (*(segments->begin()))->id(); |
246 | 11 | uint32_t end = (*(segments->end() - 1))->id(); |
247 | 11 | uint64_t begin_time = GetCurrentTimeMicros(); |
248 | 11 | uint64_t index_size = 0; |
249 | 11 | uint64_t total_index_size = 0; |
250 | 11 | auto ctx = _writer->_context; |
251 | | |
252 | 11 | auto writer = _create_segcompaction_writer(begin, end); |
253 | 11 | if (UNLIKELY(writer == nullptr)) { |
254 | 0 | return Status::Error<SEGCOMPACTION_INIT_WRITER>("failed to get segcompaction writer"); |
255 | 0 | } |
256 | | |
257 | 11 | DCHECK(ctx.tablet); |
258 | 11 | auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet); |
259 | 11 | if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) { |
260 | 3 | _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id()); |
261 | 3 | } |
262 | | |
263 | 11 | std::vector<std::vector<uint32_t>> column_groups; |
264 | 11 | std::vector<uint32_t> key_group_cluster_key_idxes; |
265 | 11 | Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups, |
266 | 11 | &key_group_cluster_key_idxes); |
267 | 11 | vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(), |
268 | 11 | ReaderType::READER_SEGMENT_COMPACTION); |
269 | | |
270 | 11 | KeyBoundsPB key_bounds; |
271 | 11 | Merger::Statistics key_merger_stats; |
272 | 11 | OlapReaderStatistics key_reader_stats; |
273 | | /* compact group one by one */ |
274 | 33 | for (auto i = 0; i < column_groups.size(); ++i) { |
275 | 22 | VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); |
276 | 22 | bool is_key = (i == 0); |
277 | 22 | std::vector<uint32_t> column_ids = column_groups[i]; |
278 | | |
279 | 22 | writer->clear(); |
280 | 22 | RETURN_IF_ERROR(writer->init(column_ids, is_key)); |
281 | 22 | auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids); |
282 | 22 | OlapReaderStatistics reader_stats; |
283 | 22 | std::unique_ptr<vectorized::VerticalBlockReader> reader; |
284 | 22 | auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf, |
285 | 22 | is_key, column_ids, &reader); |
286 | 22 | if (UNLIKELY(reader == nullptr || !s.ok())) { |
287 | 0 | return Status::Error<SEGCOMPACTION_INIT_READER>( |
288 | 0 | "failed to get segcompaction reader. err: {}", s.to_string()); |
289 | 0 | } |
290 | | |
291 | 22 | Merger::Statistics merger_stats; |
292 | 22 | RETURN_IF_ERROR(Merger::vertical_compact_one_group( |
293 | 22 | tablet->tablet_id(), ReaderType::READER_SEGMENT_COMPACTION, *ctx.tablet_schema, |
294 | 22 | is_key, column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size, |
295 | 22 | key_bounds, _rowid_conversion.get())); |
296 | 22 | total_index_size += index_size; |
297 | 22 | if (is_key) { |
298 | 11 | RETURN_IF_ERROR(row_sources_buf.flush()); |
299 | 11 | key_merger_stats = merger_stats; |
300 | 11 | key_reader_stats = reader_stats; |
301 | 11 | } |
302 | 22 | RETURN_IF_ERROR(row_sources_buf.seek_to_begin()); |
303 | 22 | } |
304 | | |
305 | | /* check row num after merge/aggregation */ |
306 | 11 | RETURN_NOT_OK_STATUS_WITH_WARN( |
307 | 11 | _check_correctness(key_reader_stats, key_merger_stats, begin, end), |
308 | 11 | "check correctness failed"); |
309 | 11 | { |
310 | 11 | std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex); |
311 | 11 | _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end); |
312 | 11 | } |
313 | 11 | RETURN_IF_ERROR( |
314 | 11 | _writer->flush_segment_writer_for_segcompaction(&writer, total_index_size, key_bounds)); |
315 | | |
316 | 11 | if (_file_writer != nullptr && _file_writer->state() != io::FileWriter::State::CLOSED) { |
317 | 11 | RETURN_IF_ERROR(_file_writer->close()); |
318 | 11 | } |
319 | | |
320 | 11 | RETURN_IF_ERROR(_delete_original_segments(begin, end)); |
321 | 11 | if (_rowid_conversion != nullptr) { |
322 | 4 | convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, |
323 | 4 | _writer->_num_segcompacted); |
324 | 4 | } |
325 | 11 | RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end)); |
326 | 11 | if (_inverted_index_file_writer != nullptr) { |
327 | 7 | _inverted_index_file_writer.reset(); |
328 | 7 | } |
329 | 11 | if (VLOG_DEBUG_IS_ON) { |
330 | 0 | _writer->vlog_buffer.clear(); |
331 | 0 | for (const auto& entry : std::filesystem::directory_iterator(ctx.tablet_path)) { |
332 | 0 | fmt::format_to(_writer->vlog_buffer, "[{}]", string(entry.path())); |
333 | 0 | } |
334 | 0 | VLOG_DEBUG << "tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id |
335 | 0 | << "_segcompacted_point:" << _writer->_segcompacted_point |
336 | 0 | << " _num_segment:" << _writer->_num_segment |
337 | 0 | << " _num_segcompacted:" << _writer->_num_segcompacted |
338 | 0 | << " list directory:" << fmt::to_string(_writer->vlog_buffer); |
339 | 0 | } |
340 | | |
341 | 11 | _writer->_segcompacted_point += (end - begin + 1); |
342 | | |
343 | 11 | uint64_t elapsed = GetCurrentTimeMicros() - begin_time; |
344 | 11 | LOG(INFO) << "segcompaction completed. tablet_id:" << ctx.tablet_id |
345 | 11 | << " rowset_id:" << ctx.rowset_id << " elapsed time:" << elapsed |
346 | 11 | << "us. update segcompacted_point:" << _writer->_segcompacted_point |
347 | 11 | << " segment num:" << segments->size() << " begin:" << begin << " end:" << end; |
348 | | |
349 | 11 | return Status::OK(); |
350 | 11 | } |
351 | | |
352 | 11 | void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { |
353 | 11 | Status status = Status::OK(); |
354 | 11 | if (_is_compacting_state_mutable.exchange(false)) { |
355 | 11 | status = _do_compact_segments(segments); |
356 | 11 | } else { |
357 | | // note: be aware that _writer maybe released when the task is cancelled |
358 | 0 | LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; |
359 | 0 | return; |
360 | 0 | } |
361 | 11 | if (!status.ok()) { |
362 | 0 | int16_t errcode = status.code(); |
363 | 0 | switch (errcode) { |
364 | 0 | case FETCH_MEMORY_EXCEEDED: |
365 | 0 | case SEGCOMPACTION_INIT_READER: |
366 | 0 | case SEGCOMPACTION_INIT_WRITER: |
367 | 0 | LOG(WARNING) << "segcompaction failed, try next time:" << status; |
368 | 0 | break; |
369 | 0 | default: |
370 | 0 | auto ctx = _writer->_context; |
371 | 0 | LOG(WARNING) << "segcompaction fatal, terminating the write job." |
372 | 0 | << " tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id |
373 | 0 | << " status:" << status; |
374 | | // status will be checked by the next trigger of segcompaction or the final wait |
375 | 0 | _writer->_segcompaction_status.store(ErrorCode::INTERNAL_ERROR); |
376 | 0 | } |
377 | 0 | } |
378 | 11 | DCHECK_EQ(_writer->_is_doing_segcompaction, true); |
379 | 11 | { |
380 | 11 | std::lock_guard lk(_writer->_is_doing_segcompaction_lock); |
381 | 11 | _writer->_is_doing_segcompaction = false; |
382 | 11 | _writer->_segcompacting_cond.notify_all(); |
383 | 11 | } |
384 | 11 | _is_compacting_state_mutable = true; |
385 | 11 | } |
386 | | |
387 | 84 | bool SegcompactionWorker::need_convert_delete_bitmap() { |
388 | 84 | if (_writer == nullptr) { |
389 | 0 | return false; |
390 | 0 | } |
391 | 84 | auto tablet = _writer->context().tablet; |
392 | 84 | return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && |
393 | 84 | tablet->enable_unique_key_merge_on_write() && |
394 | 84 | tablet->tablet_schema()->has_sequence_col(); |
395 | 84 | } |
396 | | |
397 | | void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, |
398 | 22 | uint32_t src_seg_id, uint32_t dest_seg_id) { |
399 | | // lazy init |
400 | 22 | if (nullptr == _converted_delete_bitmap) { |
401 | 1 | _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id); |
402 | 1 | } |
403 | 22 | auto rowset_id = _writer->context().rowset_id; |
404 | 22 | const auto* seg_map = |
405 | 22 | src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); |
406 | 22 | if (seg_map != nullptr) { |
407 | 15 | _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, |
408 | 15 | *seg_map); |
409 | 15 | } |
410 | 22 | } |
411 | | |
412 | | void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, |
413 | | uint32_t src_begin, uint32_t src_end, |
414 | 4 | uint32_t dst_seg_id) { |
415 | | // lazy init |
416 | 4 | if (nullptr == _converted_delete_bitmap) { |
417 | 3 | _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id); |
418 | 3 | } |
419 | 4 | auto rowset_id = _writer->context().rowset_id; |
420 | 4 | RowLocation src(rowset_id, 0, 0); |
421 | 34 | for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { |
422 | 30 | const auto* seg_map = |
423 | 30 | src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); |
424 | 30 | if (!seg_map) { |
425 | 6 | continue; |
426 | 6 | } |
427 | 24 | src.segment_id = seg_id; |
428 | 57.3k | for (unsigned int row_id : *seg_map) { |
429 | 57.3k | src.row_id = row_id; |
430 | 57.3k | auto dst_row_id = _rowid_conversion->get(src); |
431 | 57.3k | if (dst_row_id < 0) { |
432 | 5.22k | continue; |
433 | 5.22k | } |
434 | 52.1k | _converted_delete_bitmap->add( |
435 | 52.1k | {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id); |
436 | 52.1k | } |
437 | 24 | } |
438 | 4 | } |
439 | | |
440 | 179 | bool SegcompactionWorker::cancel() { |
441 | | // return true if the task is canncellable (actual compaction is not started) |
442 | | // return false when the task is not cancellable (it is in the middle of segcompaction) |
443 | 179 | return _is_compacting_state_mutable.exchange(false); |
444 | 179 | } |
445 | | |
446 | | } // namespace doris |