/root/doris/be/src/olap/rowset/segcompaction.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "segcompaction.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/olap_file.pb.h> |
22 | | #include <limits.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <atomic> |
26 | | #include <condition_variable> |
27 | | #include <filesystem> |
28 | | #include <map> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <string> |
33 | | #include <utility> |
34 | | |
35 | | #include "beta_rowset_writer.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/logging.h" |
38 | | #include "gutil/stringprintf.h" |
39 | | #include "gutil/strings/substitute.h" |
40 | | #include "io/fs/file_system.h" |
41 | | #include "io/fs/file_writer.h" |
42 | | #include "io/io_common.h" |
43 | | #include "olap/data_dir.h" |
44 | | #include "olap/iterators.h" |
45 | | #include "olap/merger.h" |
46 | | #include "olap/olap_common.h" |
47 | | #include "olap/olap_define.h" |
48 | | #include "olap/rowset/beta_rowset.h" |
49 | | #include "olap/rowset/rowset_meta.h" |
50 | | #include "olap/rowset/rowset_writer_context.h" |
51 | | #include "olap/rowset/segment_v2/inverted_index_cache.h" |
52 | | #include "olap/rowset/segment_v2/inverted_index_desc.h" |
53 | | #include "olap/rowset/segment_v2/segment.h" |
54 | | #include "olap/rowset/segment_v2/segment_writer.h" |
55 | | #include "olap/schema.h" |
56 | | #include "olap/storage_engine.h" |
57 | | #include "olap/tablet_reader.h" |
58 | | #include "olap/tablet_schema.h" |
59 | | #include "runtime/memory/global_memory_arbitrator.h" |
60 | | #include "runtime/thread_context.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/mem_info.h" |
63 | | #include "util/time.h" |
64 | | #include "vec/olap/vertical_block_reader.h" |
65 | | #include "vec/olap/vertical_merge_iterator.h" |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
70 | 799 | SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {} |
71 | | |
72 | 799 | void SegcompactionWorker::init_mem_tracker(const RowsetWriterContext& rowset_writer_context) { |
73 | 799 | _seg_compact_mem_tracker = MemTrackerLimiter::create_shared( |
74 | 799 | MemTrackerLimiter::Type::COMPACTION, |
75 | 799 | fmt::format("segcompaction-txnID_{}-loadID_{}-tabletID_{}-indexID_{}-" |
76 | 799 | "partitionID_{}-version_{}", |
77 | 799 | std::to_string(rowset_writer_context.txn_id), |
78 | 799 | print_id(rowset_writer_context.load_id), |
79 | 799 | std::to_string(rowset_writer_context.tablet_id), |
80 | 799 | std::to_string(rowset_writer_context.index_id), |
81 | 799 | std::to_string(rowset_writer_context.partition_id), |
82 | 799 | rowset_writer_context.version.to_string())); |
83 | 799 | } |
84 | | |
85 | | Status SegcompactionWorker::_get_segcompaction_reader( |
86 | | SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet, |
87 | | std::shared_ptr<Schema> schema, OlapReaderStatistics* stat, |
88 | | vectorized::RowSourcesBuffer& row_sources_buf, bool is_key, |
89 | | std::vector<uint32_t>& return_columns, std::vector<uint32_t>& key_group_cluster_key_idxes, |
90 | 22 | std::unique_ptr<vectorized::VerticalBlockReader>* reader) { |
91 | 22 | const auto& ctx = _writer->_context; |
92 | 22 | bool record_rowids = need_convert_delete_bitmap() && is_key; |
93 | 22 | StorageReadOptions read_options; |
94 | 22 | read_options.stats = stat; |
95 | 22 | read_options.use_page_cache = false; |
96 | 22 | read_options.tablet_schema = ctx.tablet_schema; |
97 | 22 | read_options.record_rowids = record_rowids; |
98 | 22 | if (!tablet->tablet_schema()->cluster_key_uids().empty()) { |
99 | 0 | DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id()); |
100 | 0 | RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(ctx.rowset_id, *segments, |
101 | 0 | delete_bitmap)); |
102 | 0 | for (auto& seg_ptr : *segments) { |
103 | 0 | auto d = delete_bitmap->get_agg( |
104 | 0 | {ctx.rowset_id, seg_ptr->id(), DeleteBitmap::TEMP_VERSION_COMMON}); |
105 | 0 | if (d->isEmpty()) { |
106 | 0 | continue; // Empty delete bitmap for the segment |
107 | 0 | } |
108 | 0 | read_options.delete_bitmap.emplace(seg_ptr->id(), std::move(d)); |
109 | 0 | } |
110 | 0 | } |
111 | 22 | std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; |
112 | 22 | std::map<uint32_t, uint32_t> segment_rows; |
113 | 118 | for (auto& seg_ptr : *segments) { |
114 | 118 | std::unique_ptr<RowwiseIterator> iter; |
115 | 118 | auto s = seg_ptr->new_iterator(schema, read_options, &iter); |
116 | 118 | if (!s.ok()) { |
117 | 0 | return Status::Error<INIT_FAILED>("failed to create iterator[{}]: {}", seg_ptr->id(), |
118 | 0 | s.to_string()); |
119 | 0 | } |
120 | 118 | seg_iterators.push_back(std::move(iter)); |
121 | 118 | segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows()); |
122 | 118 | } |
123 | 22 | if (record_rowids && _rowid_conversion != nullptr) { |
124 | 4 | _rowid_conversion->reset_segment_map(segment_rows); |
125 | 4 | } |
126 | | |
127 | 22 | *reader = std::make_unique<vectorized::VerticalBlockReader>(&row_sources_buf); |
128 | | |
129 | 22 | TabletReader::ReaderParams reader_params; |
130 | 22 | reader_params.is_segcompaction = true; |
131 | 22 | reader_params.segment_iters_ptr = &seg_iterators; |
132 | | // no reader_params.version shouldn't break segcompaction |
133 | 22 | reader_params.tablet_schema = ctx.tablet_schema; |
134 | 22 | reader_params.tablet = tablet; |
135 | 22 | reader_params.return_columns = return_columns; |
136 | 22 | reader_params.is_key_column_group = is_key; |
137 | 22 | reader_params.use_page_cache = false; |
138 | 22 | reader_params.record_rowids = record_rowids; |
139 | 22 | reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes; |
140 | 22 | return (*reader)->init(reader_params, nullptr); |
141 | 22 | } |
142 | | |
143 | | std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer( |
144 | 11 | uint32_t begin, uint32_t end) { |
145 | 11 | Status status; |
146 | 11 | std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr; |
147 | 11 | status = _create_segment_writer_for_segcompaction(&writer, begin, end); |
148 | 11 | if (!status.ok() || writer == nullptr) { |
149 | 0 | LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end |
150 | 0 | << " status:" << status; |
151 | 0 | return nullptr; |
152 | 11 | } else { |
153 | 11 | return writer; |
154 | 11 | } |
155 | 11 | } |
156 | | |
157 | 11 | Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t end) { |
158 | 11 | DCHECK(_writer->rowset_meta()->is_local()); |
159 | | |
160 | 11 | const auto& fs = io::global_local_filesystem(); |
161 | 11 | auto ctx = _writer->_context; |
162 | 11 | auto schema = ctx.tablet_schema; |
163 | | |
164 | 70 | for (uint32_t i = begin; i <= end; ++i) { |
165 | 59 | auto seg_path = local_segment_path(ctx.tablet_path, ctx.rowset_id.to_string(), i); |
166 | | // Even if an error is encountered, these files that have not been cleaned up |
167 | | // will be cleaned up by the GC background. So here we only print the error |
168 | | // message when we encounter an error. |
169 | 59 | RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(seg_path), |
170 | 59 | strings::Substitute("Failed to delete file=$0", seg_path)); |
171 | 59 | if (schema->has_inverted_index() && |
172 | 59 | schema->get_inverted_index_storage_format() >= InvertedIndexStorageFormatPB::V2) { |
173 | 29 | auto idx_path = InvertedIndexDescriptor::get_index_file_path_v2( |
174 | 29 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); |
175 | 29 | VLOG_DEBUG << "segcompaction index. delete file " << idx_path; |
176 | 29 | RETURN_NOT_OK_STATUS_WITH_WARN( |
177 | 29 | fs->delete_file(idx_path), |
178 | 29 | strings::Substitute("Failed to delete file=$0", idx_path)); |
179 | 29 | } |
180 | | // Delete inverted index files |
181 | 207 | for (auto&& column : schema->columns()) { |
182 | 207 | if (const auto* index_info = schema->inverted_index(*column); index_info != nullptr) { |
183 | 58 | auto index_id = index_info->index_id(); |
184 | 58 | if (schema->get_inverted_index_storage_format() == |
185 | 58 | InvertedIndexStorageFormatPB::V1) { |
186 | 0 | auto idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
187 | 0 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id, |
188 | 0 | index_info->get_index_suffix()); |
189 | 0 | VLOG_DEBUG << "segcompaction index. delete file " << idx_path; |
190 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN( |
191 | 0 | fs->delete_file(idx_path), |
192 | 0 | strings::Substitute("Failed to delete file=$0", idx_path)); |
193 | 0 | } |
194 | | // Erase the origin index file cache |
195 | 58 | auto idx_file_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
196 | 58 | InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id, |
197 | 58 | index_info->get_index_suffix()); |
198 | 58 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(idx_file_cache_key)); |
199 | 58 | } |
200 | 207 | } |
201 | 59 | } |
202 | 11 | return Status::OK(); |
203 | 11 | } |
204 | | |
205 | | Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat, |
206 | | Merger::Statistics& merger_stat, uint32_t begin, |
207 | 11 | uint32_t end, bool is_mow_with_cluster_keys) { |
208 | 11 | uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */ |
209 | 11 | uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap; |
210 | 11 | uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */ |
211 | 11 | uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */ |
212 | 11 | uint64_t output_rows = merger_stat.output_rows; /* rows after merge */ |
213 | 11 | uint64_t merged_rows = merger_stat.merged_rows; /* dup key merged by unique/agg */ |
214 | | |
215 | 11 | { |
216 | 11 | std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex); |
217 | 70 | for (int i = begin; i <= end; ++i) { |
218 | 59 | sum_src_row += _writer->_segid_statistics_map[i].row_num; |
219 | 59 | } |
220 | 11 | } |
221 | | |
222 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; }); |
223 | 11 | uint64_t raw_rows = raw_rows_read; |
224 | 11 | if (is_mow_with_cluster_keys) { |
225 | 0 | raw_rows += rows_del_by_bitmap; |
226 | 0 | } |
227 | 11 | if (raw_rows != sum_src_row) { |
228 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
229 | 0 | "segcompaction read row num does not match source. expect read row:{}, actual read " |
230 | 0 | "row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})", |
231 | 0 | sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap); |
232 | 0 | } |
233 | | |
234 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; }); |
235 | 11 | if ((output_rows + merged_rows) != raw_rows_read) { |
236 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
237 | 0 | "segcompaction total row num does not match after merge. expect total row:{}, " |
238 | 0 | "actual total row:{}, (output_rows:{},merged_rows:{})", |
239 | 0 | raw_rows_read, output_rows + merged_rows, output_rows, merged_rows); |
240 | 0 | } |
241 | 11 | DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows", |
242 | 11 | { filtered_rows++; }); |
243 | 11 | if (filtered_rows != 0) { |
244 | 0 | return Status::Error<CHECK_LINES_ERROR>( |
245 | 0 | "segcompaction should not have filtered rows but actual filtered rows:{}", |
246 | 0 | filtered_rows); |
247 | 0 | } |
248 | 11 | return Status::OK(); |
249 | 11 | } |
250 | | |
251 | | Status SegcompactionWorker::_create_segment_writer_for_segcompaction( |
252 | 11 | std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) { |
253 | 11 | return _writer->create_segment_writer_for_segcompaction(writer, begin, end); |
254 | 11 | } |
255 | | |
256 | 11 | Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { |
257 | 11 | DCHECK(_seg_compact_mem_tracker != nullptr); |
258 | 11 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker); |
259 | | /* throttle segcompaction task if memory depleted */ |
260 | 11 | if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { |
261 | 0 | return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage"); |
262 | 0 | } |
263 | | |
264 | 11 | uint32_t begin = (*(segments->begin()))->id(); |
265 | 11 | uint32_t end = (*(segments->end() - 1))->id(); |
266 | 11 | uint64_t begin_time = GetCurrentTimeMicros(); |
267 | 11 | uint64_t index_size = 0; |
268 | 11 | uint64_t total_index_size = 0; |
269 | 11 | auto ctx = _writer->_context; |
270 | | |
271 | 11 | auto writer = _create_segcompaction_writer(begin, end); |
272 | 11 | if (UNLIKELY(writer == nullptr)) { |
273 | 0 | return Status::Error<SEGCOMPACTION_INIT_WRITER>("failed to get segcompaction writer"); |
274 | 0 | } |
275 | | |
276 | 11 | DCHECK(ctx.tablet); |
277 | 11 | auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet); |
278 | 11 | if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) { |
279 | 3 | _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id()); |
280 | 3 | } |
281 | | |
282 | 11 | std::vector<std::vector<uint32_t>> column_groups; |
283 | 11 | std::vector<uint32_t> key_group_cluster_key_idxes; |
284 | 11 | Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups, |
285 | 11 | &key_group_cluster_key_idxes); |
286 | 11 | vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(), |
287 | 11 | ReaderType::READER_SEGMENT_COMPACTION); |
288 | | |
289 | 11 | KeyBoundsPB key_bounds; |
290 | 11 | Merger::Statistics key_merger_stats; |
291 | 11 | OlapReaderStatistics key_reader_stats; |
292 | | /* compact group one by one */ |
293 | 33 | for (auto i = 0; i < column_groups.size(); ++i) { |
294 | 22 | VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); |
295 | 22 | bool is_key = (i == 0); |
296 | 22 | std::vector<uint32_t> column_ids = column_groups[i]; |
297 | | |
298 | 22 | writer->clear(); |
299 | 22 | RETURN_IF_ERROR(writer->init(column_ids, is_key)); |
300 | 22 | auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids); |
301 | 22 | OlapReaderStatistics reader_stats; |
302 | 22 | std::unique_ptr<vectorized::VerticalBlockReader> reader; |
303 | 22 | auto s = |
304 | 22 | _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf, |
305 | 22 | is_key, column_ids, key_group_cluster_key_idxes, &reader); |
306 | 22 | if (UNLIKELY(reader == nullptr || !s.ok())) { |
307 | 0 | return Status::Error<SEGCOMPACTION_INIT_READER>( |
308 | 0 | "failed to get segcompaction reader. err: {}", s.to_string()); |
309 | 0 | } |
310 | | |
311 | 22 | Merger::Statistics merger_stats; |
312 | 22 | RETURN_IF_ERROR(Merger::vertical_compact_one_group( |
313 | 22 | tablet->tablet_id(), ReaderType::READER_SEGMENT_COMPACTION, *ctx.tablet_schema, |
314 | 22 | is_key, column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size, |
315 | 22 | key_bounds, _rowid_conversion.get())); |
316 | 22 | total_index_size += index_size; |
317 | 22 | if (is_key) { |
318 | 11 | RETURN_IF_ERROR(row_sources_buf.flush()); |
319 | 11 | key_merger_stats = merger_stats; |
320 | 11 | key_reader_stats = reader_stats; |
321 | 11 | } |
322 | 22 | RETURN_IF_ERROR(row_sources_buf.seek_to_begin()); |
323 | 22 | } |
324 | | |
325 | | /* check row num after merge/aggregation */ |
326 | 11 | bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty(); |
327 | 11 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin, |
328 | 11 | end, is_mow_with_cluster_keys), |
329 | 11 | "check correctness failed"); |
330 | 11 | { |
331 | 11 | std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex); |
332 | 11 | _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end); |
333 | 11 | } |
334 | 11 | RETURN_IF_ERROR( |
335 | 11 | _writer->flush_segment_writer_for_segcompaction(&writer, total_index_size, key_bounds)); |
336 | | |
337 | 11 | if (_file_writer != nullptr && _file_writer->state() != io::FileWriter::State::CLOSED) { |
338 | 11 | RETURN_IF_ERROR(_file_writer->close()); |
339 | 11 | } |
340 | | |
341 | 11 | RETURN_IF_ERROR(_delete_original_segments(begin, end)); |
342 | 11 | if (_rowid_conversion != nullptr) { |
343 | 4 | convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, |
344 | 4 | _writer->_num_segcompacted); |
345 | 4 | } |
346 | 11 | RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end)); |
347 | 11 | if (_inverted_index_file_writer != nullptr) { |
348 | 7 | _inverted_index_file_writer.reset(); |
349 | 7 | } |
350 | 11 | if (VLOG_DEBUG_IS_ON) { |
351 | 0 | _writer->vlog_buffer.clear(); |
352 | 0 | for (const auto& entry : std::filesystem::directory_iterator(ctx.tablet_path)) { |
353 | 0 | fmt::format_to(_writer->vlog_buffer, "[{}]", string(entry.path())); |
354 | 0 | } |
355 | 0 | VLOG_DEBUG << "tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id |
356 | 0 | << "_segcompacted_point:" << _writer->_segcompacted_point |
357 | 0 | << " _num_segment:" << _writer->_num_segment |
358 | 0 | << " _num_segcompacted:" << _writer->_num_segcompacted |
359 | 0 | << " list directory:" << fmt::to_string(_writer->vlog_buffer); |
360 | 0 | } |
361 | | |
362 | 11 | _writer->_segcompacted_point += (end - begin + 1); |
363 | | |
364 | 11 | uint64_t elapsed = GetCurrentTimeMicros() - begin_time; |
365 | 11 | LOG(INFO) << "segcompaction completed. tablet_id:" << ctx.tablet_id |
366 | 11 | << " rowset_id:" << ctx.rowset_id << " elapsed time:" << elapsed |
367 | 11 | << "us. update segcompacted_point:" << _writer->_segcompacted_point |
368 | 11 | << " segment num:" << segments->size() << " begin:" << begin << " end:" << end; |
369 | | |
370 | 11 | return Status::OK(); |
371 | 11 | } |
372 | | |
373 | 11 | void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) { |
374 | 11 | Status status = Status::OK(); |
375 | 11 | if (_is_compacting_state_mutable.exchange(false)) { |
376 | 11 | status = _do_compact_segments(segments); |
377 | 11 | } else { |
378 | | // note: be aware that _writer maybe released when the task is cancelled |
379 | 0 | LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task"; |
380 | 0 | return; |
381 | 0 | } |
382 | 11 | if (!status.ok()) { |
383 | 0 | int16_t errcode = status.code(); |
384 | 0 | switch (errcode) { |
385 | 0 | case FETCH_MEMORY_EXCEEDED: |
386 | 0 | case SEGCOMPACTION_INIT_READER: |
387 | 0 | case SEGCOMPACTION_INIT_WRITER: |
388 | 0 | LOG(WARNING) << "segcompaction failed, try next time:" << status; |
389 | 0 | break; |
390 | 0 | default: |
391 | 0 | auto ctx = _writer->_context; |
392 | 0 | LOG(WARNING) << "segcompaction fatal, terminating the write job." |
393 | 0 | << " tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id |
394 | 0 | << " status:" << status; |
395 | | // status will be checked by the next trigger of segcompaction or the final wait |
396 | 0 | _writer->_segcompaction_status.store(ErrorCode::INTERNAL_ERROR); |
397 | 0 | } |
398 | 0 | } |
399 | 11 | DCHECK_EQ(_writer->_is_doing_segcompaction, true); |
400 | 11 | { |
401 | 11 | std::lock_guard lk(_writer->_is_doing_segcompaction_lock); |
402 | 11 | _writer->_is_doing_segcompaction = false; |
403 | 11 | _writer->_segcompacting_cond.notify_all(); |
404 | 11 | } |
405 | 11 | _is_compacting_state_mutable = true; |
406 | 11 | } |
407 | | |
408 | 84 | bool SegcompactionWorker::need_convert_delete_bitmap() { |
409 | 84 | if (_writer == nullptr) { |
410 | 0 | return false; |
411 | 0 | } |
412 | 84 | auto tablet = _writer->context().tablet; |
413 | 84 | return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && |
414 | 84 | tablet->enable_unique_key_merge_on_write() && |
415 | 84 | tablet->tablet_schema()->has_sequence_col(); |
416 | 84 | } |
417 | | |
418 | | void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, |
419 | 22 | uint32_t src_seg_id, uint32_t dest_seg_id) { |
420 | | // lazy init |
421 | 22 | if (nullptr == _converted_delete_bitmap) { |
422 | 1 | _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id); |
423 | 1 | } |
424 | 22 | auto rowset_id = _writer->context().rowset_id; |
425 | 22 | const auto* seg_map = |
426 | 22 | src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); |
427 | 22 | if (seg_map != nullptr) { |
428 | 15 | _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, |
429 | 15 | *seg_map); |
430 | 15 | } |
431 | 22 | } |
432 | | |
433 | | void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, |
434 | | uint32_t src_begin, uint32_t src_end, |
435 | 4 | uint32_t dst_seg_id) { |
436 | | // lazy init |
437 | 4 | if (nullptr == _converted_delete_bitmap) { |
438 | 3 | _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id); |
439 | 3 | } |
440 | 4 | auto rowset_id = _writer->context().rowset_id; |
441 | 4 | RowLocation src(rowset_id, 0, 0); |
442 | 34 | for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { |
443 | 30 | const auto* seg_map = |
444 | 30 | src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); |
445 | 30 | if (!seg_map) { |
446 | 6 | continue; |
447 | 6 | } |
448 | 24 | src.segment_id = seg_id; |
449 | 57.3k | for (unsigned int row_id : *seg_map) { |
450 | 57.3k | src.row_id = row_id; |
451 | 57.3k | auto dst_row_id = _rowid_conversion->get(src); |
452 | 57.3k | if (dst_row_id < 0) { |
453 | 5.26k | continue; |
454 | 5.26k | } |
455 | 52.0k | _converted_delete_bitmap->add( |
456 | 52.0k | {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id); |
457 | 52.0k | } |
458 | 24 | } |
459 | 4 | } |
460 | | |
461 | 461 | bool SegcompactionWorker::cancel() { |
462 | | // return true if the task is canncellable (actual compaction is not started) |
463 | | // return false when the task is not cancellable (it is in the middle of segcompaction) |
464 | 461 | return _is_compacting_state_mutable.exchange(false); |
465 | 461 | } |
466 | | |
467 | | } // namespace doris |