Coverage Report

Created: 2026-06-24 12:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/beta_rowset_writer.h"
19
20
#include <assert.h>
21
// IWYU pragma: no_include <bthread/errno.h>
22
#include <errno.h> // IWYU pragma: keep
23
#include <fmt/format.h>
24
#include <stdio.h>
25
26
#include <chrono>
27
#include <ctime> // time
28
#include <filesystem>
29
#include <memory>
30
#include <mutex>
31
#include <sstream>
32
#include <string>
33
#include <thread>
34
#include <utility>
35
#include <vector>
36
37
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
38
#include "common/cast_set.h"
39
#include "common/compiler_util.h" // IWYU pragma: keep
40
#include "common/config.h"
41
#include "common/logging.h"
42
#include "common/status.h"
43
#include "core/block/block.h"
44
#include "core/column/column.h"
45
#include "core/data_type/data_type_factory.hpp"
46
#include "io/fs/file_reader.h"
47
#include "io/fs/file_system.h"
48
#include "io/fs/file_writer.h"
49
#include "runtime/thread_context.h"
50
#include "storage/index/inverted/inverted_index_cache.h"
51
#include "storage/index/inverted/inverted_index_desc.h"
52
#include "storage/olap_define.h"
53
#include "storage/rowset/beta_rowset.h"
54
#include "storage/rowset/rowset_factory.h"
55
#include "storage/rowset/rowset_writer.h"
56
#include "storage/rowset/segcompaction.h"
57
#include "storage/schema_change/schema_change.h"
58
#include "storage/segment/segment.h"
59
#include "storage/segment/segment_writer.h"
60
#include "storage/storage_engine.h"
61
#include "storage/tablet/tablet_schema.h"
62
#include "util/debug_points.h"
63
#include "util/pretty_printer.h"
64
#include "util/slice.h"
65
#include "util/stopwatch.hpp"
66
#include "util/time.h"
67
68
namespace doris {
69
using namespace ErrorCode;
70
71
namespace {
72
73
699
bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) {
74
699
    std::string_view last;
75
1.78k
    for (auto&& segment_encode_key : segments_encoded_key_bounds) {
76
1.78k
        auto&& cur_min = segment_encode_key.min_key();
77
1.78k
        auto&& cur_max = segment_encode_key.max_key();
78
1.78k
        if (cur_min <= last) {
79
255
            return true;
80
255
        }
81
1.53k
        last = cur_max;
82
1.53k
    }
83
444
    return false;
84
699
}
85
86
1.21k
bool copy_key_bounds_with_truncation(const KeyBoundsPB& src, KeyBoundsPB* dst) {
87
1.21k
    DCHECK(dst != nullptr);
88
1.21k
    if (config::random_segments_key_bounds_truncation) {
89
0
        dst->CopyFrom(src);
90
0
        return false;
91
0
    }
92
1.21k
    const int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold;
93
1.21k
    if (truncation_threshold <= 0) {
94
355
        dst->CopyFrom(src);
95
355
        return false;
96
355
    }
97
859
    const size_t truncation_size = cast_set<size_t>(truncation_threshold);
98
99
859
    bool truncated = false;
100
1.71k
    auto copy_key = [&](const std::string& key, std::string* stored_key) {
101
1.71k
        if (key.size() > truncation_size) {
102
1.39k
            stored_key->assign(key.data(), truncation_size);
103
1.39k
            truncated = true;
104
1.39k
            return;
105
1.39k
        }
106
323
        stored_key->assign(key.data(), key.size());
107
323
    };
108
859
    copy_key(src.min_key(), dst->mutable_min_key());
109
859
    copy_key(src.max_key(), dst->mutable_max_key());
110
859
    return truncated;
111
1.21k
}
112
113
SegmentStatistics copy_segment_statistics_with_truncated_key_bounds(const SegmentStatistics& src,
114
1.20k
                                                                    bool& key_bounds_truncated) {
115
1.20k
    SegmentStatistics dst;
116
1.20k
    dst.row_num = src.row_num;
117
1.20k
    dst.data_size = src.data_size;
118
1.20k
    dst.index_size = src.index_size;
119
1.20k
    key_bounds_truncated = copy_key_bounds_with_truncation(src.key_bounds, &dst.key_bounds);
120
1.20k
    return dst;
121
1.20k
}
122
123
void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
124
24
                                       const RowsetMeta& spec_rowset_meta) {
125
24
    rowset_meta.set_num_rows(spec_rowset_meta.num_rows());
126
24
    rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size());
127
24
    rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size());
128
24
    rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size());
129
    // TODO write zonemap to meta
130
24
    rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0);
131
24
    rowset_meta.set_creation_time(time(nullptr));
132
24
    rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
133
24
    rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
134
24
    rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
135
24
    rowset_meta.set_segments_key_bounds_truncated(
136
24
            spec_rowset_meta.is_segments_key_bounds_truncated());
137
24
    rowset_meta.set_db_id(spec_rowset_meta.db_id());
138
24
    rowset_meta.set_table_id(spec_rowset_meta.table_id());
139
24
    std::vector<KeyBoundsPB> segments_key_bounds;
140
24
    spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
141
    // Preserve source layout: if source was aggregated (size 1), re-aggregating
142
    // the single entry is a no-op that also keeps the flag consistent.
143
24
    rowset_meta.set_segments_key_bounds(segments_key_bounds,
144
24
                                        spec_rowset_meta.is_segments_key_bounds_aggregated());
145
24
    std::vector<uint32_t> num_segment_rows;
146
24
    spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
147
24
    rowset_meta.set_num_segment_rows(num_segment_rows);
148
24
    if (spec_rowset_meta.has_commit_tso()) {
149
6
        rowset_meta.set_commit_tso(spec_rowset_meta.commit_tso());
150
6
    }
151
24
    if (spec_rowset_meta.is_row_binlog()) {
152
0
        rowset_meta.mark_row_binlog();
153
0
    }
154
24
}
155
156
} // namespace
157
158
904
SegmentFileCollection::~SegmentFileCollection() = default;
159
160
2.38k
Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) {
161
2.38k
    std::lock_guard lock(_lock);
162
2.38k
    if (_closed) [[unlikely]] {
163
0
        DCHECK(false) << writer->path();
164
0
        return Status::InternalError("add to closed SegmentFileCollection");
165
0
    }
166
167
2.38k
    _file_writers.emplace(seg_id, std::move(writer));
168
2.38k
    return Status::OK();
169
2.38k
}
170
171
63
io::FileWriter* SegmentFileCollection::get(int seg_id) const {
172
63
    std::lock_guard lock(_lock);
173
63
    if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) {
174
63
        return it->second.get();
175
63
    } else {
176
0
        return nullptr;
177
0
    }
178
63
}
179
180
869
Status SegmentFileCollection::close() {
181
869
    {
182
869
        std::lock_guard lock(_lock);
183
869
        if (_closed) [[unlikely]] {
184
0
            DCHECK(false);
185
0
            return Status::InternalError("double close SegmentFileCollection");
186
0
        }
187
869
        _closed = true;
188
869
    }
189
190
2.38k
    for (auto&& [_, writer] : _file_writers) {
191
2.38k
        DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", {
192
2.38k
            auto before_state = writer->state();
193
2.38k
            for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) {
194
2.38k
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
195
2.38k
            }
196
2.38k
            LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path="
197
2.38k
                      << writer->path().native()
198
2.38k
                      << " before_state=" << static_cast<int>(before_state)
199
2.38k
                      << " after_state=" << static_cast<int>(writer->state());
200
2.38k
        });
201
2.38k
        if (writer->state() != io::FileWriter::State::CLOSED) {
202
2.31k
            RETURN_IF_ERROR(writer->close());
203
2.31k
        }
204
2.38k
    }
205
206
869
    return Status::OK();
207
869
}
208
209
0
Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) {
210
0
    std::lock_guard lock(_lock);
211
0
    if (!_closed) [[unlikely]] {
212
0
        DCHECK(false);
213
0
        return ResultError(Status::InternalError("get segments file size without closed"));
214
0
    }
215
216
0
    Status st;
217
0
    std::vector<size_t> seg_file_size(_file_writers.size(), 0);
218
0
    bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) {
219
0
        auto&& [seg_id, writer] = it;
220
221
0
        int idx = seg_id - seg_id_offset;
222
0
        if (idx >= seg_file_size.size()) [[unlikely]] {
223
0
            auto err_msg = fmt::format(
224
0
                    "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id,
225
0
                    seg_file_size.size(), seg_id_offset, writer->path().native());
226
0
            DCHECK(false) << err_msg;
227
0
            st = Status::InternalError(err_msg);
228
0
            return false;
229
0
        }
230
231
0
        auto& fsize = seg_file_size[idx];
232
0
        if (fsize != 0) {
233
            // File size should not been set
234
0
            auto err_msg =
235
0
                    fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native());
236
0
            DCHECK(false) << err_msg;
237
0
            st = Status::InternalError(err_msg);
238
0
            return false;
239
0
        }
240
241
0
        fsize = writer->bytes_appended();
242
0
        if (fsize <= 0) {
243
0
            auto err_msg =
244
0
                    fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native());
245
0
            DCHECK(false) << err_msg;
246
0
            st = Status::InternalError(err_msg);
247
0
            return false;
248
0
        }
249
250
0
        return true;
251
0
    });
252
253
0
    if (succ) {
254
0
        return seg_file_size;
255
0
    }
256
257
0
    return ResultError(st);
258
0
}
259
260
904
InvertedIndexFileCollection::~InvertedIndexFileCollection() = default;
261
262
264
Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) {
263
264
    std::lock_guard lock(_lock);
264
264
    if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end())
265
0
            [[unlikely]] {
266
0
        DCHECK(false);
267
0
        return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id);
268
0
    }
269
264
    _inverted_index_file_writers.emplace(seg_id, std::move(index_writer));
270
264
    return Status::OK();
271
264
}
272
273
328
Status InvertedIndexFileCollection::begin_close() {
274
328
    std::lock_guard lock(_lock);
275
328
    for (auto&& [id, writer] : _inverted_index_file_writers) {
276
36
        RETURN_IF_ERROR(writer->begin_close());
277
36
        _total_size += writer->get_index_file_total_size();
278
36
    }
279
280
328
    return Status::OK();
281
328
}
282
283
869
Status InvertedIndexFileCollection::finish_close() {
284
869
    std::lock_guard lock(_lock);
285
869
    for (auto&& [id, writer] : _inverted_index_file_writers) {
286
263
        RETURN_IF_ERROR(writer->finish_close());
287
263
    }
288
869
    return Status::OK();
289
869
}
290
291
Result<std::vector<const InvertedIndexFileInfo*>>
292
107
InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) {
293
107
    std::lock_guard lock(_lock);
294
295
107
    Status st;
296
107
    std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size());
297
107
    bool succ = std::all_of(
298
107
            _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(),
299
214
            [&](auto&& it) {
300
214
                auto&& [seg_id, writer] = it;
301
302
214
                int idx = seg_id - seg_id_offset;
303
214
                if (idx >= idx_file_info.size()) [[unlikely]] {
304
0
                    auto err_msg =
305
0
                            fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}",
306
0
                                        seg_id, idx_file_info.size(), seg_id_offset);
307
0
                    DCHECK(false) << err_msg;
308
0
                    st = Status::InternalError(err_msg);
309
0
                    return false;
310
0
                }
311
214
                idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info();
312
214
                return true;
313
214
            });
314
315
107
    if (succ) {
316
107
        return idx_file_info;
317
107
    }
318
319
0
    return ResultError(st);
320
107
}
321
322
BaseBetaRowsetWriter::BaseBetaRowsetWriter()
323
903
        : _num_segment(0),
324
903
          _segment_start_id(0),
325
903
          _num_rows_written(0),
326
903
          _total_data_size(0),
327
903
          _total_index_size(0),
328
903
          _segment_creator(_context, _seg_files, _idx_files) {}
329
330
BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
331
902
        : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
332
333
10
RowBinlogRowsetWriter::RowBinlogRowsetWriter(StorageEngine& engine) : BetaRowsetWriter(engine) {}
334
335
903
BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
336
903
    if (!_already_built && _rowset_meta->is_local()) {
337
        // abnormal exit, remove all files generated
338
10
        auto& fs = io::global_local_filesystem();
339
12
        for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) {
340
2
            std::string seg_path =
341
2
                    local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i);
342
            // Even if an error is encountered, these files that have not been cleaned up
343
            // will be cleaned up by the GC background. So here we only print the error
344
            // message when we encounter an error.
345
2
            WARN_IF_ERROR(fs->delete_file(seg_path),
346
2
                          fmt::format("Failed to delete file={}", seg_path));
347
2
        }
348
10
    }
349
903
    if (_calc_delete_bitmap_token) {
350
9
        _calc_delete_bitmap_token->cancel();
351
9
    }
352
903
}
353
354
902
BetaRowsetWriter::~BetaRowsetWriter() {
355
    /* Note that segcompaction is async and in parallel with load job. So we should handle carefully
356
     * when the job is cancelled. Although it is meaningless to continue segcompaction when the job
357
     * is cancelled, the objects involved in the job should be preserved during segcompaction to
358
     * avoid crashs for memory issues. */
359
902
    WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
360
902
}
361
362
902
Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
363
902
    _context = rowset_writer_context;
364
902
    DCHECK(_context.tablet_schema != nullptr);
365
902
    _rowset_meta.reset(new RowsetMeta);
366
902
    if (_context.storage_resource) {
367
0
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
368
0
    }
369
902
    _rowset_meta->set_rowset_id(_context.rowset_id);
370
902
    _rowset_meta->set_partition_id(_context.partition_id);
371
902
    _rowset_meta->set_tablet_id(_context.tablet_id);
372
902
    _rowset_meta->set_db_id(_context.db_id);
373
902
    _rowset_meta->set_table_id(_context.table_id);
374
902
    _rowset_meta->set_index_id(_context.index_id);
375
902
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
376
902
    _rowset_meta->set_rowset_type(_context.rowset_type);
377
902
    _rowset_meta->set_rowset_state(_context.rowset_state);
378
902
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
379
902
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
380
36
        _is_pending = true;
381
36
        _rowset_meta->set_txn_id(_context.txn_id);
382
36
        _rowset_meta->set_load_id(_context.load_id);
383
866
    } else {
384
866
        _rowset_meta->set_version(_context.version);
385
866
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
386
866
    }
387
902
    _rowset_meta->set_tablet_uid(_context.tablet_uid);
388
902
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
389
902
    _rowset_meta->set_compaction_level(_context.compaction_level);
390
902
    if (_context.write_binlog_opt().enable) {
391
10
        _rowset_meta->mark_row_binlog();
392
10
    }
393
902
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
394
902
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
395
902
    return Status::OK();
396
902
}
397
398
1.36k
Status BaseBetaRowsetWriter::add_block(const Block* block) {
399
1.36k
    return _segment_creator.add_block(block);
400
1.36k
}
401
402
77
bool BaseBetaRowsetWriter::_is_segment_delete_bitmap_calculated(uint32_t segment_id) const {
403
77
    if (_context.mow_context == nullptr) {
404
        // This segcompaction gate is only for local MoW rowsets. Non-MoW rowsets
405
        // have no pending async delete bitmap work.
406
38
        return true;
407
38
    }
408
39
    std::lock_guard lock(_delete_bitmap_calculated_segments_mutex);
409
39
    return _delete_bitmap_calculated_segments.contains(segment_id);
410
77
}
411
412
63
void BaseBetaRowsetWriter::_mark_segment_delete_bitmap_calculated(uint32_t segment_id) {
413
63
    std::lock_guard lock(_delete_bitmap_calculated_segments_mutex);
414
63
    _delete_bitmap_calculated_segments.add(segment_id);
415
63
}
416
417
63
Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
418
63
    SCOPED_RAW_TIMER(&_delete_bitmap_ns);
419
63
    if (_context.is_transient_rowset_writer ||
420
63
        !_context.tablet->enable_unique_key_merge_on_write() ||
421
63
        (_context.partial_update_info && _context.partial_update_info->is_partial_update())) {
422
0
        _mark_segment_delete_bitmap_calculated(segment_id);
423
0
        return Status::OK();
424
0
    }
425
63
    std::vector<RowsetSharedPtr> specified_rowsets;
426
63
    {
427
63
        std::shared_lock meta_rlock(_context.tablet->get_header_lock());
428
63
        specified_rowsets =
429
63
                _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get());
430
63
    }
431
432
    // Submit the entire delete bitmap calculation process to thread pool for async execution
433
    // This avoids blocking memtable flush thread while waiting for file upload to complete
434
    // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap
435
63
    return _calc_delete_bitmap_token->submit_func(
436
63
            [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status {
437
63
                Status st = Status::OK();
438
                // If this async pipeline fails, leave the segment unmarked. build() waits
439
                // on the token and aborts this writer; marking here would allow
440
                // segcompaction to consume a segment whose delete bitmap was not published.
441
                // Step 1: Close file_writer (must be done before load_segments)
442
63
                auto* file_writer = _seg_files.get(segment_id);
443
63
                if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) {
444
63
                    MonotonicStopWatch close_timer;
445
63
                    close_timer.start();
446
63
                    st = file_writer->close();
447
63
                    close_timer.stop();
448
449
63
                    auto close_time_ms = close_timer.elapsed_time_milliseconds();
450
63
                    if (close_time_ms > 1000) {
451
0
                        LOG(INFO) << "file_writer->close() took " << close_time_ms
452
0
                                  << "ms for segment_id=" << segment_id
453
0
                                  << ", tablet_id=" << _context.tablet_id
454
0
                                  << ", rowset_id=" << _context.rowset_id;
455
0
                    }
456
63
                    if (!st.ok()) {
457
0
                        return st;
458
0
                    }
459
63
                }
460
461
63
                DBUG_EXECUTE_IF("BetaRowsetWriter.generate_delete_bitmap.sleep_before_build_tmp", {
462
63
                    auto sleep_ms = dp->param<int64_t>("sleep_ms", 30000);
463
63
                    std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
464
63
                });
465
466
63
                OlapStopWatch watch;
467
                // Step 2: Build tmp rowset (needs file_writer to be closed)
468
63
                RowsetSharedPtr rowset_ptr;
469
63
                st = _build_tmp(rowset_ptr);
470
63
                if (!st.ok()) {
471
0
                    return st;
472
0
                }
473
474
                // Step 3: Load segments (needs file_writer to be closed and rowset to be built)
475
63
                auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
476
63
                std::vector<segment_v2::SegmentSharedPtr> segments;
477
63
                st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments);
478
63
                if (!st.ok()) {
479
0
                    return st;
480
0
                }
481
482
                // Step 4: Calculate delete bitmap
483
63
                st = BaseTablet::calc_delete_bitmap(
484
63
                        _context.tablet, rowset_ptr, segments, specified_rowsets,
485
63
                        _context.mow_context->delete_bitmap, _context.mow_context->max_version,
486
63
                        nullptr, nullptr, nullptr);
487
63
                if (!st.ok()) {
488
0
                    return st;
489
0
                }
490
491
63
                size_t total_rows =
492
63
                        std::accumulate(segments.begin(), segments.end(), 0,
493
63
                                        [](size_t sum, const segment_v2::SegmentSharedPtr& s) {
494
63
                                            return sum += s->num_rows();
495
63
                                        });
496
63
                LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: "
497
63
                          << _context.tablet->tablet_id()
498
63
                          << ", rowset_ids: " << _context.mow_context->rowset_ids->size()
499
63
                          << ", cur max_version: " << _context.mow_context->max_version
500
63
                          << ", transaction_id: " << _context.mow_context->txn_id
501
63
                          << ", delete_bitmap_count: "
502
63
                          << _context.mow_context->delete_bitmap->get_delete_bitmap_count()
503
63
                          << ", delete_bitmap_cardinality: "
504
63
                          << _context.mow_context->delete_bitmap->cardinality()
505
63
                          << ", cost: " << watch.get_elapse_time_us()
506
63
                          << "(us), total rows: " << total_rows;
507
                // Mark only after delete bitmap entries are published to mow_context. Otherwise
508
                // segcompaction could convert bitmap before this segment's entries exist.
509
63
                _mark_segment_delete_bitmap_calculated(segment_id);
510
63
                return Status::OK();
511
63
            });
512
63
}
513
514
902
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
515
902
    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
516
902
    if (_segcompaction_worker) {
517
902
        _segcompaction_worker->init_mem_tracker(rowset_writer_context);
518
902
    }
519
902
    if (_context.mow_context != nullptr) {
520
9
        _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token();
521
9
    }
522
902
    return Status::OK();
523
902
}
524
525
Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
526
77
                                                    int32_t segment_id) {
527
77
    DCHECK(_rowset_meta->is_local());
528
77
    auto fs = _rowset_meta->fs();
529
77
    if (!fs) {
530
0
        return Status::Error<INIT_FAILED>(
531
0
                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
532
0
    }
533
77
    auto path =
534
77
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id);
535
77
    io::FileReaderOptions reader_options {
536
77
            .cache_type =
537
77
                    _context.write_file_cache
538
77
                            ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
539
0
                                                         : io::FileCachePolicy::NO_CACHE)
540
77
                            : io::FileCachePolicy::NO_CACHE,
541
77
            .is_doris_table = true,
542
77
            .cache_base_path {},
543
77
            .tablet_id = _rowset_meta->tablet_id(),
544
77
    };
545
77
    auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(),
546
77
                                       _context.tablet_schema, reader_options, &segment);
547
77
    if (!s.ok()) {
548
0
        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
549
0
        return s;
550
0
    }
551
77
    return Status::OK();
552
77
}
553
554
/* policy of segcompaction target selection:
555
 *  1. skip big segments
556
 *  2. if the consecutive smalls end up with a big, compact the smalls, except
557
 *     single small
558
 *  3. if the consecutive smalls end up with small, compact the smalls if the
559
 *     length is beyond (config::segcompaction_batch_size / 2)
560
 */
561
// NOLINTNEXTLINE(readability-function-cognitive-complexity): existing selection logic is kept intact.
562
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
563
15
        SegCompactionCandidatesSharedPtr& segments) {
564
15
    segments = std::make_shared<SegCompactionCandidates>();
565
    // skip last (maybe active) segment
566
15
    int32_t last_segment = _num_segment - 1;
567
15
    size_t task_bytes = 0;
568
15
    uint32_t task_rows = 0;
569
15
    int32_t segid;
570
15
    for (segid = _segcompacted_point;
571
86
         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
572
77
        if (!_is_segment_delete_bitmap_calculated(segid)) {
573
0
            VLOG_DEBUG << "stop segcompaction at segid=" << segid
574
0
                       << " because its delete bitmap is not calculated yet, tablet_id="
575
0
                       << _context.tablet_id << ", rowset_id=" << _context.rowset_id;
576
0
            break;
577
0
        }
578
77
        segment_v2::SegmentSharedPtr segment;
579
77
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
580
77
        const auto segment_rows = segment->num_rows();
581
77
        const auto segment_bytes = segment->file_reader()->size();
582
77
        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
583
77
                                segment_bytes > config::segcompaction_candidate_max_bytes;
584
77
        if (is_large_segment) {
585
14
            if (segid == _segcompacted_point) {
586
                // skip large segments at the front
587
8
                auto dst_seg_id = _num_segcompacted.load();
588
8
                RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
589
8
                if (_segcompaction_worker->need_convert_delete_bitmap()) {
590
4
                    _segcompaction_worker->convert_segment_delete_bitmap(
591
4
                            _context.mow_context->delete_bitmap, segid, dst_seg_id);
592
4
                }
593
8
                continue;
594
8
            } else {
595
                // stop because we need consecutive segments
596
6
                break;
597
6
            }
598
14
        }
599
63
        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
600
63
                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
601
63
        if (is_task_full) {
602
0
            break;
603
0
        }
604
63
        segments->push_back(segment);
605
63
        task_rows += segment->num_rows();
606
63
        task_bytes += segment->file_reader()->size();
607
63
    }
608
15
    size_t s = segments->size();
609
15
    const size_t min_final_batch_size = cast_set<size_t>(config::segcompaction_batch_size / 2);
610
15
    if (segid == last_segment && s <= min_final_batch_size) {
611
        // we didn't collect enough segments, better to do it in next
612
        // round to compact more at once
613
0
        segments->clear();
614
0
        return Status::OK();
615
0
    }
616
15
    if (s == 1) { // poor bachelor, let it go
617
4
        VLOG_DEBUG << "only one candidate segment";
618
4
        auto src_seg_id = _segcompacted_point.load();
619
4
        auto dst_seg_id = _num_segcompacted.load();
620
4
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
621
4
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
622
2
            _segcompaction_worker->convert_segment_delete_bitmap(
623
2
                    _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
624
2
        }
625
4
        segments->clear();
626
4
        return Status::OK();
627
4
    }
628
11
    if (VLOG_DEBUG_IS_ON) {
629
0
        vlog_buffer.clear();
630
0
        for (auto& segment : (*segments.get())) {
631
0
            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
632
0
        }
633
0
        VLOG_DEBUG << "candidate segments num:" << s
634
0
                   << " list of candidates:" << fmt::to_string(vlog_buffer);
635
0
    }
636
11
    return Status::OK();
637
15
}
638
639
11
Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
640
11
    int ret;
641
11
    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
642
11
                                                                    _context.rowset_id, begin, end);
643
11
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
644
11
                                           _num_segcompacted);
645
11
    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
646
11
    if (ret) {
647
0
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
648
0
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
649
0
                errno);
650
0
    }
651
11
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
652
653
    // rename inverted index files
654
11
    RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0));
655
656
11
    _num_segcompacted++;
657
11
    return Status::OK();
658
11
}
659
660
void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin,
661
43
                                                                      uint32_t end) {
662
43
    VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
663
134
    for (uint32_t i = begin; i <= end; ++i) {
664
91
        _segid_statistics_map.erase(i);
665
91
    }
666
43
}
667
668
42
Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) {
669
42
    if (seg_id == cast_set<uint32_t>(_num_segcompacted.load())) {
670
10
        ++_num_segcompacted;
671
10
        return Status::OK();
672
10
    }
673
674
32
    auto src_seg_path =
675
32
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id);
676
32
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
677
32
                                           _num_segcompacted);
678
32
    VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
679
0
               << dst_seg_path;
680
32
    {
681
32
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
682
32
        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
683
32
        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
684
32
                  true);
685
32
        auto org = _segid_statistics_map[seg_id];
686
32
        _segid_statistics_map.emplace(_num_segcompacted, org);
687
32
        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
688
32
    }
689
32
    int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
690
32
    if (ret) {
691
0
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
692
0
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
693
0
                errno);
694
0
    }
695
696
32
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
697
    // rename remaining inverted index files
698
32
    RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id));
699
700
32
    ++_num_segcompacted;
701
32
    return Status::OK();
702
32
}
703
704
Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id,
705
43
                                                      const std::string& segment_path) {
706
43
    auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache();
707
43
    if (!footer_page_cache) {
708
0
        return Status::OK();
709
0
    }
710
711
43
    auto fs = _rowset_meta->fs();
712
43
    bool exists = false;
713
43
    RETURN_IF_ERROR(fs->exists(segment_path, &exists));
714
43
    if (exists) {
715
43
        io::FileReaderSPtr file_reader;
716
43
        io::FileReaderOptions reader_options {
717
43
                .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
718
43
                                                        : io::FileCachePolicy::NO_CACHE,
719
43
                .is_doris_table = true,
720
43
                .cache_base_path = "",
721
43
                .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)),
722
43
                .tablet_id = _rowset_meta->tablet_id(),
723
43
        };
724
43
        RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options));
725
43
        DCHECK(file_reader != nullptr);
726
43
        auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader);
727
43
        footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE);
728
43
    }
729
43
    return Status::OK();
730
43
}
731
732
43
Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) {
733
43
    int ret;
734
735
43
    auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path,
736
32
                                                       _context.rowset_id.to_string(), seg_id)
737
43
                                  : BetaRowset::local_segment_path_segcompacted(
738
11
                                            _context.tablet_path, _context.rowset_id, begin, end);
739
43
    auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path);
740
43
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
741
43
                                           _num_segcompacted);
742
43
    auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path);
743
744
43
    if (_context.tablet_schema->get_inverted_index_storage_format() >=
745
43
        InvertedIndexStorageFormatPB::V2) {
746
22
        if (_context.tablet_schema->has_inverted_index() ||
747
22
            _context.tablet_schema->has_ann_index()) {
748
22
            auto src_idx_path =
749
22
                    InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix);
750
22
            auto dst_idx_path =
751
22
                    InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix);
752
753
22
            ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
754
22
            if (ret) {
755
0
                return Status::Error<ROWSET_RENAME_FILE_FAILED>(
756
0
                        "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path,
757
0
                        ret, errno);
758
0
            }
759
22
        }
760
22
    }
761
    // rename remaining inverted index files
762
150
    for (auto column : _context.tablet_schema->columns()) {
763
150
        auto index_infos = _context.tablet_schema->inverted_indexs(*column);
764
150
        for (const auto& index_info : index_infos) {
765
44
            auto index_id = index_info->index_id();
766
44
            if (_context.tablet_schema->get_inverted_index_storage_format() ==
767
44
                InvertedIndexStorageFormatPB::V1) {
768
0
                auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
769
0
                        src_index_path_prefix, index_id, index_info->get_index_suffix());
770
0
                auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
771
0
                        dst_index_path_prefix, index_id, index_info->get_index_suffix());
772
0
                VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to "
773
0
                           << dst_idx_path;
774
0
                ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
775
0
                if (ret) {
776
0
                    return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>(
777
0
                            "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path,
778
0
                            dst_idx_path, ret, errno);
779
0
                }
780
0
            }
781
            // Erase the origin index file cache
782
44
            auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
783
44
                    src_index_path_prefix, index_id, index_info->get_index_suffix());
784
44
            auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
785
44
                    dst_index_path_prefix, index_id, index_info->get_index_suffix());
786
44
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key));
787
44
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key));
788
44
        }
789
150
    }
790
43
    return Status::OK();
791
43
}
792
793
1.20k
Status BetaRowsetWriter::_segcompaction_if_necessary() {
794
1.20k
    Status status = Status::OK();
795
    // if not doing segcompaction, just check segment number
796
1.20k
    if (!config::enable_segcompaction || !_context.enable_segcompaction ||
797
1.20k
        _context.tablet_schema->num_variant_columns() > 0) {
798
1.08k
        return _check_segment_number_limit(_num_segment);
799
1.08k
    }
800
    // leave _is_doing_segcompaction as the last condition
801
    // otherwise _segcompacting_cond will never get notified
802
121
    if (_is_doing_segcompaction.exchange(true)) {
803
0
        return status;
804
0
    }
805
121
    if (_segcompaction_status.load() != OK) {
806
0
        status = Status::Error<SEGCOMPACTION_FAILED>(
807
0
                "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
808
0
                _segcompaction_status.load());
809
121
    } else {
810
121
        status = _check_segment_number_limit(_num_segcompacted);
811
121
    }
812
121
    if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
813
15
        SegCompactionCandidatesSharedPtr segments;
814
15
        status = _find_longest_consecutive_small_segment(segments);
815
15
        if (LIKELY(status.ok()) && (!segments->empty())) {
816
11
            LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
817
11
                      << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
818
11
                      << ", segcompacted_point:" << _segcompacted_point;
819
11
            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
820
11
            if (status.ok()) {
821
11
                return status;
822
11
            }
823
11
        }
824
15
    }
825
110
    {
826
110
        std::lock_guard lk(_is_doing_segcompaction_lock);
827
110
        _is_doing_segcompaction = false;
828
110
        _segcompacting_cond.notify_all();
829
110
    }
830
110
    return status;
831
121
}
832
833
541
Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
834
541
    DCHECK_EQ(_is_doing_segcompaction, false);
835
541
    if (!config::enable_segcompaction) {
836
144
        return Status::OK();
837
144
    }
838
397
    if (_segcompaction_status.load() != OK) {
839
0
        return Status::Error<SEGCOMPACTION_FAILED>(
840
0
                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error "
841
0
                "code: {}",
842
0
                _segcompaction_status.load());
843
0
    }
844
397
    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
845
        // no need if never segcompact before or all segcompacted
846
388
        return Status::OK();
847
388
    }
848
    // currently we only rename remaining segments to reduce wait time
849
    // so that transaction can be committed ASAP
850
9
    VLOG_DEBUG << "segcompaction last few segments";
851
39
    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
852
30
        auto dst_segid = _num_segcompacted.load();
853
30
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
854
30
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
855
16
            _segcompaction_worker->convert_segment_delete_bitmap(
856
16
                    _context.mow_context->delete_bitmap, segid, dst_segid);
857
16
        }
858
30
    }
859
9
    return Status::OK();
860
9
}
861
862
10
Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
863
10
    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
864
10
    RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
865
10
    _num_rows_written += rowset->num_rows();
866
10
    const auto& rowset_meta = rowset->rowset_meta();
867
10
    auto index_size = rowset_meta->index_disk_size();
868
10
    auto total_size = rowset_meta->total_disk_size();
869
10
    auto data_size = rowset_meta->data_disk_size();
870
    // corrupted index size caused by bug before 2.1.5 or 3.0.0 version
871
    // try to get real index size from disk.
872
10
    if (index_size < 0 || index_size > total_size * 2) {
873
0
        LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
874
0
                   << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
875
0
                   << " rowset:" << rowset_meta->rowset_id();
876
0
        index_size = 0;
877
0
        auto st = rowset->get_inverted_index_size(&index_size);
878
0
        if (!st.ok()) {
879
0
            if (!st.is<NOT_FOUND>()) {
880
0
                LOG(ERROR) << "failed to get inverted index size. res=" << st;
881
0
                return st;
882
0
            }
883
0
        }
884
0
    }
885
10
    _total_data_size += data_size;
886
10
    _total_index_size += index_size;
887
10
    _num_segment += cast_set<int32_t>(rowset->num_segments());
888
    // append key_bounds to current rowset
889
10
    RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
890
10
    rowset->get_num_segment_rows(&_segment_num_rows);
891
10
    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();
892
893
    // TODO update zonemap
894
10
    if (rowset->rowset_meta()->has_delete_predicate()) {
895
0
        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
896
0
    }
897
    // Update the tablet schema in the rowset metadata if the tablet schema contains a variant.
898
    // During the build process, _context.tablet_schema will be used as the rowset schema.
899
    // This situation may arise in the event of a linked schema change. If this schema is not set,
900
    // the subcolumns of the variant will be lost.
901
10
    if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) {
902
0
        _context.tablet_schema = rowset->tablet_schema();
903
0
    }
904
10
    return Status::OK();
905
10
}
906
907
0
Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) {
908
    // TODO use schema_mapping to transfer zonemap
909
0
    return add_rowset(rowset);
910
0
}
911
912
1.04k
Status BaseBetaRowsetWriter::flush() {
913
1.04k
    return _segment_creator.flush();
914
1.04k
}
915
916
12
Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) {
917
12
    if (block->rows() == 0) {
918
0
        return Status::OK();
919
0
    }
920
921
12
    {
922
12
        SCOPED_RAW_TIMER(&_segment_writer_ns);
923
12
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
924
12
    }
925
12
    return Status::OK();
926
12
}
927
928
4
Status BaseBetaRowsetWriter::flush_single_block(const Block* block) {
929
4
    return _segment_creator.flush_single_block(block);
930
4
}
931
932
902
Status BetaRowsetWriter::_wait_flying_segcompaction() {
933
902
    std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
934
902
    uint64_t begin_wait = GetCurrentTimeMicros();
935
902
    while (_is_doing_segcompaction) {
936
        // change sync wait to async?
937
0
        _segcompacting_cond.wait(l);
938
0
    }
939
902
    uint64_t elapsed = GetCurrentTimeMicros() - begin_wait;
940
902
    if (elapsed >= MICROS_PER_SEC) {
941
0
        LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us";
942
0
    }
943
902
    if (_segcompaction_status.load() != OK) {
944
0
        return Status::Error<SEGCOMPACTION_FAILED>(
945
0
                "BetaRowsetWriter meet invalid state, error code: {}",
946
0
                _segcompaction_status.load());
947
0
    }
948
902
    return Status::OK();
949
902
}
950
951
8
Status BetaRowsetWriter::_retry_pending_segcompaction_after_delete_bitmap() {
952
8
    while ((_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
953
0
        RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
954
0
                                       "segcompaction failed when build new rowset");
955
0
        const int32_t old_segcompacted_point = _segcompacted_point.load();
956
0
        const int32_t old_num_segcompacted = _num_segcompacted.load();
957
0
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_if_necessary(),
958
0
                                       "segcompaction failed when build new rowset");
959
0
        RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
960
0
                                       "segcompaction failed when build new rowset");
961
0
        if (_segcompacted_point == old_segcompacted_point &&
962
0
            _num_segcompacted == old_num_segcompacted) {
963
0
            break;
964
0
        }
965
0
    }
966
8
    return Status::OK();
967
8
}
968
969
541
Status BetaRowsetWriter::_finish_flying_segcompaction(bool need_final_segcompaction_retry) {
970
541
    if (need_final_segcompaction_retry) {
971
8
        return _retry_pending_segcompaction_after_delete_bitmap();
972
8
    }
973
533
    if (_segcompaction_worker->cancel()) {
974
533
        std::lock_guard lk(_is_doing_segcompaction_lock);
975
533
        _is_doing_segcompaction = false;
976
533
        _segcompacting_cond.notify_all();
977
533
        return Status::OK();
978
533
    }
979
0
    RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
980
0
                                   "segcompaction failed when build new rowset");
981
0
    return Status::OK();
982
0
}
983
984
24
RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) {
985
24
    if (_rowset_meta->newest_write_timestamp() == -1) {
986
0
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
987
0
    }
988
989
24
    build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta);
990
24
    RowsetSharedPtr rowset;
991
24
    auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path,
992
24
                                               _rowset_meta, &rowset);
993
24
    if (!status.ok()) {
994
0
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
995
0
        return nullptr;
996
0
    }
997
24
    _already_built = true;
998
24
    return rowset;
999
24
}
1000
1001
0
Status BaseBetaRowsetWriter::_close_file_writers() {
1002
    // Flush and close segment files
1003
0
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
1004
0
                                   "failed to close segment creator when build new rowset");
1005
0
    return Status::OK();
1006
0
}
1007
1008
541
Status BetaRowsetWriter::_close_file_writers() {
1009
    // Flush first to submit delete-bitmap tasks for the last segment, then wait before closing
1010
    // the shared file writers so build() cannot race with delete-bitmap workers on close().
1011
541
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.flush(),
1012
541
                                   "failed to flush segment creator when build new rowset");
1013
541
    const bool need_final_segcompaction_retry =
1014
541
            _calc_delete_bitmap_token != nullptr && config::enable_segcompaction &&
1015
541
            _context.enable_segcompaction && _context.tablet_schema->num_variant_columns() == 0;
1016
541
    if (_calc_delete_bitmap_token != nullptr) {
1017
9
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
1018
9
    }
1019
541
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
1020
541
                                   "failed to close segment creator when build new rowset");
1021
    // if _segment_start_id is not zero, that means it's a transient rowset writer for
1022
    // MoW partial update, don't need to do segment compaction.
1023
541
    if (_segment_start_id == 0) {
1024
541
        RETURN_IF_ERROR(_finish_flying_segcompaction(need_final_segcompaction_retry));
1025
541
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
1026
541
                                       "rename last segments failed when build new rowset");
1027
        // segcompaction worker would do file wrier's close function in compact_segments
1028
541
        if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer();
1029
541
            nullptr != seg_comp_file_writer &&
1030
541
            seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) {
1031
0
            RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
1032
0
                                           "close segment compaction worker failed");
1033
0
        }
1034
        // process delete bitmap for mow table
1035
541
        if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
1036
4
            auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
1037
            // which means the segment compaction is triggerd
1038
4
            if (converted_delete_bitmap != nullptr) {
1039
4
                RowsetIdUnorderedSet rowsetids;
1040
4
                rowsetids.insert(rowset_id());
1041
4
                context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
1042
4
                                                                     rowsetids);
1043
4
                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
1044
4
                                                             {rowset_id(), UINT32_MAX, INT64_MAX});
1045
4
                context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
1046
4
            }
1047
4
        }
1048
541
    }
1049
541
    return Status::OK();
1050
541
}
1051
1052
869
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
1053
869
    RETURN_IF_ERROR(_close_file_writers());
1054
869
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(_num_seg()),
1055
869
                                   "too many segments when build new rowset");
1056
869
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
1057
869
    if (_is_pending) {
1058
30
        _rowset_meta->set_rowset_state(COMMITTED);
1059
839
    } else {
1060
839
        _rowset_meta->set_rowset_state(VISIBLE);
1061
839
    }
1062
1063
869
    if (_rowset_meta->newest_write_timestamp() == -1) {
1064
631
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
1065
631
    }
1066
1067
869
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
1068
1069
    // If segment compaction occurs, the idx file info will become inaccurate.
1070
869
    if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) &&
1071
869
        _num_segcompacted == 0) {
1072
107
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
1073
107
            !idx_files_info.has_value()) [[unlikely]] {
1074
0
            LOG(ERROR) << "expected inverted index files info, but none presents: "
1075
0
                       << idx_files_info.error();
1076
107
        } else {
1077
107
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
1078
107
        }
1079
107
    }
1080
1081
869
    RETURN_NOT_OK_STATUS_WITH_WARN(
1082
869
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
1083
869
                                         &rowset),
1084
869
            "rowset init failed when build new rowset");
1085
869
    _already_built = true;
1086
869
    return Status::OK();
1087
869
}
1088
1089
0
int64_t BaseBetaRowsetWriter::_num_seg() const {
1090
0
    return _num_segment;
1091
0
}
1092
1093
1.80k
int64_t BetaRowsetWriter::_num_seg() const {
1094
1.80k
    return is_segcompacted() ? _num_segcompacted : _num_segment;
1095
1.80k
}
1096
1097
932
Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
1098
932
    int64_t num_rows_written = 0;
1099
932
    int64_t total_data_size = 0;
1100
932
    int64_t total_index_size = 0;
1101
932
    std::vector<KeyBoundsPB> segments_encoded_key_bounds;
1102
932
    std::vector<uint32_t> segment_rows;
1103
932
    std::optional<bool> segments_key_bounds_truncated;
1104
932
    {
1105
932
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
1106
1.50k
        for (const auto& itr : _segid_statistics_map) {
1107
1.50k
            num_rows_written += itr.second.row_num;
1108
1.50k
            total_data_size += itr.second.data_size;
1109
1.50k
            total_index_size += itr.second.index_size;
1110
1.50k
            segments_encoded_key_bounds.push_back(itr.second.key_bounds);
1111
            // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
1112
1.50k
            segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
1113
1.50k
        }
1114
932
        segments_key_bounds_truncated = _segments_key_bounds_truncated;
1115
932
    }
1116
932
    if (segment_rows.empty()) {
1117
        // vertical compaction and linked schema change will not record segment statistics,
1118
        // it will record segment rows in _segment_num_rows
1119
316
        RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
1120
316
    }
1121
1122
1.18k
    for (auto& key_bound : _segments_encoded_key_bounds) {
1123
1.18k
        segments_encoded_key_bounds.push_back(key_bound);
1124
1.18k
    }
1125
932
    if (segments_key_bounds_truncated.has_value()) {
1126
406
        rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated.value());
1127
406
    }
1128
932
    rowset_meta->set_num_segment_rows(segment_rows);
1129
    // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
1130
    // the overlap property when key bounds are empty.
1131
    // for mow table with cluster keys, the overlap is used for cluster keys,
1132
    // the key_bounds is primary keys
1133
932
    if (!segments_encoded_key_bounds.empty() &&
1134
932
        !is_segment_overlapping(segments_encoded_key_bounds) &&
1135
932
        _context.tablet_schema->cluster_key_uids().empty()) {
1136
442
        rowset_meta->set_segments_overlap(NONOVERLAPPING);
1137
442
    }
1138
1139
932
    auto segment_num = _num_seg();
1140
932
    if (check_segment_num && config::check_segment_when_build_rowset_meta) {
1141
0
        auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size();
1142
0
        if (segments_encoded_key_bounds_size != segment_num) {
1143
0
            return Status::InternalError(
1144
0
                    "segments_encoded_key_bounds_size should  equal to _num_seg, "
1145
0
                    "segments_encoded_key_bounds_size "
1146
0
                    "is: {}, _num_seg is: {}",
1147
0
                    segments_encoded_key_bounds_size, segment_num);
1148
0
        }
1149
0
        if (segment_rows.size() != segment_num) {
1150
0
            return Status::InternalError(
1151
0
                    "segment_rows size should equal to _num_seg, segment_rows size is: {}, "
1152
0
                    "_num_seg is {}, tablet={}, rowset={}, txn={}",
1153
0
                    segment_rows.size(), segment_num, _context.tablet_id,
1154
0
                    _context.rowset_id.to_string(), _context.txn_id);
1155
0
        }
1156
0
    }
1157
1158
932
    rowset_meta->set_num_segments(segment_num);
1159
932
    rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
1160
932
    rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size +
1161
932
                                     _total_index_size);
1162
932
    rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
1163
932
    rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
1164
932
    bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
1165
932
                                !_context.enable_unique_key_merge_on_write;
1166
932
    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds);
1167
    // TODO write zonemap to meta
1168
932
    rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
1169
932
    rowset_meta->set_creation_time(time(nullptr));
1170
932
    return Status::OK();
1171
932
}
1172
1173
63
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
1174
63
    Status status;
1175
63
    std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
1176
63
    tmp_rs_meta->init(_rowset_meta.get());
1177
1178
63
    status = _build_rowset_meta(tmp_rs_meta.get());
1179
63
    if (!status.ok()) {
1180
0
        LOG(WARNING) << "failed to build rowset meta, res=" << status;
1181
0
        return status;
1182
0
    }
1183
1184
63
    status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta,
1185
63
                                          &rowset_ptr);
1186
63
    DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
1187
63
                    { status = Status::InternalError("create rowset failed"); });
1188
63
    if (!status.ok()) {
1189
0
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
1190
0
        return status;
1191
0
    }
1192
63
    return Status::OK();
1193
63
}
1194
1195
Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
1196
                                                 io::FileWriterPtr& file_writer,
1197
2.68k
                                                 bool is_index_file) {
1198
2.68k
    io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file);
1199
2.68k
    Status st = _context.fs()->create_file(path, &file_writer, &opts);
1200
2.68k
    if (!st.ok()) {
1201
0
        LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
1202
0
        return st;
1203
0
    }
1204
1205
2.68k
    DCHECK(file_writer != nullptr);
1206
2.68k
    return Status::OK();
1207
2.68k
}
1208
1209
Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
1210
2.66k
                                                FileType file_type) {
1211
2.66k
    auto segment_path = _context.segment_path(segment_id);
1212
2.66k
    if (file_type == FileType::INVERTED_INDEX_FILE) {
1213
254
        std::string prefix =
1214
254
                std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
1215
254
        std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
1216
254
        return _create_file_writer(index_path, file_writer, true /* is_index_file */);
1217
2.40k
    } else if (file_type == FileType::SEGMENT_FILE) {
1218
2.40k
        return _create_file_writer(segment_path, file_writer, false /* is_index_file */);
1219
2.40k
    }
1220
0
    return Status::Error<ErrorCode::INTERNAL_ERROR>(
1221
0
            fmt::format("failed to create file = {}, file type = {}", segment_path, file_type));
1222
2.66k
}
1223
1224
Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id,
1225
227
                                                      IndexFileWriterPtr* index_file_writer) {
1226
227
    RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer));
1227
    // used for inverted index format v1
1228
227
    (*index_file_writer)
1229
227
            ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */));
1230
227
    return Status::OK();
1231
227
}
1232
1233
Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
1234
12
        std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) {
1235
12
    DCHECK(begin >= 0 && end >= 0);
1236
12
    std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
1237
12
                                                                   _context.rowset_id, begin, end);
1238
12
    io::FileWriterPtr file_writer;
1239
12
    RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */));
1240
1241
12
    IndexFileWriterPtr index_file_writer;
1242
12
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
1243
8
        io::FileWriterPtr idx_file_writer;
1244
8
        std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path));
1245
8
        if (_context.tablet_schema->get_inverted_index_storage_format() !=
1246
8
            InvertedIndexStorageFormatPB::V1) {
1247
8
            std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
1248
8
            RETURN_IF_ERROR(
1249
8
                    _create_file_writer(index_path, idx_file_writer, true /* is_index_file */));
1250
8
        }
1251
8
        index_file_writer = std::make_unique<IndexFileWriter>(
1252
8
                _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted,
1253
8
                _context.tablet_schema->get_inverted_index_storage_format(),
1254
8
                std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id);
1255
8
    }
1256
1257
12
    segment_v2::SegmentWriterOptions writer_options;
1258
12
    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
1259
12
    writer_options.rowset_ctx = &_context;
1260
12
    writer_options.write_type = _context.write_type;
1261
12
    writer_options.write_type = DataWriteType::TYPE_COMPACTION;
1262
12
    writer_options.max_rows_per_segment = _context.max_rows_per_segment;
1263
12
    writer_options.mow_ctx = _context.mow_context;
1264
1265
12
    *writer = std::make_unique<segment_v2::SegmentWriter>(
1266
12
            file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet,
1267
12
            _context.data_dir, writer_options, index_file_writer.get());
1268
12
    if (auto& seg_writer = _segcompaction_worker->get_file_writer();
1269
12
        seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) {
1270
0
        RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
1271
0
    }
1272
12
    _segcompaction_worker->get_file_writer().reset(file_writer.release());
1273
12
    if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
1274
12
        idx_file_writer != nullptr) {
1275
0
        RETURN_IF_ERROR(idx_file_writer->begin_close());
1276
0
        RETURN_IF_ERROR(idx_file_writer->finish_close());
1277
0
    }
1278
12
    _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
1279
12
    return Status::OK();
1280
12
}
1281
1282
0
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
1283
0
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
1284
0
                    { segnum = dp->param("segnum", 1024); });
1285
0
    if (UNLIKELY(segnum > cast_set<size_t>(config::max_segment_num_per_rowset))) {
1286
0
        return Status::Error<TOO_MANY_SEGMENTS>(
1287
0
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
1288
0
                "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
1289
0
                "small or if the data is skewed.",
1290
0
                _context.tablet_id, _context.rowset_id.to_string(),
1291
0
                config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
1292
0
    }
1293
0
    return Status::OK();
1294
0
}
1295
1296
2.07k
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
1297
2.07k
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
1298
2.07k
                    { segnum = dp->param("segnum", 1024); });
1299
2.07k
    if (UNLIKELY(segnum > cast_set<size_t>(config::max_segment_num_per_rowset))) {
1300
0
        return Status::Error<TOO_MANY_SEGMENTS>(
1301
0
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
1302
0
                "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
1303
0
                "the bucket number is too small or if the data is skewed.",
1304
0
                _context.tablet_id, _context.rowset_id.to_string(),
1305
0
                config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
1306
0
                _num_segcompacted, get_rowset_num_rows());
1307
0
    }
1308
2.07k
    return Status::OK();
1309
2.07k
}
1310
1311
1.20k
Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
1312
1.20k
    uint32_t segid_offset = segment_id - _segment_start_id;
1313
1.20k
    bool key_bounds_truncated = false;
1314
1.20k
    SegmentStatistics stored_segstat =
1315
1.20k
            copy_segment_statistics_with_truncated_key_bounds(segstat, key_bounds_truncated);
1316
1.20k
    {
1317
1.20k
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
1318
1.20k
        CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true);
1319
1.20k
        _segid_statistics_map.emplace(segment_id, std::move(stored_segstat));
1320
1.20k
        if (segment_id >= _segment_num_rows.size()) {
1321
1.20k
            _segment_num_rows.resize(segment_id + 1);
1322
1.20k
        }
1323
1.20k
        _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num);
1324
1.20k
        if (key_bounds_truncated) {
1325
702
            _segments_key_bounds_truncated = true;
1326
702
        }
1327
1.20k
    }
1328
1.20k
    VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id
1329
0
               << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size
1330
0
               << " index_size:" << segstat.index_size;
1331
1332
1.20k
    {
1333
1.20k
        std::lock_guard<std::mutex> lock(_segment_set_mutex);
1334
1.20k
        _segment_set.add(segid_offset);
1335
2.40k
        while (_segment_set.contains(_num_segment)) {
1336
1.20k
            _num_segment++;
1337
1.20k
        }
1338
1.20k
    }
1339
1340
1.20k
    if (_context.mow_context != nullptr) {
1341
63
        RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
1342
63
    }
1343
1.20k
    return Status::OK();
1344
1.20k
}
1345
1346
1.20k
Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
1347
1.20k
    RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat));
1348
1.20k
    DBUG_EXECUTE_IF("BetaRowsetWriter.add_segment.sleep_before_segcompaction", {
1349
1.20k
        auto target_segment_id = dp->param<int64_t>("segment_id", -1);
1350
1.20k
        if (target_segment_id < 0 || segment_id == target_segment_id) {
1351
1.20k
            auto sleep_ms = dp->param<int64_t>("sleep_ms", 200);
1352
1.20k
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
1353
1.20k
        }
1354
1.20k
    });
1355
1.20k
    return _segcompaction_if_necessary();
1356
1.20k
}
1357
1358
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
1359
        std::unique_ptr<segment_v2::SegmentWriter>*
1360
                writer, // NOLINT(readability-non-const-parameter): resets the caller-owned writer after finalizing it.
1361
11
        uint64_t index_size, KeyBoundsPB& key_bounds) {
1362
11
    uint32_t segid = (*writer)->get_segment_id();
1363
11
    uint32_t row_num = (*writer)->row_count();
1364
11
    uint64_t segment_size;
1365
1366
11
    auto s = (*writer)->finalize_footer(&segment_size);
1367
11
    if (!s.ok()) {
1368
0
        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}",
1369
0
                                                      s.to_string());
1370
0
    }
1371
11
    int64_t inverted_index_file_size = 0;
1372
11
    RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size));
1373
1374
11
    SegmentStatistics segstat;
1375
11
    segstat.row_num = row_num;
1376
11
    segstat.data_size = segment_size;
1377
11
    segstat.index_size = inverted_index_file_size;
1378
11
    bool key_bounds_truncated = copy_key_bounds_with_truncation(key_bounds, &segstat.key_bounds);
1379
11
    {
1380
11
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
1381
11
        CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
1382
11
        _segid_statistics_map.emplace(segid, std::move(segstat));
1383
11
        if (key_bounds_truncated) {
1384
2
            _segments_key_bounds_truncated = true;
1385
2
        }
1386
11
    }
1387
11
    VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
1388
0
               << " data_size:" << PrettyPrinter::print_bytes(segment_size)
1389
0
               << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size);
1390
1391
11
    writer->reset();
1392
1393
11
    return Status::OK();
1394
11
}
1395
1396
} // namespace doris