Coverage Report

Created: 2026-04-01 13:27

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segcompaction.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/segcompaction.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/olap_file.pb.h>
22
#include <limits.h>
23
24
#include <algorithm>
25
#include <atomic>
26
#include <condition_variable>
27
#include <filesystem>
28
#include <map>
29
#include <memory>
30
#include <mutex>
31
#include <sstream>
32
#include <string>
33
#include <utility>
34
35
#include "absl/strings/substitute.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/logging.h"
39
#include "io/fs/file_system.h"
40
#include "io/fs/file_writer.h"
41
#include "io/io_common.h"
42
#include "runtime/memory/global_memory_arbitrator.h"
43
#include "runtime/thread_context.h"
44
#include "storage/data_dir.h"
45
#include "storage/index/inverted/inverted_index_cache.h"
46
#include "storage/index/inverted/inverted_index_desc.h"
47
#include "storage/iterator/vertical_block_reader.h"
48
#include "storage/iterator/vertical_merge_iterator.h"
49
#include "storage/iterators.h"
50
#include "storage/merger.h"
51
#include "storage/olap_common.h"
52
#include "storage/olap_define.h"
53
#include "storage/rowset/beta_rowset.h"
54
#include "storage/rowset/beta_rowset_writer.h"
55
#include "storage/rowset/rowset_meta.h"
56
#include "storage/rowset/rowset_writer_context.h"
57
#include "storage/schema.h"
58
#include "storage/segment/segment.h"
59
#include "storage/segment/segment_writer.h"
60
#include "storage/storage_engine.h"
61
#include "storage/tablet/tablet_reader.h"
62
#include "storage/tablet/tablet_schema.h"
63
#include "util/debug_points.h"
64
#include "util/mem_info.h"
65
#include "util/time.h"
66
67
namespace doris {
68
#include "common/compile_check_begin.h"
69
using namespace ErrorCode;
70
71
1.97k
// Binds this worker to the rowset writer whose segments it compacts. Only the pointer is
// stored here; the memory tracker is set up separately by init_mem_tracker().
SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}
72
73
1.97k
void SegcompactionWorker::init_mem_tracker(const RowsetWriterContext& rowset_writer_context) {
74
1.97k
    _seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
75
1.97k
            MemTrackerLimiter::Type::COMPACTION,
76
1.97k
            fmt::format("segcompaction-txnID_{}-loadID_{}-tabletID_{}-indexID_{}-"
77
1.97k
                        "partitionID_{}-version_{}",
78
1.97k
                        std::to_string(rowset_writer_context.txn_id),
79
1.97k
                        print_id(rowset_writer_context.load_id),
80
1.97k
                        std::to_string(rowset_writer_context.tablet_id),
81
1.97k
                        std::to_string(rowset_writer_context.index_id),
82
1.97k
                        std::to_string(rowset_writer_context.partition_id),
83
1.97k
                        rowset_writer_context.version.to_string()));
84
1.97k
}
85
86
// Builds a VerticalBlockReader over the candidate `segments` for one column group.
//
// segments                     segments to read from
// tablet                       tablet the segments belong to
// schema                       schema restricted to the columns of this group
// stat                         statistics sink handed to the segment iterators
// row_sources_buf              row source buffer the reader is constructed with
// is_key                       whether this group is the key column group
// return_columns               column ids forwarded to the reader params
// key_group_cluster_key_idxes  forwarded to the reader params unchanged
// reader                       out: the newly created reader
//
// Returns INIT_FAILED if a segment iterator cannot be created, any error from the delete
// bitmap calculation, or otherwise the status of the reader's init().
Status SegcompactionWorker::_get_segcompaction_reader(
        SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
        std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
        RowSourcesBuffer& row_sources_buf, bool is_key, std::vector<uint32_t>& return_columns,
        std::vector<uint32_t>& key_group_cluster_key_idxes,
        std::unique_ptr<VerticalBlockReader>* reader) {
    const auto& ctx = _writer->_context;
    // Row ids are recorded only while reading the key group, and only when the delete
    // bitmap has to be converted afterwards.
    const bool record_rowids = need_convert_delete_bitmap() && is_key;

    StorageReadOptions read_options;
    read_options.stats = stat;
    read_options.use_page_cache = false;
    read_options.tablet_schema = ctx.tablet_schema;
    read_options.record_rowids = record_rowids;

    const bool has_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
    if (has_cluster_keys) {
        // With cluster keys, compute a delete bitmap between the candidate segments and
        // pass every non-empty per-segment bitmap to the iterators.
        DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
        RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(
                ctx.tablet_schema, ctx.rowset_id, *segments, delete_bitmap));
        for (auto& segment : *segments) {
            auto seg_bitmap = delete_bitmap->get_agg(
                    {ctx.rowset_id, segment->id(), DeleteBitmap::TEMP_VERSION_COMMON});
            if (!seg_bitmap->isEmpty()) {
                read_options.delete_bitmap.emplace(segment->id(), std::move(seg_bitmap));
            }
        }
    }

    // One iterator per segment, plus a segment id -> row count map for row id conversion.
    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
    std::map<uint32_t, uint32_t> segment_rows;
    for (auto& segment : *segments) {
        std::unique_ptr<RowwiseIterator> seg_iter;
        auto st = segment->new_iterator(schema, read_options, &seg_iter);
        if (!st.ok()) {
            return Status::Error<INIT_FAILED>("failed to create iterator[{}]: {}", segment->id(),
                                              st.to_string());
        }
        seg_iterators.push_back(std::move(seg_iter));
        segment_rows.emplace(segment->id(), segment->num_rows());
    }
    if (record_rowids && _rowid_conversion != nullptr) {
        _rowid_conversion->reset_segment_map(segment_rows);
    }

    *reader = std::make_unique<VerticalBlockReader>(&row_sources_buf);

    TabletReader::ReaderParams reader_params;
    reader_params.is_segcompaction = true;
    reader_params.is_key_column_group = is_key;
    reader_params.record_rowids = record_rowids;
    reader_params.use_page_cache = false;
    // reader_params.version is deliberately left unset; segcompaction should not need it.
    reader_params.tablet = tablet;
    reader_params.tablet_schema = ctx.tablet_schema;
    reader_params.return_columns = return_columns;
    reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
    reader_params.segment_iters_ptr = &seg_iterators;
    return (*reader)->init(reader_params, nullptr);
}
143
144
std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
145
22
        uint32_t begin, uint32_t end) {
146
22
    Status status;
147
22
    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
148
22
    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
149
22
    if (!status.ok() || writer == nullptr) {
150
0
        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
151
0
                   << " status:" << status;
152
0
        return nullptr;
153
22
    } else {
154
22
        return writer;
155
22
    }
156
22
}
157
158
22
// Removes the source segment files with ids in [begin, end] together with their inverted
// index files, and erases the matching entries from the inverted index searcher cache.
// Returns the first failing status; OK when everything was removed.
Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t end) {
    DCHECK(_writer->rowset_meta()->is_local());

    auto ctx = _writer->_context;
    auto schema = ctx.tablet_schema;
    const auto& fs = io::global_local_filesystem();

    for (uint32_t seg_id = begin; seg_id <= end; ++seg_id) {
        auto seg_path = local_segment_path(ctx.tablet_path, ctx.rowset_id.to_string(), seg_id);
        // A failed delete is logged as a warning and returned to the caller.
        // NOTE(review): the previous comment said files left behind are cleaned up by the
        // background GC and that errors are only printed here - confirm whether returning
        // on failure is intended.
        RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(seg_path),
                                       absl::Substitute("Failed to delete file=$0", seg_path));

        // Storage format V2 and above: a single index file per segment.
        const bool has_index = schema->has_inverted_index() || schema->has_ann_index();
        if (has_index &&
            schema->get_inverted_index_storage_format() >= InvertedIndexStorageFormatPB::V2) {
            auto v2_index_path = InvertedIndexDescriptor::get_index_file_path_v2(
                    InvertedIndexDescriptor::get_index_file_path_prefix(seg_path));
            VLOG_DEBUG << "segcompaction index. delete file " << v2_index_path;
            RETURN_NOT_OK_STATUS_WITH_WARN(
                    fs->delete_file(v2_index_path),
                    absl::Substitute("Failed to delete file=$0", v2_index_path));
        }

        // Per-index work: V1 index files are deleted one per index, and the cached
        // searcher entry is erased for every index regardless of storage format.
        for (auto&& column : schema->columns()) {
            auto index_infos = schema->inverted_indexs(*column);
            for (const auto& index_info : index_infos) {
                auto index_id = index_info->index_id();
                const bool is_v1 = schema->get_inverted_index_storage_format() ==
                                   InvertedIndexStorageFormatPB::V1;
                if (is_v1) {
                    auto v1_index_path = InvertedIndexDescriptor::get_index_file_path_v1(
                            InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id,
                            index_info->get_index_suffix());
                    VLOG_DEBUG << "segcompaction index. delete file " << v1_index_path;
                    RETURN_NOT_OK_STATUS_WITH_WARN(
                            fs->delete_file(v1_index_path),
                            absl::Substitute("Failed to delete file=$0", v1_index_path));
                }
                // Erase the original index file from the searcher cache.
                auto cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
                        InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id,
                        index_info->get_index_suffix());
                RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(cache_key));
            }
        }
    }
    return Status::OK();
}
205
206
// Verifies the row accounting of one segcompaction run over source segments [begin, end].
//
// reader_stat               statistics of the key-group reader
// merger_stat               statistics of the key-group merge
// begin, end                inclusive range of source segment ids
// is_mow_with_cluster_keys  when true, rows dropped by the delete bitmap are added back
//                           before comparing against the source row count
//
// Three conditions are checked, each failing with CHECK_LINES_ERROR:
//   1. rows read (plus bitmap-deleted rows for cluster-key tables) == sum of source rows
//   2. output_rows + merged_rows == raw_rows_read
//   3. filtered_rows == 0
Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
                                               Merger::Statistics& merger_stat, uint32_t begin,
                                               uint32_t end, bool is_mow_with_cluster_keys) {
    uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
    uint64_t rows_del_by_bitmap = reader_stat.rows_del_by_bitmap;
    uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
    uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
    uint64_t output_rows = merger_stat.output_rows;     /* rows after merge */
    uint64_t merged_rows = merger_stat.merged_rows;     /* dup key merged by unique/agg */

    {
        std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
        // The index has the same unsigned type as the segment id bounds; a signed index
        // would mix signed and unsigned in the comparison and in the map lookup.
        for (uint32_t seg_id = begin; seg_id <= end; ++seg_id) {
            sum_src_row += _writer->_segid_statistics_map[seg_id].row_num;
        }
    }

    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
    uint64_t raw_rows = raw_rows_read;
    if (is_mow_with_cluster_keys) {
        raw_rows += rows_del_by_bitmap;
    }
    if (raw_rows != sum_src_row) {
        return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction read row num does not match source. expect read row:{}, actual read "
                "row:{}(raw_rows_read: {}, rows_del_by_bitmap: {})",
                sum_src_row, raw_rows, raw_rows_read, rows_del_by_bitmap);
    }

    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
    if ((output_rows + merged_rows) != raw_rows_read) {
        return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction total row num does not match after merge. expect total row:{},  "
                "actual total row:{}, (output_rows:{},merged_rows:{})",
                raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
    }
    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
                    { filtered_rows++; });
    if (filtered_rows != 0) {
        return Status::Error<CHECK_LINES_ERROR>(
                "segcompaction should not have filtered rows but actual filtered rows:{}",
                filtered_rows);
    }
    return Status::OK();
}
251
252
// Forwards to the owning rowset writer to create the segment writer for the compacted
// output of segments [begin, end]; `writer` receives the result.
Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) {
    Status st = _writer->create_segment_writer_for_segcompaction(writer, begin, end);
    return st;
}
256
257
22
// Compacts the candidate `segments` into one new segment, one column group at a time.
//
// Steps, in order:
//   1. bail out with FETCH_MEMORY_EXCEEDED when the soft memory limit is exceeded
//   2. create the output segment writer (SEGCOMPACTION_INIT_WRITER on failure)
//   3. split columns into groups and merge each group through a VerticalBlockReader
//      (SEGCOMPACTION_INIT_READER when the reader cannot be built)
//   4. verify row counts with _check_correctness()
//   5. flush the writer, close the file writer, delete the source segments, convert the
//      delete bitmap if needed and rename the compacted segment
//   6. advance the writer's _segcompacted_point by the number of source segments
//
// A null or empty candidate list is treated as a no-op and returns OK.
Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
    DCHECK(_seg_compact_mem_tracker != nullptr);
    SCOPED_ATTACH_TASK(_seg_compact_mem_tracker);
    /* throttle segcompaction task if memory depleted */
    if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
        return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage");
    }

    // Dereferencing begin() / end() - 1 below is undefined for an empty candidate list.
    if (segments == nullptr || segments->empty()) {
        LOG(WARNING) << "segcompaction got no candidate segments, nothing to compact";
        return Status::OK();
    }

    uint32_t begin = (*(segments->begin()))->id();
    uint32_t end = (*(segments->end() - 1))->id();
    uint64_t begin_time = GetCurrentTimeMicros();
    uint64_t index_size = 0;
    uint64_t total_index_size = 0;
    auto ctx = _writer->_context;

    auto writer = _create_segcompaction_writer(begin, end);
    if (UNLIKELY(writer == nullptr)) {
        return Status::Error<SEGCOMPACTION_INIT_WRITER>("failed to get segcompaction writer");
    }

    DCHECK(ctx.tablet);
    auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet);
    if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
        _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
    }

    std::vector<std::vector<uint32_t>> column_groups;
    std::vector<uint32_t> key_group_cluster_key_idxes;
    // If BE config vertical_compaction_num_columns_per_group has been modified from
    // its default value (5), use the BE config; otherwise use the tablet meta value.
    constexpr int32_t default_num_columns_per_group = 5;
    int32_t num_columns_per_group =
            config::vertical_compaction_num_columns_per_group != default_num_columns_per_group
                    ? config::vertical_compaction_num_columns_per_group
                    : tablet->tablet_meta()->vertical_compaction_num_columns_per_group();
    Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups, &key_group_cluster_key_idxes,
                                   num_columns_per_group);
    RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
                                     ReaderType::READER_SEGMENT_COMPACTION);

    KeyBoundsPB key_bounds;
    Merger::Statistics key_merger_stats;
    OlapReaderStatistics key_reader_stats;
    /* compact group one by one */
    for (size_t i = 0; i < column_groups.size(); ++i) {
        VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
        // The first group is the key group; its statistics feed the correctness check.
        bool is_key = (i == 0);
        std::vector<uint32_t> column_ids = column_groups[i];

        writer->clear();
        RETURN_IF_ERROR(writer->init(column_ids, is_key));
        auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids);
        OlapReaderStatistics reader_stats;
        std::unique_ptr<VerticalBlockReader> reader;
        auto s =
                _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
                                          is_key, column_ids, key_group_cluster_key_idxes, &reader);
        if (UNLIKELY(reader == nullptr || !s.ok())) {
            return Status::Error<SEGCOMPACTION_INIT_READER>(
                    "failed to get segcompaction reader. err: {}", s.to_string());
        }

        Merger::Statistics merger_stats;
        RETURN_IF_ERROR(Merger::vertical_compact_one_group(
                tablet->tablet_id(), ReaderType::READER_SEGMENT_COMPACTION, *ctx.tablet_schema,
                is_key, column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size,
                key_bounds, _rowid_conversion.get()));
        total_index_size += index_size;
        if (is_key) {
            RETURN_IF_ERROR(row_sources_buf.flush());
            key_merger_stats = merger_stats;
            key_reader_stats = reader_stats;
        }
        RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
    }

    /* check row num after merge/aggregation */
    bool is_mow_with_cluster_keys = !tablet->tablet_schema()->cluster_key_uids().empty();
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_correctness(key_reader_stats, key_merger_stats, begin,
                                                      end, is_mow_with_cluster_keys),
                                   "check correctness failed");
    {
        std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
        _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
    }
    RETURN_IF_ERROR(
            _writer->flush_segment_writer_for_segcompaction(&writer, total_index_size, key_bounds));

    if (_file_writer != nullptr && _file_writer->state() != io::FileWriter::State::CLOSED) {
        RETURN_IF_ERROR(_file_writer->close());
    }

    RETURN_IF_ERROR(_delete_original_segments(begin, end));
    if (_rowid_conversion != nullptr) {
        convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end,
                                      _writer->_num_segcompacted);
    }
    RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));
    if (_index_file_writer != nullptr) {
        _index_file_writer.reset();
    }
    if (VLOG_DEBUG_IS_ON) {
        _writer->vlog_buffer.clear();
        for (const auto& entry : std::filesystem::directory_iterator(ctx.tablet_path)) {
            fmt::format_to(_writer->vlog_buffer, "[{}]", std::string(entry.path()));
        }
        VLOG_DEBUG << "tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id
                   << " _segcompacted_point:" << _writer->_segcompacted_point
                   << " _num_segment:" << _writer->_num_segment
                   << " _num_segcompacted:" << _writer->_num_segcompacted
                   << " list directory:" << fmt::to_string(_writer->vlog_buffer);
    }

    _writer->_segcompacted_point += (end - begin + 1);

    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
    LOG(INFO) << "segcompaction completed. tablet_id:" << ctx.tablet_id
              << " rowset_id:" << ctx.rowset_id << " elapsed time:" << elapsed
              << "us. update segcompacted_point:" << _writer->_segcompacted_point
              << " segment num:" << segments->size() << " begin:" << begin << " end:" << end;

    return Status::OK();
}
380
381
22
// Task entry point: runs one segcompaction over `segments` unless the worker was cancelled
// first, records a fatal failure on the writer, and finally clears the writer's
// "doing segcompaction" flag and wakes any waiter.
void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
    // Claim the state flag; if it was already false, cancel() got there first.
    if (!_is_compacting_state_mutable.exchange(false)) {
        // note: be aware that _writer maybe released when the task is cancelled
        LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
        return;
    }

    Status status = _do_compact_segments(segments);
    if (!status.ok()) {
        const int errcode = status.code();
        const bool retry_later = errcode == FETCH_MEMORY_EXCEEDED ||
                                 errcode == SEGCOMPACTION_INIT_READER ||
                                 errcode == SEGCOMPACTION_INIT_WRITER;
        if (retry_later) {
            LOG(WARNING) << "segcompaction failed, try next time:" << status;
        } else {
            auto ctx = _writer->_context;
            LOG(WARNING) << "segcompaction fatal, terminating the write job."
                         << " tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id
                         << " status:" << status;
            // status will be checked by the next trigger of segcompaction or the final wait
            _writer->_segcompaction_status.store(ErrorCode::INTERNAL_ERROR);
        }
    }

    DCHECK_EQ(_writer->_is_doing_segcompaction, true);
    {
        std::lock_guard lk(_writer->_is_doing_segcompaction_lock);
        _writer->_is_doing_segcompaction = false;
        _writer->_segcompacting_cond.notify_all();
    }
    // Make the worker cancellable / runnable again.
    _is_compacting_state_mutable = true;
}
415
416
168
bool SegcompactionWorker::need_convert_delete_bitmap() {
417
168
    if (_writer == nullptr) {
418
0
        return false;
419
0
    }
420
168
    auto tablet = _writer->context().tablet;
421
168
    return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS &&
422
168
           tablet->enable_unique_key_merge_on_write() &&
423
168
           tablet->tablet_schema()->has_sequence_col();
424
168
}
425
426
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
427
44
                                                        uint32_t src_seg_id, uint32_t dest_seg_id) {
428
    // lazy init
429
44
    if (nullptr == _converted_delete_bitmap) {
430
2
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
431
2
    }
432
44
    auto rowset_id = _writer->context().rowset_id;
433
44
    const auto* seg_map =
434
44
            src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
435
44
    if (seg_map != nullptr) {
436
30
        _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
437
30
                                      *seg_map);
438
30
    }
439
44
}
440
441
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
442
                                                        uint32_t src_begin, uint32_t src_end,
443
8
                                                        uint32_t dst_seg_id) {
444
    // lazy init
445
8
    if (nullptr == _converted_delete_bitmap) {
446
6
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
447
6
    }
448
8
    auto rowset_id = _writer->context().rowset_id;
449
8
    RowLocation src(rowset_id, 0, 0);
450
68
    for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) {
451
60
        const auto* seg_map =
452
60
                src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
453
60
        if (!seg_map) {
454
12
            continue;
455
12
        }
456
48
        src.segment_id = seg_id;
457
114k
        for (unsigned int row_id : *seg_map) {
458
114k
            src.row_id = row_id;
459
114k
            auto dst_row_id = _rowid_conversion->get(src);
460
114k
            if (dst_row_id < 0) {
461
10.5k
                continue;
462
10.5k
            }
463
104k
            _converted_delete_bitmap->add(
464
104k
                    {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id);
465
104k
        }
466
48
    }
467
8
}
468
469
1.27k
bool SegcompactionWorker::cancel() {
470
    // return true if the task is canncellable (actual compaction is not started)
471
    // return false when the task is not cancellable (it is in the middle of segcompaction)
472
1.27k
    return _is_compacting_state_mutable.exchange(false);
473
1.27k
}
474
475
#include "common/compile_check_end.h"
476
} // namespace doris