Coverage Report

Created: 2025-07-27 01:30

/root/doris/be/src/olap/rowset/segcompaction.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "segcompaction.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/olap_file.pb.h>
22
#include <limits.h>
23
24
#include <algorithm>
25
#include <atomic>
26
#include <condition_variable>
27
#include <filesystem>
28
#include <map>
29
#include <memory>
30
#include <mutex>
31
#include <sstream>
32
#include <string>
33
#include <utility>
34
35
#include "beta_rowset_writer.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/logging.h"
38
#include "gutil/stringprintf.h"
39
#include "gutil/strings/substitute.h"
40
#include "io/fs/file_system.h"
41
#include "io/fs/file_writer.h"
42
#include "io/io_common.h"
43
#include "olap/data_dir.h"
44
#include "olap/iterators.h"
45
#include "olap/merger.h"
46
#include "olap/olap_common.h"
47
#include "olap/olap_define.h"
48
#include "olap/rowset/beta_rowset.h"
49
#include "olap/rowset/rowset_meta.h"
50
#include "olap/rowset/rowset_writer_context.h"
51
#include "olap/rowset/segment_v2/inverted_index_cache.h"
52
#include "olap/rowset/segment_v2/inverted_index_desc.h"
53
#include "olap/rowset/segment_v2/segment.h"
54
#include "olap/rowset/segment_v2/segment_writer.h"
55
#include "olap/schema.h"
56
#include "olap/storage_engine.h"
57
#include "olap/tablet_reader.h"
58
#include "olap/tablet_schema.h"
59
#include "runtime/memory/global_memory_arbitrator.h"
60
#include "runtime/thread_context.h"
61
#include "util/debug_points.h"
62
#include "util/mem_info.h"
63
#include "util/time.h"
64
#include "vec/olap/vertical_block_reader.h"
65
#include "vec/olap/vertical_merge_iterator.h"
66
67
namespace doris {
68
using namespace ErrorCode;
69
70
972
SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {}
71
72
972
void SegcompactionWorker::init_mem_tracker(const RowsetWriterContext& rowset_writer_context) {
73
972
    _seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
74
972
            MemTrackerLimiter::Type::COMPACTION,
75
972
            fmt::format("segcompaction-txnID_{}-loadID_{}-tabletID_{}-indexID_{}-"
76
972
                        "partitionID_{}-version_{}",
77
972
                        std::to_string(rowset_writer_context.txn_id),
78
972
                        print_id(rowset_writer_context.load_id),
79
972
                        std::to_string(rowset_writer_context.tablet_id),
80
972
                        std::to_string(rowset_writer_context.index_id),
81
972
                        std::to_string(rowset_writer_context.partition_id),
82
972
                        rowset_writer_context.version.to_string()));
83
972
}
84
85
Status SegcompactionWorker::_get_segcompaction_reader(
86
        SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
87
        std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
88
        vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
89
        std::vector<uint32_t>& return_columns,
90
22
        std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
91
22
    const auto& ctx = _writer->_context;
92
22
    bool record_rowids = need_convert_delete_bitmap() && is_key;
  Branch (92:26): [True: 8, False: 14]
  Branch (92:58): [True: 4, False: 4]
93
22
    StorageReadOptions read_options;
94
22
    read_options.stats = stat;
95
22
    read_options.use_page_cache = false;
96
22
    read_options.tablet_schema = ctx.tablet_schema;
97
22
    read_options.record_rowids = record_rowids;
98
22
    std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
99
22
    std::map<uint32_t, uint32_t> segment_rows;
100
118
    for (auto& seg_ptr : *segments) {
  Branch (100:24): [True: 118, False: 22]
101
118
        std::unique_ptr<RowwiseIterator> iter;
102
118
        auto s = seg_ptr->new_iterator(schema, read_options, &iter);
103
118
        if (!s.ok()) {
  Branch (103:13): [True: 0, False: 118]
104
0
            return Status::Error<INIT_FAILED>("failed to create iterator[{}]: {}", seg_ptr->id(),
105
0
                                              s.to_string());
106
0
        }
107
118
        seg_iterators.push_back(std::move(iter));
108
118
        segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows());
109
118
    }
110
22
    if (record_rowids && _rowid_conversion != nullptr) {
  Branch (110:9): [True: 4, False: 18]
  Branch (110:26): [True: 4, False: 0]
111
4
        _rowid_conversion->reset_segment_map(segment_rows);
112
4
    }
113
114
22
    *reader = std::make_unique<vectorized::VerticalBlockReader>(&row_sources_buf);
115
116
22
    TabletReader::ReaderParams reader_params;
117
22
    reader_params.is_segcompaction = true;
118
22
    reader_params.segment_iters_ptr = &seg_iterators;
119
    // no reader_params.version shouldn't break segcompaction
120
22
    reader_params.tablet_schema = ctx.tablet_schema;
121
22
    reader_params.tablet = tablet;
122
22
    reader_params.return_columns = return_columns;
123
22
    reader_params.is_key_column_group = is_key;
124
22
    reader_params.use_page_cache = false;
125
22
    reader_params.record_rowids = record_rowids;
126
22
    return (*reader)->init(reader_params, nullptr);
127
22
}
128
129
std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
130
11
        uint32_t begin, uint32_t end) {
131
11
    Status status;
132
11
    std::unique_ptr<segment_v2::SegmentWriter> writer = nullptr;
133
11
    status = _create_segment_writer_for_segcompaction(&writer, begin, end);
134
11
    if (!status.ok() || writer == nullptr) {
  Branch (134:9): [True: 0, False: 11]
  Branch (134:25): [True: 0, False: 11]
135
0
        LOG(ERROR) << "failed to create segment writer for begin:" << begin << " end:" << end
136
0
                   << " status:" << status;
137
0
        return nullptr;
138
11
    } else {
139
11
        return writer;
140
11
    }
141
11
}
142
143
11
Status SegcompactionWorker::_delete_original_segments(uint32_t begin, uint32_t end) {
144
11
    DCHECK(_writer->rowset_meta()->is_local());
145
146
11
    const auto& fs = io::global_local_filesystem();
147
11
    auto ctx = _writer->_context;
148
11
    auto schema = ctx.tablet_schema;
149
150
70
    for (uint32_t i = begin; i <= end; ++i) {
  Branch (150:30): [True: 59, False: 11]
151
59
        auto seg_path = local_segment_path(ctx.tablet_path, ctx.rowset_id.to_string(), i);
152
        // Even if an error is encountered, these files that have not been cleaned up
153
        // will be cleaned up by the GC background. So here we only print the error
154
        // message when we encounter an error.
155
59
        RETURN_NOT_OK_STATUS_WITH_WARN(fs->delete_file(seg_path),
156
59
                                       strings::Substitute("Failed to delete file=$0", seg_path));
157
59
        if (schema->has_inverted_index() &&
  Branch (157:13): [True: 29, False: 30]
158
59
            schema->get_inverted_index_storage_format() >= InvertedIndexStorageFormatPB::V2) {
  Branch (158:13): [True: 29, False: 0]
159
29
            auto idx_path = InvertedIndexDescriptor::get_index_file_path_v2(
160
29
                    InvertedIndexDescriptor::get_index_file_path_prefix(seg_path));
161
29
            VLOG_DEBUG << "segcompaction index. delete file " << idx_path;
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
162
29
            RETURN_NOT_OK_STATUS_WITH_WARN(
163
29
                    fs->delete_file(idx_path),
164
29
                    strings::Substitute("Failed to delete file=$0", idx_path));
165
29
        }
166
        // Delete inverted index files
167
207
        for (auto&& column : schema->columns()) {
  Branch (167:28): [True: 207, False: 59]
168
207
            auto index_infos = schema->inverted_indexs(*column);
169
207
            for (const auto& index_info : index_infos) {
  Branch (169:41): [True: 58, False: 207]
170
58
                auto index_id = index_info->index_id();
171
58
                if (schema->get_inverted_index_storage_format() ==
  Branch (171:21): [True: 0, False: 58]
172
58
                    InvertedIndexStorageFormatPB::V1) {
173
0
                    auto idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
174
0
                            InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id,
175
0
                            index_info->get_index_suffix());
176
0
                    VLOG_DEBUG << "segcompaction index. delete file " << idx_path;
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
177
0
                    RETURN_NOT_OK_STATUS_WITH_WARN(
178
0
                            fs->delete_file(idx_path),
179
0
                            strings::Substitute("Failed to delete file=$0", idx_path));
180
0
                }
181
                // Erase the origin index file cache
182
58
                auto idx_file_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
183
58
                        InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_id,
184
58
                        index_info->get_index_suffix());
185
58
                RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(idx_file_cache_key));
186
58
            }
187
207
        }
188
59
    }
189
11
    return Status::OK();
190
11
}
191
192
Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat,
193
                                               Merger::Statistics& merger_stat, uint32_t begin,
194
11
                                               uint32_t end) {
195
11
    uint64_t raw_rows_read = reader_stat.raw_rows_read; /* total rows read before merge */
196
11
    uint64_t sum_src_row = 0; /* sum of rows in each involved source segments */
197
11
    uint64_t filtered_rows = merger_stat.filtered_rows; /* rows filtered by del conditions */
198
11
    uint64_t output_rows = merger_stat.output_rows;     /* rows after merge */
199
11
    uint64_t merged_rows = merger_stat.merged_rows;     /* dup key merged by unique/agg */
200
201
11
    {
202
11
        std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
203
70
        for (int i = begin; i <= end; ++i) {
  Branch (203:29): [True: 59, False: 11]
204
59
            sum_src_row += _writer->_segid_statistics_map[i].row_num;
205
59
        }
206
11
    }
207
208
11
    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
Line
Count
Source
37
11
    if (UNLIKELY(config::enable_debug_points)) {                              \
38
2
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
2
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 2]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
2
    }
209
11
    if (raw_rows_read != sum_src_row) {
  Branch (209:9): [True: 0, False: 11]
210
0
        return Status::Error<CHECK_LINES_ERROR>(
211
0
                "segcompaction read row num does not match source. expect read row:{}, actual read "
212
0
                "row:{}",
213
0
                sum_src_row, raw_rows_read);
214
0
    }
215
216
11
    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
Line
Count
Source
37
11
    if (UNLIKELY(config::enable_debug_points)) {                              \
38
2
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
2
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 2]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
2
    }
217
11
    if ((output_rows + merged_rows) != raw_rows_read) {
  Branch (217:9): [True: 0, False: 11]
218
0
        return Status::Error<CHECK_LINES_ERROR>(
219
0
                "segcompaction total row num does not match after merge. expect total row:{},  "
220
0
                "actual total row:{}, (output_rows:{},merged_rows:{})",
221
0
                raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
222
0
    }
223
11
    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
Line
Count
Source
37
11
    if (UNLIKELY(config::enable_debug_points)) {                              \
38
2
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
2
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 2]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
2
    }
224
11
                    { filtered_rows++; });
225
11
    if (filtered_rows != 0) {
  Branch (225:9): [True: 0, False: 11]
226
0
        return Status::Error<CHECK_LINES_ERROR>(
227
0
                "segcompaction should not have filtered rows but actual filtered rows:{}",
228
0
                filtered_rows);
229
0
    }
230
11
    return Status::OK();
231
11
}
232
233
Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
234
11
        std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end) {
235
11
    return _writer->create_segment_writer_for_segcompaction(writer, begin, end);
236
11
}
237
238
11
Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) {
239
11
    DCHECK(_seg_compact_mem_tracker != nullptr);
240
11
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
Line
Count
Source
76
11
    auto VARNAME_LINENUM(scoped_tls_stmtl) = doris::ScopedInitThreadContext()
241
    /* throttle segcompaction task if memory depleted */
242
11
    if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
  Branch (242:9): [True: 0, False: 11]
243
0
        return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage");
244
0
    }
245
246
11
    uint32_t begin = (*(segments->begin()))->id();
247
11
    uint32_t end = (*(segments->end() - 1))->id();
248
11
    uint64_t begin_time = GetCurrentTimeMicros();
249
11
    uint64_t index_size = 0;
250
11
    uint64_t total_index_size = 0;
251
11
    auto ctx = _writer->_context;
252
253
11
    auto writer = _create_segcompaction_writer(begin, end);
254
11
    if (UNLIKELY(writer == nullptr)) {
255
0
        return Status::Error<SEGCOMPACTION_INIT_WRITER>("failed to get segcompaction writer");
256
0
    }
257
258
11
    DCHECK(ctx.tablet);
259
11
    auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet);
260
11
    if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
  Branch (260:9): [True: 4, False: 7]
  Branch (260:41): [True: 3, False: 1]
261
3
        _rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
262
3
    }
263
264
11
    std::vector<std::vector<uint32_t>> column_groups;
265
11
    std::vector<uint32_t> key_group_cluster_key_idxes;
266
11
    Merger::vertical_split_columns(*ctx.tablet_schema, &column_groups,
267
11
                                   &key_group_cluster_key_idxes);
268
11
    vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
269
11
                                                 ReaderType::READER_SEGMENT_COMPACTION);
270
271
11
    KeyBoundsPB key_bounds;
272
11
    Merger::Statistics key_merger_stats;
273
11
    OlapReaderStatistics key_reader_stats;
274
    /* compact group one by one */
275
33
    for (auto i = 0; i < column_groups.size(); ++i) {
  Branch (275:22): [True: 22, False: 11]
276
22
        VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
Line
Count
Source
42
0
#define VLOG_NOTICE VLOG(3)
277
22
        bool is_key = (i == 0);
278
22
        std::vector<uint32_t> column_ids = column_groups[i];
279
280
22
        writer->clear();
281
22
        RETURN_IF_ERROR(writer->init(column_ids, is_key));
282
22
        auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids);
283
22
        OlapReaderStatistics reader_stats;
284
22
        std::unique_ptr<vectorized::VerticalBlockReader> reader;
285
22
        auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
286
22
                                           is_key, column_ids, &reader);
287
22
        if (UNLIKELY(reader == nullptr || !s.ok())) {
288
0
            return Status::Error<SEGCOMPACTION_INIT_READER>(
289
0
                    "failed to get segcompaction reader. err: {}", s.to_string());
290
0
        }
291
292
22
        Merger::Statistics merger_stats;
293
22
        RETURN_IF_ERROR(Merger::vertical_compact_one_group(
294
22
                tablet->tablet_id(), ReaderType::READER_SEGMENT_COMPACTION, *ctx.tablet_schema,
295
22
                is_key, column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size,
296
22
                key_bounds, _rowid_conversion.get()));
297
22
        total_index_size += index_size;
298
22
        if (is_key) {
  Branch (298:13): [True: 11, False: 11]
299
11
            RETURN_IF_ERROR(row_sources_buf.flush());
300
11
            key_merger_stats = merger_stats;
301
11
            key_reader_stats = reader_stats;
302
11
        }
303
22
        RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
304
22
    }
305
306
    /* check row num after merge/aggregation */
307
11
    RETURN_NOT_OK_STATUS_WITH_WARN(
308
11
            _check_correctness(key_reader_stats, key_merger_stats, begin, end),
309
11
            "check correctness failed");
310
11
    {
311
11
        std::lock_guard<std::mutex> lock(_writer->_segid_statistics_map_mutex);
312
11
        _writer->_clear_statistics_for_deleting_segments_unsafe(begin, end);
313
11
    }
314
11
    RETURN_IF_ERROR(
315
11
            _writer->flush_segment_writer_for_segcompaction(&writer, total_index_size, key_bounds));
316
317
11
    if (_file_writer != nullptr && _file_writer->state() != io::FileWriter::State::CLOSED) {
  Branch (317:9): [True: 11, False: 0]
  Branch (317:36): [True: 11, False: 0]
318
11
        RETURN_IF_ERROR(_file_writer->close());
319
11
    }
320
321
11
    RETURN_IF_ERROR(_delete_original_segments(begin, end));
322
11
    if (_rowid_conversion != nullptr) {
  Branch (322:9): [True: 4, False: 7]
323
4
        convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end,
324
4
                                      _writer->_num_segcompacted);
325
4
    }
326
11
    RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));
327
11
    if (_inverted_index_file_writer != nullptr) {
  Branch (327:9): [True: 7, False: 4]
328
7
        _inverted_index_file_writer.reset();
329
7
    }
330
11
    if (VLOG_DEBUG_IS_ON) {
Line
Count
Source
51
11
#define VLOG_DEBUG_IS_ON VLOG_IS_ON(7)
331
0
        _writer->vlog_buffer.clear();
332
0
        for (const auto& entry : std::filesystem::directory_iterator(ctx.tablet_path)) {
  Branch (332:32): [True: 0, False: 0]
333
0
            fmt::format_to(_writer->vlog_buffer, "[{}]", string(entry.path()));
334
0
        }
335
0
        VLOG_DEBUG << "tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
336
0
                   << "_segcompacted_point:" << _writer->_segcompacted_point
337
0
                   << " _num_segment:" << _writer->_num_segment
338
0
                   << " _num_segcompacted:" << _writer->_num_segcompacted
339
0
                   << " list directory:" << fmt::to_string(_writer->vlog_buffer);
340
0
    }
341
342
11
    _writer->_segcompacted_point += (end - begin + 1);
343
344
11
    uint64_t elapsed = GetCurrentTimeMicros() - begin_time;
345
11
    LOG(INFO) << "segcompaction completed. tablet_id:" << ctx.tablet_id
346
11
              << " rowset_id:" << ctx.rowset_id << " elapsed time:" << elapsed
347
11
              << "us. update segcompacted_point:" << _writer->_segcompacted_point
348
11
              << " segment num:" << segments->size() << " begin:" << begin << " end:" << end;
349
350
11
    return Status::OK();
351
11
}
352
353
11
void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segments) {
354
11
    Status status = Status::OK();
355
11
    if (_is_compacting_state_mutable.exchange(false)) {
  Branch (355:9): [True: 11, False: 0]
356
11
        status = _do_compact_segments(segments);
357
11
    } else {
358
        // note: be aware that _writer maybe released when the task is cancelled
359
0
        LOG(INFO) << "segcompaction worker is cancelled, skipping segcompaction task";
360
0
        return;
361
0
    }
362
11
    if (!status.ok()) {
  Branch (362:9): [True: 0, False: 11]
363
0
        int16_t errcode = status.code();
364
0
        switch (errcode) {
365
0
        case FETCH_MEMORY_EXCEEDED:
  Branch (365:9): [True: 0, False: 0]
366
0
        case SEGCOMPACTION_INIT_READER:
  Branch (366:9): [True: 0, False: 0]
367
0
        case SEGCOMPACTION_INIT_WRITER:
  Branch (367:9): [True: 0, False: 0]
368
0
            LOG(WARNING) << "segcompaction failed, try next time:" << status;
369
0
            break;
370
0
        default:
  Branch (370:9): [True: 0, False: 0]
371
0
            auto ctx = _writer->_context;
372
0
            LOG(WARNING) << "segcompaction fatal, terminating the write job."
373
0
                         << " tablet_id:" << ctx.tablet_id << " rowset_id:" << ctx.rowset_id
374
0
                         << " status:" << status;
375
            // status will be checked by the next trigger of segcompaction or the final wait
376
0
            _writer->_segcompaction_status.store(ErrorCode::INTERNAL_ERROR);
377
0
        }
378
0
    }
379
11
    DCHECK_EQ(_writer->_is_doing_segcompaction, true);
380
11
    {
381
11
        std::lock_guard lk(_writer->_is_doing_segcompaction_lock);
382
11
        _writer->_is_doing_segcompaction = false;
383
11
        _writer->_segcompacting_cond.notify_all();
384
11
    }
385
11
    _is_compacting_state_mutable = true;
386
11
}
387
388
84
bool SegcompactionWorker::need_convert_delete_bitmap() {
389
84
    if (_writer == nullptr) {
  Branch (389:9): [True: 0, False: 84]
390
0
        return false;
391
0
    }
392
84
    auto tablet = _writer->context().tablet;
393
84
    return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS &&
  Branch (393:12): [True: 84, False: 0]
  Branch (393:33): [True: 47, False: 37]
394
84
           tablet->enable_unique_key_merge_on_write() &&
  Branch (394:12): [True: 38, False: 9]
395
84
           tablet->tablet_schema()->has_sequence_col();
  Branch (395:12): [True: 38, False: 0]
396
84
}
397
398
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
399
22
                                                        uint32_t src_seg_id, uint32_t dest_seg_id) {
400
    // lazy init
401
22
    if (nullptr == _converted_delete_bitmap) {
  Branch (401:9): [True: 1, False: 21]
402
1
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
403
1
    }
404
22
    auto rowset_id = _writer->context().rowset_id;
405
22
    const auto* seg_map =
406
22
            src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
407
22
    if (seg_map != nullptr) {
  Branch (407:9): [True: 15, False: 7]
408
15
        _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
409
15
                                      *seg_map);
410
15
    }
411
22
}
412
413
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
414
                                                        uint32_t src_begin, uint32_t src_end,
415
4
                                                        uint32_t dst_seg_id) {
416
    // lazy init
417
4
    if (nullptr == _converted_delete_bitmap) {
  Branch (417:9): [True: 3, False: 1]
418
3
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
419
3
    }
420
4
    auto rowset_id = _writer->context().rowset_id;
421
4
    RowLocation src(rowset_id, 0, 0);
422
34
    for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) {
  Branch (422:39): [True: 30, False: 4]
423
30
        const auto* seg_map =
424
30
                src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
425
30
        if (!seg_map) {
  Branch (425:13): [True: 6, False: 24]
426
6
            continue;
427
6
        }
428
24
        src.segment_id = seg_id;
429
57.3k
        for (unsigned int row_id : *seg_map) {
  Branch (429:34): [True: 57.3k, False: 24]
430
57.3k
            src.row_id = row_id;
431
57.3k
            auto dst_row_id = _rowid_conversion->get(src);
432
57.3k
            if (dst_row_id < 0) {
  Branch (432:17): [True: 5.28k, False: 52.0k]
433
5.28k
                continue;
434
5.28k
            }
435
52.0k
            _converted_delete_bitmap->add(
436
52.0k
                    {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id);
437
52.0k
        }
438
24
    }
439
4
}
440
441
624
bool SegcompactionWorker::cancel() {
442
    // return true if the task is canncellable (actual compaction is not started)
443
    // return false when the task is not cancellable (it is in the middle of segcompaction)
444
624
    return _is_compacting_state_mutable.exchange(false);
445
624
}
446
447
} // namespace doris