/root/doris/be/src/olap/rowset/segcompaction.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "segcompaction.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <limits.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <atomic> |
26 | | #include <condition_variable> |
27 | | #include <filesystem> |
28 | | #include <map> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <string> |
33 | | #include <utility> |
34 | | |
35 | | #include "beta_rowset_writer.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/logging.h" |
38 | | #include "gutil/stringprintf.h" |
39 | | #include "gutil/strings/substitute.h" |
40 | | #include "io/fs/file_system.h" |
41 | | #include "io/fs/file_writer.h" |
42 | | #include "io/io_common.h" |
43 | | #include "olap/data_dir.h" |
44 | | #include "olap/iterators.h" |
45 | | #include "olap/merger.h" |
46 | | #include "olap/olap_common.h" |
47 | | #include "olap/olap_define.h" |
48 | | #include "olap/rowset/beta_rowset.h" |
49 | | #include "olap/rowset/rowset_meta.h" |
50 | | #include "olap/rowset/rowset_writer_context.h" |
51 | | #include "olap/rowset/segment_v2/inverted_index_cache.h" |
52 | | #include "olap/rowset/segment_v2/inverted_index_desc.h" |
53 | | #include "olap/rowset/segment_v2/segment.h" |
54 | | #include "olap/rowset/segment_v2/segment_writer.h" |
55 | | #include "olap/schema.h" |
56 | | #include "olap/storage_engine.h" |
57 | | #include "olap/tablet_reader.h" |
58 | | #include "olap/tablet_schema.h" |
59 | | #include "runtime/memory/global_memory_arbitrator.h" |
60 | | #include "runtime/thread_context.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/mem_info.h" |
63 | | #include "util/time.h" |
64 | | #include "vec/olap/vertical_block_reader.h" |
65 | | #include "vec/olap/vertical_merge_iterator.h" |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
70 | 447 | SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {} |
71 | | |
72 | | Status SegcompactionWorker::_get_segcompaction_reader( |
73 | | SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet, |
74 | | std::shared_ptr<Schema> schema, OlapReaderStatistics* stat, |
75 | | vectorized::RowSourcesBuffer& row_sources_buf, bool is_key, |
76 | | std::vector<uint32_t>& return_columns, |
77 | 22 | std::unique_ptr<vectorized::VerticalBlockReader>* reader) { |
78 | 22 | auto ctx = _writer->_context; |
79 | 22 | bool record_rowids = need_convert_delete_bitmap() && is_key; |
80 | 22 | StorageReadOptions read_options; |
81 | 22 | read_options.stats = stat; |
82 | 22 | read_options.use_page_cache = false; |
83 | 22 | read_options.tablet_schema = ctx.tablet_schema; |
84 | 22 | read_options.record_rowids = record_rowids; |
85 | 22 | std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; |
86 | 22 | std::map<uint32_t, uint32_t> segment_rows; |
87 | 118 | for (auto& seg_ptr : *segments) { |
88 | 118 | std::unique_ptr<RowwiseIterator> iter; |
89 | 118 | auto s = seg_ptr->new_iterator(schema, read_options, &iter); |
90 | 118 | if (!s.ok()) { |
91 | 0 | return Status::Error<INIT_FAILED>("failed to create iterator[{}]: {}", seg_ptr->id(), |
92 | 0 | s.to_string()); |
93 | 0 | } |
94 | 118 | seg_iterators.push_back(std::move(iter)); |
95 | 118 | segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows()); |
96 | 118 | } |
97 | 22 | if (record_rowids && _rowid_conversion != nullptr) { |
98 | 4 | _rowid_conversion->reset_segment_map(segment_rows); |
99 | 4 | } |
100 | | |
101 | 22 | *reader = std::unique_ptr<vectorized::VerticalBlockReader> { |
102 | 22 | new vectorized::VerticalBlockReader(&row_sources_buf)}; |
103 | | |
104 | 22 | TabletReader::ReaderParams reader_params; |
105 | 22 | reader_params.is_segcompaction = true; |
106 | 22 | reader_params.segment_iters_ptr = &seg_iterators; |
107 | | // no reader_params.version shouldn't break segcompaction |
108 | 22 | reader_params.tablet_schema = ctx.tablet_schema; |
109 | 22 | reader_params.tablet = tablet; |
110 | 22 | reader_params.return_columns = return_columns; |
111 | 22 | reader_params.is_key_column_group = is_key; |
112 | 22 | reader_params.use_page_cache = false; |
113 | 22 | reader_params.record_rowids = record_rowids; |
114 | 22 | return (*reader)->init(reader_params, nullptr); |
115 | 22 | } |
116 | | |
117 | | std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer( |
118 | 11 | uint32_t begin, uint32_t end) { |
119 | 11 | Status status; |
120 | 11 | std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; |
121 | 11 | status = _create_segment_writer_for_segcompaction(&writer, begin, end); |
122 | 11 | if (!status.ok() || writer == nullptr) { |
123 | 0 | LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end |
124 | 0 | << " status:" << status; |
125 | 0 | return nullptr; |
126 | 11 | } else { |
127 | 11 | return writer; |
128 | 11 | } |
129 | 11 | } |
130 | | |
131 | 11 | Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t end) { |
132 | 11 | auto fs = _writer->_rowset_meta->fs(); |
133 | 11 | auto ctx = _writer->_context; |
134 | 11 | auto schema = ctx.tablet_schema; |
135 | 11 | if (!fs) { |
136 | 0 | return Status::Error<INIT_FAILED>( |
137 | 0 | "SegcompactionWorker::_delete_original_segments get fs failed"); |
138 | 0 | } |
139 | 70 | for (uint32_t i = begin; i <= end; ++i) { |
140 | 59 | auto seg_path = BetaRowset::segment_file_path(ctx.rowset_dir, ctx.rowset_id, i); |
141 | | // Even if an error is encountered, these files that have not been cleaned up |
142 | | // will be cleaned up by the GC background. So here we only print the error |
143 | | // message when we encounter an error. |
144 | 59 | RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(seg_path), |
145 | 59 | strings::Substitute("Failed to delete file=$0", seg_path)); |
146 | 59 | if (schema->has_inverted_index() && |
147 | 59 | schema->get_inverted_index_storage_format() != InvertedIndexStorageFormatPB::V1) { |
148 | 0 | auto idx_path = InvertedIndexDescriptor::get_index_file_name(seg_path); |
149 | 0 | VLOG_DEBUG << "segcompaction index. delete file " << idx_path; |
150 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN( |
151 | 0 | fs->delete_file(idx_path), |
152 | 0 | strings::Substitute("Failed to delete file=$0", idx_path)); |
153 | 0 | } |
154 | | // Delete inverted index files |
155 | 207 | for (auto column : schema->columns()) { |
156 | 207 | if (schema->has_inverted_index(*column)) { |
157 | 0 | auto index_info = schema->get_inverted_index(*column); |
158 | 0 | auto index_id = index_info->index_id(); |
159 | 0 | auto idx_path = InvertedIndexDescriptor::inverted_index_file_path( |
160 | 0 | ctx.rowset_dir, ctx.rowset_id, i, index_id, index_info->get_index_suffix()); |
161 | 0 | VLOG_DEBUG << "segcompaction index. delete file " << idx_path; |
162 | 0 | if (schema->get_inverted_index_storage_format() == |
163 | 0 | InvertedIndexStorageFormatPB::V1) { |
164 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN( |
165 | 0 | fs->delete_file(idx_path), |
166 | 0 | strings::Substitute("Failed to delete file=$0", idx_path)); |
167 | 0 | } |
168 | | // Erase the origin index file cache |
169 | 0 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(idx_path)); |
170 | 0 | } |
171 | 207 | } |
172 | 59 | } |
173 | 11 | return Status::OK(); |
174 | 11 | } |
175 | | |
176 | | Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat, |
177 | | Merger::Statistics& merger_stat, uint32_t begin, |
178 | 11 | uint32_t end) { |
179 | 11 | uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */ |
180 | 11 | uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */ |
181 | 11 | uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */ |
182 | 11 | uint64_t output_rows = merger_stat.output_rows; /* rows after merge */ |
183 | 11 | uint64_t merged_rows = merger_stat.merged_rows; /* dup key merged by unique/agg */ |
184 | | |
185 | 11 | { |
186 | 11 | std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex); |
187 | 70 | for (int i = begin; i <= end; ++i) { |
188 | 59 | sum_src_row += _writer->_segid_statistics_map[i].row_num; |
189 | 59 | } |
190 | 11 | } |
191 | | |
192 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; }); |
193 | 11 | if (raw_rows_read != sum_src_row) { |
194 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
195 | 0 | "segcompaction read row num does not match source. expect read row:{}, actual read " |
196 | 0 | "row:{}", |
197 | 0 | sum_src_row, raw_rows_read); |
198 | 0 | } |
199 | | |
200 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; }); |
201 | 11 | if ((output_rows + merged_rows) != raw_rows_read) { |
202 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
203 | 0 | "segcompaction total row num does not match after merge. expect total row:{}, " |
204 | 0 | "actual total row:{}, (output_rows:{},merged_rows:{})", |
205 | 0 | raw_rows_read, output_rows + merged_rows, output_rows, merged_rows); |
206 | 0 | } |
207 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows", |
208 | 11 | { filtered_rows++; }); |
209 | 11 | if (filtered_rows != 0) { |
210 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
211 | 0 | "segcompaction should not have filtered rows but actual filtered rows:{}", |
212 | 0 | filtered_rows); |
213 | 0 | } |
214 | 11 | return Status::OK(); |
215 | 11 | } |
216 | | |
217 | | Status SegcompactionWorker::_create_segment_writer_for_segcompaction( |
218 | 11 | std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) { |
219 | 11 | return _writer->_create_segment_writer_for_segcompaction(writer, begin, end); |
220 | 11 | } |
221 | | |
222 | 11 | Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { |
223 | 11 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker()); |
224 | | /* throttle segcompaction task if memory depleted */ |
225 | 11 | if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { |
226 | 0 | return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage"); |
227 | 0 | } |
228 | | |
229 | 11 | uint32_t begin = (*(segments->begin()))->id(); |
230 | 11 | uint32_t end = (*(segments->end() - 1))->id(); |
231 | 11 | uint64_t begin_time = GetCurrentTimeMicros(); |
232 | 11 | uint64_t index_size = 0; |
233 | 11 | uint64_t total_index_size = 0; |
234 | 11 | auto ctx = _writer->_context; |
235 | | |
236 | 11 | auto writer = _create_segcompaction_writer(begin, end); |
237 | 11 | if (UNLIKELY(writer == nullptr)) { |
238 | 0 | return Status::Error<SEGCOMPACTION_INIT_WRITER>("failed to get segcompaction writer"); |
239 | 0 | } |
240 | | |
241 | 11 | DCHECK(ctx.tablet); |
242 | 11 | auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet); |
243 | 11 | if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) { |
244 | 3 | _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id()); |
245 | 3 | } |
246 | | |
247 | 11 | std::vector<std::vector<uint32_t>> column_groups; |
248 | 11 | Merger::vertical_split_columns(ctx.tablet_schema, &column_groups); |
249 | 11 | vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(), |
250 | 11 | ReaderType::READER_SEGMENT_COMPACTION); |
251 | | |
252 | 11 | KeyBoundsPB key_bounds; |
253 | 11 | Merger::Statistics key_merger_stats; |
254 | 11 | OlapReaderStatistics key_reader_stats; |
255 | | /* compact group one by one */ |
256 | 33 | for (auto i = 0; i < column_groups.size(); ++i) { |
257 | 22 | VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); |
258 | 22 | bool is_key = (i == 0); |
259 | 22 | std::vector<uint32_t> column_ids = column_groups[i]; |
260 | | |
261 | 22 | writer->clear(); |
262 | 22 | RETURN_IF_ERROR(writer->init(column_ids, is_key)); |
263 | 22 | auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids); |
264 | 22 | OlapReaderStatistics reader_stats; |
265 | 22 | std::unique_ptr<vectorized::VerticalBlockReader> reader; |
266 | 22 | auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf, |
267 | 22 | is_key, column_ids, &reader); |
268 | 22 | if (UNLIKELY(reader == nullptr || !s.ok())) { |
269 | 0 | return Status::Error<SEGCOMPACTION_INIT_READER>( |
270 | 0 | "failed to get segcompaction reader. err: {}", s.to_string()); |
271 | 0 | } |
272 | | |
273 | 22 | Merger::Statistics merger_stats; |
274 | 22 | RETURN_IF_ERROR(Merger::vertical_compact_one_group( |
275 | 22 | tablet, ReaderType::READER_SEGMENT_COMPACTION, ctx.tablet_schema, is_key, |
276 | 22 | column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size, |
277 | 22 | key_bounds, _rowid_conversion.get())); |
278 | 22 | total_index_size += index_size; |
279 | 22 | if (is_key) { |
280 | 11 | RETURN_IF_ERROR(row_sources_buf.flush()); |
281 | 11 | key_merger_stats = merger_stats; |
282 | 11 | key_reader_stats = reader_stats; |
283 | 11 | } |
284 | 22 | RETURN_IF_ERROR(row_sources_buf.seek_to_begin()); |
285 | 22 | } |
286 | | |
287 | | /* check row num after merge/aggregation */ |
288 | 11 | RETURN_NOT_OK_STATUS_WITH_WARN( |
289 | 11 | _check_correctness(key_reader_stats, key_merger_stats, begin, end), |
290 | 11 | "check correctness failed"); |
291 | 11 | { |
292 | 11 | std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex); |
293 | 11 | _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end); |
294 | 11 | } |
295 | 11 | RETURN_IF_ERROR( |
296 | 11 | _writer->flush_segment_writer_for_segcompaction(&writer, total_index_size, key_bounds)); |
297 | | |
298 | 11 | if (_file_writer != nullptr) { |
299 | 11 | RETURN_IF_ERROR(_file_writer->close()); |
300 | 11 | } |
301 | | |
302 | 11 | RETURN_IF_ERROR(_delete_original_segments(begin, end)); |
303 | 11 | if (_rowid_conversion != nullptr) { |
304 | 4 | convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, |
305 | 4 | _writer->_num_segcompacted); |
306 | 4 | } |
307 | 11 | RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end)); |
308 | | |
309 | 11 | if (VLOG_DEBUG_IS_ON) { |
310 | 0 | _writer->vlog_buffer.clear(); |
311 | 0 | for (const auto& entry : std::filesystem::directory_iterator(ctx.rowset_dir)) { |
312 | 0 | fmt::format_to(_writer->vlog_buffer, "[{}]", string(entry.path())); |
313 | 0 | } |
314 | 0 | VLOG_DEBUG << "tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id |
315 | 0 | << "_segcompacted_point:" << _writer->_segcompacted_point |
316 | 0 | << " _num_segment:" << _writer->_num_segment |
317 | 0 | << " _num_segcompacted:" << _writer->_num_segcompacted |
318 | 0 | << " list directory:" << fmt::to_string(_writer->vlog_buffer); |
319 | 0 | } |
320 | | |
321 | 11 | _writer->_segcompacted_point += (end - begin + 1); |
322 | | |
323 | 11 | uint64_t elapsed = GetCurrentTimeMicros() - begin_time; |
324 | 11 | LOG(INFO) << "segcompaction completed. tablet_id:" << ctx.tablet_id |
325 | 11 | << " rowset_id:" << ctx.rowset_id << " elapsed time:" << elapsed |
326 | 11 | << "us. update segcompacted_point:" << _writer->_segcompacted_point |
327 | 11 | << " segment num:" << segments->size() << " begin:" << begin << " end:" << end; |
328 | | |
329 | 11 | return Status::OK(); |
330 | 11 | } |
331 | | |
332 | 11 | void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { |
333 | 11 | Status status = Status::OK(); |
334 | 11 | if (_is_compacting_state_mutable.exchange(false)) { |
335 | 11 | status = _do_compact_segments(segments); |
336 | 11 | } else { |
337 | | // note: be aware that _writer maybe released when the task is cancelled |
338 | 0 | LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; |
339 | 0 | return; |
340 | 0 | } |
341 | 11 | if (!status.ok()) { |
342 | 0 | int16_t errcode = status.code(); |
343 | 0 | switch (errcode) { |
344 | 0 | case FETCH_MEMORY_EXCEEDED: |
345 | 0 | case SEGCOMPACTION_INIT_READER: |
346 | 0 | case SEGCOMPACTION_INIT_WRITER: |
347 | 0 | LOG(WARNING) << "segcompaction failed, try next time:" << status; |
348 | 0 | break; |
349 | 0 | default: |
350 | 0 | auto ctx = _writer->_context; |
351 | 0 | LOG(WARNING) << "segcompaction fatal, terminating the write job." |
352 | 0 | << " tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id |
353 | 0 | << " status:" << status; |
354 | | // status will be checked by the next trigger of segcompaction or the final wait |
355 | 0 | _writer->_segcompaction_status.store(ErrorCode::INTERNAL_ERROR); |
356 | 0 | } |
357 | 0 | } |
358 | 11 | DCHECK_EQ(_writer->_is_doing_segcompaction, true); |
359 | 11 | { |
360 | 11 | std::lock_guard lk(_writer->_is_doing_segcompaction_lock); |
361 | 11 | _writer->_is_doing_segcompaction = false; |
362 | 11 | _writer->_segcompacting_cond.notify_all(); |
363 | 11 | } |
364 | 11 | _is_compacting_state_mutable = true; |
365 | 11 | } |
366 | | |
367 | 84 | bool SegcompactionWorker::need_convert_delete_bitmap() { |
368 | 84 | if (_writer == nullptr) { |
369 | 0 | return false; |
370 | 0 | } |
371 | 84 | auto tablet = _writer->context().tablet; |
372 | 84 | return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && |
373 | 84 | tablet->enable_unique_key_merge_on_write() && |
374 | 84 | tablet->tablet_schema()->has_sequence_col(); |
375 | 84 | } |
376 | | |
377 | | void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, |
378 | 22 | uint32_t src_seg_id, uint32_t dest_seg_id) { |
379 | | // lazy init |
380 | 22 | if (nullptr == _converted_delete_bitmap) { |
381 | 1 | _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id); |
382 | 1 | } |
383 | 22 | auto rowset_id = _writer->context().rowset_id; |
384 | 22 | const auto* seg_map = |
385 | 22 | src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); |
386 | 22 | if (seg_map != nullptr) { |
387 | 15 | _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, |
388 | 15 | *seg_map); |
389 | 15 | } |
390 | 22 | } |
391 | | |
392 | | void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, |
393 | | uint32_t src_begin, uint32_t src_end, |
394 | 4 | uint32_t dst_seg_id) { |
395 | | // lazy init |
396 | 4 | if (nullptr == _converted_delete_bitmap) { |
397 | 3 | _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id); |
398 | 3 | } |
399 | 4 | auto rowset_id = _writer->context().rowset_id; |
400 | 4 | RowLocation src(rowset_id, 0, 0); |
401 | 34 | for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { |
402 | 30 | const auto* seg_map = |
403 | 30 | src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); |
404 | 30 | if (!seg_map) { |
405 | 6 | continue; |
406 | 6 | } |
407 | 24 | src.segment_id = seg_id; |
408 | 57.3k | for (unsigned int row_id : *seg_map) { |
409 | 57.3k | src.row_id = row_id; |
410 | 57.3k | auto dst_row_id = _rowid_conversion->get(src); |
411 | 57.3k | if (dst_row_id < 0) { |
412 | 5.25k | continue; |
413 | 5.25k | } |
414 | 52.0k | _converted_delete_bitmap->add( |
415 | 52.0k | {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id); |
416 | 52.0k | } |
417 | 24 | } |
418 | 4 | } |
419 | | |
420 | 439 | bool SegcompactionWorker::cancel() { |
421 | | // return true if the task is canncellable (actual compaction is not started) |
422 | | // return false when the task is not cancellable (it is in the middle of segcompaction) |
423 | 439 | return _is_compacting_state_mutable.exchange(false); |
424 | 439 | } |
425 | | |
426 | | } // namespace doris |