Coverage Report

Created: 2026-05-13 18:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/beta_rowset_writer.h"
19
20
#include <assert.h>
21
// IWYU pragma: no_include <bthread/errno.h>
22
#include <errno.h> // IWYU pragma: keep
23
#include <fmt/format.h>
24
#include <stdio.h>
25
26
#include <chrono>
27
#include <ctime> // time
28
#include <filesystem>
29
#include <memory>
30
#include <mutex>
31
#include <sstream>
32
#include <thread>
33
#include <utility>
34
#include <vector>
35
36
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
37
#include "common/cast_set.h"
38
#include "common/compiler_util.h" // IWYU pragma: keep
39
#include "common/config.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "core/block/block.h"
43
#include "core/column/column.h"
44
#include "core/data_type/data_type_factory.hpp"
45
#include "io/fs/file_reader.h"
46
#include "io/fs/file_system.h"
47
#include "io/fs/file_writer.h"
48
#include "runtime/thread_context.h"
49
#include "storage/index/inverted/inverted_index_cache.h"
50
#include "storage/index/inverted/inverted_index_desc.h"
51
#include "storage/olap_define.h"
52
#include "storage/rowset/beta_rowset.h"
53
#include "storage/rowset/rowset_factory.h"
54
#include "storage/rowset/rowset_writer.h"
55
#include "storage/rowset/segcompaction.h"
56
#include "storage/schema_change/schema_change.h"
57
#include "storage/segment/segment.h"
58
#include "storage/segment/segment_writer.h"
59
#include "storage/storage_engine.h"
60
#include "storage/tablet/tablet_schema.h"
61
#include "util/debug_points.h"
62
#include "util/pretty_printer.h"
63
#include "util/slice.h"
64
#include "util/stopwatch.hpp"
65
#include "util/time.h"
66
67
namespace doris {
68
using namespace ErrorCode;
69
70
namespace {
71
72
701
bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) {
73
701
    std::string_view last;
74
1.85k
    for (auto&& segment_encode_key : segments_encoded_key_bounds) {
75
1.85k
        auto&& cur_min = segment_encode_key.min_key();
76
1.85k
        auto&& cur_max = segment_encode_key.max_key();
77
1.85k
        if (cur_min <= last) {
78
143
            return true;
79
143
        }
80
1.70k
        last = cur_max;
81
1.70k
    }
82
558
    return false;
83
701
}
84
85
void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
86
24
                                       const RowsetMeta& spec_rowset_meta) {
87
24
    rowset_meta.set_num_rows(spec_rowset_meta.num_rows());
88
24
    rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size());
89
24
    rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size());
90
24
    rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size());
91
    // TODO write zonemap to meta
92
24
    rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0);
93
24
    rowset_meta.set_creation_time(time(nullptr));
94
24
    rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
95
24
    rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
96
24
    rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
97
24
    rowset_meta.set_segments_key_bounds_truncated(
98
24
            spec_rowset_meta.is_segments_key_bounds_truncated());
99
24
    std::vector<KeyBoundsPB> segments_key_bounds;
100
24
    spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
101
    // Preserve source layout: if source was aggregated (size 1), re-aggregating
102
    // the single entry is a no-op that also keeps the flag consistent.
103
24
    rowset_meta.set_segments_key_bounds(segments_key_bounds,
104
24
                                        spec_rowset_meta.is_segments_key_bounds_aggregated());
105
24
    std::vector<uint32_t> num_segment_rows;
106
24
    spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
107
24
    rowset_meta.set_num_segment_rows(num_segment_rows);
108
24
    if (spec_rowset_meta.is_row_binlog()) {
109
0
        rowset_meta.mark_row_binlog();
110
0
    }
111
24
}
112
113
} // namespace
114
115
1.00k
SegmentFileCollection::~SegmentFileCollection() = default;
116
117
2.38k
Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) {
118
2.38k
    std::lock_guard lock(_lock);
119
2.38k
    if (_closed) [[unlikely]] {
120
0
        DCHECK(false) << writer->path();
121
0
        return Status::InternalError("add to closed SegmentFileCollection");
122
0
    }
123
124
2.38k
    _file_writers.emplace(seg_id, std::move(writer));
125
2.38k
    return Status::OK();
126
2.38k
}
127
128
63
io::FileWriter* SegmentFileCollection::get(int seg_id) const {
129
63
    std::lock_guard lock(_lock);
130
63
    if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) {
131
63
        return it->second.get();
132
63
    } else {
133
0
        return nullptr;
134
0
    }
135
63
}
136
137
969
Status SegmentFileCollection::close() {
138
969
    {
139
969
        std::lock_guard lock(_lock);
140
969
        if (_closed) [[unlikely]] {
141
0
            DCHECK(false);
142
0
            return Status::InternalError("double close SegmentFileCollection");
143
0
        }
144
969
        _closed = true;
145
969
    }
146
147
2.38k
    for (auto&& [_, writer] : _file_writers) {
148
2.38k
        DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", {
149
2.38k
            auto before_state = writer->state();
150
2.38k
            for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) {
151
2.38k
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
152
2.38k
            }
153
2.38k
            LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path="
154
2.38k
                      << writer->path().native()
155
2.38k
                      << " before_state=" << static_cast<int>(before_state)
156
2.38k
                      << " after_state=" << static_cast<int>(writer->state());
157
2.38k
        });
158
2.38k
        if (writer->state() != io::FileWriter::State::CLOSED) {
159
2.31k
            RETURN_IF_ERROR(writer->close());
160
2.31k
        }
161
2.38k
    }
162
163
969
    return Status::OK();
164
969
}
165
166
0
Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) {
167
0
    std::lock_guard lock(_lock);
168
0
    if (!_closed) [[unlikely]] {
169
0
        DCHECK(false);
170
0
        return ResultError(Status::InternalError("get segments file size without closed"));
171
0
    }
172
173
0
    Status st;
174
0
    std::vector<size_t> seg_file_size(_file_writers.size(), 0);
175
0
    bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) {
176
0
        auto&& [seg_id, writer] = it;
177
178
0
        int idx = seg_id - seg_id_offset;
179
0
        if (idx >= seg_file_size.size()) [[unlikely]] {
180
0
            auto err_msg = fmt::format(
181
0
                    "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id,
182
0
                    seg_file_size.size(), seg_id_offset, writer->path().native());
183
0
            DCHECK(false) << err_msg;
184
0
            st = Status::InternalError(err_msg);
185
0
            return false;
186
0
        }
187
188
0
        auto& fsize = seg_file_size[idx];
189
0
        if (fsize != 0) {
190
            // File size should not have been set yet
191
0
            auto err_msg =
192
0
                    fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native());
193
0
            DCHECK(false) << err_msg;
194
0
            st = Status::InternalError(err_msg);
195
0
            return false;
196
0
        }
197
198
0
        fsize = writer->bytes_appended();
199
0
        if (fsize <= 0) {
200
0
            auto err_msg =
201
0
                    fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native());
202
0
            DCHECK(false) << err_msg;
203
0
            st = Status::InternalError(err_msg);
204
0
            return false;
205
0
        }
206
207
0
        return true;
208
0
    });
209
210
0
    if (succ) {
211
0
        return seg_file_size;
212
0
    }
213
214
0
    return ResultError(st);
215
0
}
216
217
1.00k
InvertedIndexFileCollection::~InvertedIndexFileCollection() = default;
218
219
265
Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) {
220
265
    std::lock_guard lock(_lock);
221
265
    if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end())
222
0
            [[unlikely]] {
223
0
        DCHECK(false);
224
0
        return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id);
225
0
    }
226
265
    _inverted_index_file_writers.emplace(seg_id, std::move(index_writer));
227
265
    return Status::OK();
228
265
}
229
230
328
Status InvertedIndexFileCollection::begin_close() {
231
328
    std::lock_guard lock(_lock);
232
328
    for (auto&& [id, writer] : _inverted_index_file_writers) {
233
36
        RETURN_IF_ERROR(writer->begin_close());
234
36
        _total_size += writer->get_index_file_total_size();
235
36
    }
236
237
328
    return Status::OK();
238
328
}
239
240
969
Status InvertedIndexFileCollection::finish_close() {
241
969
    std::lock_guard lock(_lock);
242
969
    for (auto&& [id, writer] : _inverted_index_file_writers) {
243
264
        RETURN_IF_ERROR(writer->finish_close());
244
264
    }
245
969
    return Status::OK();
246
969
}
247
248
Result<std::vector<const InvertedIndexFileInfo*>>
249
106
InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) {
250
106
    std::lock_guard lock(_lock);
251
252
106
    Status st;
253
106
    std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size());
254
106
    bool succ = std::all_of(
255
106
            _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(),
256
215
            [&](auto&& it) {
257
215
                auto&& [seg_id, writer] = it;
258
259
215
                int idx = seg_id - seg_id_offset;
260
215
                if (idx >= idx_file_info.size()) [[unlikely]] {
261
0
                    auto err_msg =
262
0
                            fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}",
263
0
                                        seg_id, idx_file_info.size(), seg_id_offset);
264
0
                    DCHECK(false) << err_msg;
265
0
                    st = Status::InternalError(err_msg);
266
0
                    return false;
267
0
                }
268
215
                idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info();
269
215
                return true;
270
215
            });
271
272
106
    if (succ) {
273
106
        return idx_file_info;
274
106
    }
275
276
0
    return ResultError(st);
277
106
}
278
279
BaseBetaRowsetWriter::BaseBetaRowsetWriter()
280
1.00k
        : _num_segment(0),
281
1.00k
          _segment_start_id(0),
282
1.00k
          _num_rows_written(0),
283
1.00k
          _total_data_size(0),
284
1.00k
          _total_index_size(0),
285
1.00k
          _segment_creator(_context, _seg_files, _idx_files) {}
286
287
BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
288
1.00k
        : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
289
290
2
RowBinlogRowsetWriter::RowBinlogRowsetWriter(StorageEngine& engine) : BetaRowsetWriter(engine) {}
291
292
1.00k
BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
293
1.00k
    if (!_already_built && _rowset_meta->is_local()) {
294
        // abnormal exit, remove all files generated
295
8
        auto& fs = io::global_local_filesystem();
296
8
        for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) {
297
0
            std::string seg_path =
298
0
                    local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i);
299
            // Even if an error is encountered, these files that have not been cleaned up
300
            // will be cleaned up by the GC background. So here we only print the error
301
            // message when we encounter an error.
302
0
            WARN_IF_ERROR(fs->delete_file(seg_path),
303
0
                          fmt::format("Failed to delete file={}", seg_path));
304
0
        }
305
8
    }
306
1.00k
    if (_calc_delete_bitmap_token) {
307
8
        _calc_delete_bitmap_token->cancel();
308
8
    }
309
1.00k
}
310
311
1.00k
BetaRowsetWriter::~BetaRowsetWriter() {
312
    /* Note that segcompaction is async and in parallel with load job. So we should handle carefully
313
     * when the job is cancelled. Although it is meaningless to continue segcompaction when the job
314
     * is cancelled, the objects involved in the job should be preserved during segcompaction to
315
     * avoid crashes due to memory issues. */
316
1.00k
    WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
317
1.00k
}
318
319
1.00k
Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
320
1.00k
    _context = rowset_writer_context;
321
1.00k
    _rowset_meta.reset(new RowsetMeta);
322
1.00k
    if (_context.storage_resource) {
323
0
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
324
0
    }
325
1.00k
    _rowset_meta->set_rowset_id(_context.rowset_id);
326
1.00k
    _rowset_meta->set_partition_id(_context.partition_id);
327
1.00k
    _rowset_meta->set_tablet_id(_context.tablet_id);
328
1.00k
    _rowset_meta->set_index_id(_context.index_id);
329
1.00k
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
330
1.00k
    _rowset_meta->set_rowset_type(_context.rowset_type);
331
1.00k
    _rowset_meta->set_rowset_state(_context.rowset_state);
332
1.00k
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
333
1.00k
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
334
32
        _is_pending = true;
335
32
        _rowset_meta->set_txn_id(_context.txn_id);
336
32
        _rowset_meta->set_load_id(_context.load_id);
337
969
    } else {
338
969
        _rowset_meta->set_version(_context.version);
339
969
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
340
969
    }
341
1.00k
    _rowset_meta->set_tablet_uid(_context.tablet_uid);
342
1.00k
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
343
1.00k
    if (_context.write_binlog_opt().is_binlog_writer()) {
344
2
        _rowset_meta->mark_row_binlog();
345
2
    }
346
1.00k
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
347
1.00k
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
348
1.00k
    return Status::OK();
349
1.00k
}
350
351
1.36k
Status BaseBetaRowsetWriter::add_block(const Block* block) {
352
1.36k
    return _segment_creator.add_block(block);
353
1.36k
}
354
355
63
Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
356
63
    SCOPED_RAW_TIMER(&_delete_bitmap_ns);
357
63
    if (!_context.tablet->enable_unique_key_merge_on_write() ||
358
63
        (_context.partial_update_info && _context.partial_update_info->is_partial_update())) {
359
0
        return Status::OK();
360
0
    }
361
63
    std::vector<RowsetSharedPtr> specified_rowsets;
362
63
    {
363
63
        std::shared_lock meta_rlock(_context.tablet->get_header_lock());
364
63
        specified_rowsets =
365
63
                _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get());
366
63
    }
367
368
    // Submit the entire delete bitmap calculation process to thread pool for async execution
369
    // This avoids blocking memtable flush thread while waiting for file upload to complete
370
    // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap
371
63
    return _calc_delete_bitmap_token->submit_func(
372
63
            [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status {
373
63
                Status st = Status::OK();
374
                // Step 1: Close file_writer (must be done before load_segments)
375
63
                auto* file_writer = _seg_files.get(segment_id);
376
63
                if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) {
377
63
                    MonotonicStopWatch close_timer;
378
63
                    close_timer.start();
379
63
                    st = file_writer->close();
380
63
                    close_timer.stop();
381
382
63
                    auto close_time_ms = close_timer.elapsed_time_milliseconds();
383
63
                    if (close_time_ms > 1000) {
384
0
                        LOG(INFO) << "file_writer->close() took " << close_time_ms
385
0
                                  << "ms for segment_id=" << segment_id
386
0
                                  << ", tablet_id=" << _context.tablet_id
387
0
                                  << ", rowset_id=" << _context.rowset_id;
388
0
                    }
389
63
                    if (!st.ok()) {
390
0
                        return st;
391
0
                    }
392
63
                }
393
394
63
                OlapStopWatch watch;
395
                // Step 2: Build tmp rowset (needs file_writer to be closed)
396
63
                RowsetSharedPtr rowset_ptr;
397
63
                st = _build_tmp(rowset_ptr);
398
63
                if (!st.ok()) {
399
0
                    return st;
400
0
                }
401
402
                // Step 3: Load segments (needs file_writer to be closed and rowset to be built)
403
63
                auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
404
63
                std::vector<segment_v2::SegmentSharedPtr> segments;
405
63
                st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments);
406
63
                if (!st.ok()) {
407
0
                    return st;
408
0
                }
409
410
                // Step 4: Calculate delete bitmap
411
63
                st = BaseTablet::calc_delete_bitmap(
412
63
                        _context.tablet, rowset_ptr, segments, specified_rowsets,
413
63
                        _context.mow_context->delete_bitmap, _context.mow_context->max_version,
414
63
                        nullptr, nullptr, nullptr);
415
63
                if (!st.ok()) {
416
0
                    return st;
417
0
                }
418
419
63
                size_t total_rows =
420
63
                        std::accumulate(segments.begin(), segments.end(), 0,
421
63
                                        [](size_t sum, const segment_v2::SegmentSharedPtr& s) {
422
63
                                            return sum += s->num_rows();
423
63
                                        });
424
63
                LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: "
425
63
                          << _context.tablet->tablet_id()
426
63
                          << ", rowset_ids: " << _context.mow_context->rowset_ids->size()
427
63
                          << ", cur max_version: " << _context.mow_context->max_version
428
63
                          << ", transaction_id: " << _context.mow_context->txn_id
429
63
                          << ", delete_bitmap_count: "
430
63
                          << _context.mow_context->delete_bitmap->get_delete_bitmap_count()
431
63
                          << ", delete_bitmap_cardinality: "
432
63
                          << _context.mow_context->delete_bitmap->cardinality()
433
63
                          << ", cost: " << watch.get_elapse_time_us()
434
63
                          << "(us), total rows: " << total_rows;
435
63
                return Status::OK();
436
63
            });
437
63
}
438
439
1.00k
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
440
1.00k
    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
441
1.00k
    if (_segcompaction_worker) {
442
1.00k
        _segcompaction_worker->init_mem_tracker(rowset_writer_context);
443
1.00k
    }
444
1.00k
    if (_context.mow_context != nullptr) {
445
8
        _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token();
446
8
    }
447
1.00k
    return Status::OK();
448
1.00k
}
449
450
Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
451
77
                                                    int32_t segment_id) {
452
77
    DCHECK(_rowset_meta->is_local());
453
77
    auto fs = _rowset_meta->fs();
454
77
    if (!fs) {
455
0
        return Status::Error<INIT_FAILED>(
456
0
                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
457
0
    }
458
77
    auto path =
459
77
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id);
460
77
    io::FileReaderOptions reader_options {
461
77
            .cache_type =
462
77
                    _context.write_file_cache
463
77
                            ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
464
0
                                                         : io::FileCachePolicy::NO_CACHE)
465
77
                            : io::FileCachePolicy::NO_CACHE,
466
77
            .is_doris_table = true,
467
77
            .cache_base_path {},
468
77
            .tablet_id = _rowset_meta->tablet_id(),
469
77
    };
470
77
    auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(),
471
77
                                       _context.tablet_schema, reader_options, &segment);
472
77
    if (!s.ok()) {
473
0
        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
474
0
        return s;
475
0
    }
476
77
    return Status::OK();
477
77
}
478
479
/* policy of segcompaction target selection:
480
 *  1. skip big segments
481
 *  2. if the consecutive smalls end up with a big, compact the smalls, except
482
 *     single small
483
 *  3. if the consecutive smalls end up with small, compact the smalls if the
484
 *     length is beyond (config::segcompaction_batch_size / 2)
485
 */
486
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
487
15
        SegCompactionCandidatesSharedPtr& segments) {
488
15
    segments = std::make_shared<SegCompactionCandidates>();
489
    // skip last (maybe active) segment
490
15
    int32_t last_segment = _num_segment - 1;
491
15
    size_t task_bytes = 0;
492
15
    uint32_t task_rows = 0;
493
15
    int32_t segid;
494
15
    for (segid = _segcompacted_point;
495
86
         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
496
77
        segment_v2::SegmentSharedPtr segment;
497
77
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
498
77
        const auto segment_rows = segment->num_rows();
499
77
        const auto segment_bytes = segment->file_reader()->size();
500
77
        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
501
77
                                segment_bytes > config::segcompaction_candidate_max_bytes;
502
77
        if (is_large_segment) {
503
14
            if (segid == _segcompacted_point) {
504
                // skip large segments at the front
505
8
                auto dst_seg_id = _num_segcompacted.load();
506
8
                RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
507
8
                if (_segcompaction_worker->need_convert_delete_bitmap()) {
508
4
                    _segcompaction_worker->convert_segment_delete_bitmap(
509
4
                            _context.mow_context->delete_bitmap, segid, dst_seg_id);
510
4
                }
511
8
                continue;
512
8
            } else {
513
                // stop because we need consecutive segments
514
6
                break;
515
6
            }
516
14
        }
517
63
        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
518
63
                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
519
63
        if (is_task_full) {
520
0
            break;
521
0
        }
522
63
        segments->push_back(segment);
523
63
        task_rows += segment->num_rows();
524
63
        task_bytes += segment->file_reader()->size();
525
63
    }
526
15
    size_t s = segments->size();
527
15
    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
528
        // we didn't collect enough segments, better to do it in next
529
        // round to compact more at once
530
0
        segments->clear();
531
0
        return Status::OK();
532
0
    }
533
15
    if (s == 1) { // poor bachelor, let it go
534
4
        VLOG_DEBUG << "only one candidate segment";
535
4
        auto src_seg_id = _segcompacted_point.load();
536
4
        auto dst_seg_id = _num_segcompacted.load();
537
4
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
538
4
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
539
2
            _segcompaction_worker->convert_segment_delete_bitmap(
540
2
                    _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
541
2
        }
542
4
        segments->clear();
543
4
        return Status::OK();
544
4
    }
545
11
    if (VLOG_DEBUG_IS_ON) {
546
0
        vlog_buffer.clear();
547
0
        for (auto& segment : (*segments.get())) {
548
0
            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
549
0
        }
550
0
        VLOG_DEBUG << "candidate segments num:" << s
551
0
                   << " list of candidates:" << fmt::to_string(vlog_buffer);
552
0
    }
553
11
    return Status::OK();
554
15
}
555
556
11
Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
557
11
    int ret;
558
11
    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
559
11
                                                                    _context.rowset_id, begin, end);
560
11
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
561
11
                                           _num_segcompacted);
562
11
    ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
563
11
    if (ret) {
564
0
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
565
0
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
566
0
                errno);
567
0
    }
568
11
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
569
570
    // rename inverted index files
571
11
    RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0));
572
573
11
    _num_segcompacted++;
574
11
    return Status::OK();
575
11
}
576
577
void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin,
578
43
                                                                      uint32_t end) {
579
43
    VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
580
134
    for (uint32_t i = begin; i <= end; ++i) {
581
91
        _segid_statistics_map.erase(i);
582
91
    }
583
43
}
584
585
42
Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) {
586
42
    if (seg_id == _num_segcompacted) {
587
10
        ++_num_segcompacted;
588
10
        return Status::OK();
589
10
    }
590
591
32
    auto src_seg_path =
592
32
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id);
593
32
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
594
32
                                           _num_segcompacted);
595
32
    VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
596
0
               << dst_seg_path;
597
32
    {
598
32
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
599
32
        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
600
32
        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
601
32
                  true);
602
32
        auto org = _segid_statistics_map[seg_id];
603
32
        _segid_statistics_map.emplace(_num_segcompacted, org);
604
32
        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
605
32
    }
606
32
    int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
607
32
    if (ret) {
608
0
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
609
0
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
610
0
                errno);
611
0
    }
612
613
32
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
614
    // rename remaining inverted index files
615
32
    RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id));
616
617
32
    ++_num_segcompacted;
618
32
    return Status::OK();
619
32
}
620
621
Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id,
622
43
                                                      const std::string& segment_path) {
623
43
    auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache();
624
43
    if (!footer_page_cache) {
625
0
        return Status::OK();
626
0
    }
627
628
43
    auto fs = _rowset_meta->fs();
629
43
    bool exists = false;
630
43
    RETURN_IF_ERROR(fs->exists(segment_path, &exists));
631
43
    if (exists) {
632
43
        io::FileReaderSPtr file_reader;
633
43
        io::FileReaderOptions reader_options {
634
43
                .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
635
43
                                                        : io::FileCachePolicy::NO_CACHE,
636
43
                .is_doris_table = true,
637
43
                .cache_base_path = "",
638
43
                .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)),
639
43
                .tablet_id = _rowset_meta->tablet_id(),
640
43
        };
641
43
        RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options));
642
43
        DCHECK(file_reader != nullptr);
643
43
        auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader);
644
43
        footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE);
645
43
    }
646
43
    return Status::OK();
647
43
}
648
649
43
Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) {
650
43
    int ret;
651
652
43
    auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path,
653
32
                                                       _context.rowset_id.to_string(), seg_id)
654
43
                                  : BetaRowset::local_segment_path_segcompacted(
655
11
                                            _context.tablet_path, _context.rowset_id, begin, end);
656
43
    auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path);
657
43
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
658
43
                                           _num_segcompacted);
659
43
    auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path);
660
661
43
    if (_context.tablet_schema->get_inverted_index_storage_format() >=
662
43
        InvertedIndexStorageFormatPB::V2) {
663
22
        if (_context.tablet_schema->has_inverted_index() ||
664
22
            _context.tablet_schema->has_ann_index()) {
665
22
            auto src_idx_path =
666
22
                    InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix);
667
22
            auto dst_idx_path =
668
22
                    InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix);
669
670
22
            ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
671
22
            if (ret) {
672
0
                return Status::Error<ROWSET_RENAME_FILE_FAILED>(
673
0
                        "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path,
674
0
                        ret, errno);
675
0
            }
676
22
        }
677
22
    }
678
    // rename remaining inverted index files
679
150
    for (auto column : _context.tablet_schema->columns()) {
680
150
        auto index_infos = _context.tablet_schema->inverted_indexs(*column);
681
150
        for (const auto& index_info : index_infos) {
682
44
            auto index_id = index_info->index_id();
683
44
            if (_context.tablet_schema->get_inverted_index_storage_format() ==
684
44
                InvertedIndexStorageFormatPB::V1) {
685
0
                auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
686
0
                        src_index_path_prefix, index_id, index_info->get_index_suffix());
687
0
                auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
688
0
                        dst_index_path_prefix, index_id, index_info->get_index_suffix());
689
0
                VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to "
690
0
                           << dst_idx_path;
691
0
                ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
692
0
                if (ret) {
693
0
                    return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>(
694
0
                            "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path,
695
0
                            dst_idx_path, ret, errno);
696
0
                }
697
0
            }
698
            // Erase the origin index file cache
699
44
            auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
700
44
                    src_index_path_prefix, index_id, index_info->get_index_suffix());
701
44
            auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
702
44
                    dst_index_path_prefix, index_id, index_info->get_index_suffix());
703
44
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key));
704
44
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key));
705
44
        }
706
150
    }
707
43
    return Status::OK();
708
43
}
709
710
1.20k
Status BetaRowsetWriter::_segcompaction_if_necessary() {
711
1.20k
    Status status = Status::OK();
712
    // if not doing segcompaction, just check segment number
713
1.20k
    if (!config::enable_segcompaction || !_context.enable_segcompaction ||
714
1.20k
        _context.tablet_schema->num_variant_columns() > 0) {
715
1.08k
        return _check_segment_number_limit(_num_segment);
716
1.08k
    }
717
    // leave _is_doing_segcompaction as the last condition
718
    // otherwise _segcompacting_cond will never get notified
719
121
    if (_is_doing_segcompaction.exchange(true)) {
720
0
        return status;
721
0
    }
722
121
    if (_segcompaction_status.load() != OK) {
723
0
        status = Status::Error<SEGCOMPACTION_FAILED>(
724
0
                "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
725
0
                _segcompaction_status.load());
726
121
    } else {
727
121
        status = _check_segment_number_limit(_num_segcompacted);
728
121
    }
729
121
    if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
730
15
        SegCompactionCandidatesSharedPtr segments;
731
15
        status = _find_longest_consecutive_small_segment(segments);
732
15
        if (LIKELY(status.ok()) && (!segments->empty())) {
733
11
            LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
734
11
                      << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
735
11
                      << ", segcompacted_point:" << _segcompacted_point;
736
11
            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
737
11
            if (status.ok()) {
738
11
                return status;
739
11
            }
740
11
        }
741
15
    }
742
110
    {
743
110
        std::lock_guard lk(_is_doing_segcompaction_lock);
744
110
        _is_doing_segcompaction = false;
745
110
        _segcompacting_cond.notify_all();
746
110
    }
747
110
    return status;
748
121
}
749
750
641
Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
751
641
    DCHECK_EQ(_is_doing_segcompaction, false);
752
641
    if (!config::enable_segcompaction) {
753
144
        return Status::OK();
754
144
    }
755
497
    if (_segcompaction_status.load() != OK) {
756
0
        return Status::Error<SEGCOMPACTION_FAILED>(
757
0
                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error "
758
0
                "code: {}",
759
0
                _segcompaction_status.load());
760
0
    }
761
497
    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
762
        // no need if never segcompact before or all segcompacted
763
488
        return Status::OK();
764
488
    }
765
    // currently we only rename remaining segments to reduce wait time
766
    // so that transaction can be committed ASAP
767
9
    VLOG_DEBUG << "segcompaction last few segments";
768
39
    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
769
30
        auto dst_segid = _num_segcompacted.load();
770
30
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
771
30
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
772
16
            _segcompaction_worker->convert_segment_delete_bitmap(
773
16
                    _context.mow_context->delete_bitmap, segid, dst_segid);
774
16
        }
775
30
    }
776
9
    return Status::OK();
777
9
}
778
779
9
Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
780
9
    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
781
9
    RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
782
9
    _num_rows_written += rowset->num_rows();
783
9
    const auto& rowset_meta = rowset->rowset_meta();
784
9
    auto index_size = rowset_meta->index_disk_size();
785
9
    auto total_size = rowset_meta->total_disk_size();
786
9
    auto data_size = rowset_meta->data_disk_size();
787
    // corrupted index size caused by bug before 2.1.5 or 3.0.0 version
788
    // try to get real index size from disk.
789
9
    if (index_size < 0 || index_size > total_size * 2) {
790
0
        LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
791
0
                   << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
792
0
                   << " rowset:" << rowset_meta->rowset_id();
793
0
        index_size = 0;
794
0
        auto st = rowset->get_inverted_index_size(&index_size);
795
0
        if (!st.ok()) {
796
0
            if (!st.is<NOT_FOUND>()) {
797
0
                LOG(ERROR) << "failed to get inverted index size. res=" << st;
798
0
                return st;
799
0
            }
800
0
        }
801
0
    }
802
9
    _total_data_size += data_size;
803
9
    _total_index_size += index_size;
804
9
    _num_segment += cast_set<int32_t>(rowset->num_segments());
805
    // append key_bounds to current rowset
806
9
    RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
807
9
    rowset->get_num_segment_rows(&_segment_num_rows);
808
9
    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();
809
810
    // TODO update zonemap
811
9
    if (rowset->rowset_meta()->has_delete_predicate()) {
812
0
        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
813
0
    }
814
    // Update the tablet schema in the rowset metadata if the tablet schema contains a variant.
815
    // During the build process, _context.tablet_schema will be used as the rowset schema.
816
    // This situation may arise in the event of a linked schema change. If this schema is not set,
817
    // the subcolumns of the variant will be lost.
818
9
    if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) {
819
0
        _context.tablet_schema = rowset->tablet_schema();
820
0
    }
821
9
    return Status::OK();
822
9
}
823
824
0
Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) {
825
    // TODO use schema_mapping to transfer zonemap
826
0
    return add_rowset(rowset);
827
0
}
828
829
1.14k
Status BaseBetaRowsetWriter::flush() {
830
1.14k
    return _segment_creator.flush();
831
1.14k
}
832
833
12
Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) {
834
12
    if (block->rows() == 0) {
835
0
        return Status::OK();
836
0
    }
837
838
12
    {
839
12
        SCOPED_RAW_TIMER(&_segment_writer_ns);
840
12
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
841
12
    }
842
12
    return Status::OK();
843
12
}
844
845
0
Status BaseBetaRowsetWriter::flush_single_block(const Block* block) {
846
0
    return _segment_creator.flush_single_block(block);
847
0
}
848
849
1.00k
Status BetaRowsetWriter::_wait_flying_segcompaction() {
850
1.00k
    std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
851
1.00k
    uint64_t begin_wait = GetCurrentTimeMicros();
852
1.00k
    while (_is_doing_segcompaction) {
853
        // change sync wait to async?
854
0
        _segcompacting_cond.wait(l);
855
0
    }
856
1.00k
    uint64_t elapsed = GetCurrentTimeMicros() - begin_wait;
857
1.00k
    if (elapsed >= MICROS_PER_SEC) {
858
0
        LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us";
859
0
    }
860
1.00k
    if (_segcompaction_status.load() != OK) {
861
0
        return Status::Error<SEGCOMPACTION_FAILED>(
862
0
                "BetaRowsetWriter meet invalid state, error code: {}",
863
0
                _segcompaction_status.load());
864
0
    }
865
1.00k
    return Status::OK();
866
1.00k
}
867
868
24
RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) {
869
24
    if (_rowset_meta->newest_write_timestamp() == -1) {
870
0
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
871
0
    }
872
873
24
    build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta);
874
24
    RowsetSharedPtr rowset;
875
24
    auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path,
876
24
                                               _rowset_meta, &rowset);
877
24
    if (!status.ok()) {
878
0
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
879
0
        return nullptr;
880
0
    }
881
24
    _already_built = true;
882
24
    return rowset;
883
24
}
884
885
641
Status BaseBetaRowsetWriter::_close_file_writers() {
886
    // Flush and close segment files
887
641
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
888
641
                                   "failed to close segment creator when build new rowset");
889
641
    return Status::OK();
890
641
}
891
892
641
Status BetaRowsetWriter::_close_file_writers() {
893
641
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
894
    // if _segment_start_id is not zero, that means it's a transient rowset writer for
895
    // MoW partial update, don't need to do segment compaction.
896
641
    if (_segment_start_id == 0) {
897
641
        if (_segcompaction_worker->cancel()) {
898
641
            std::lock_guard lk(_is_doing_segcompaction_lock);
899
641
            _is_doing_segcompaction = false;
900
641
            _segcompacting_cond.notify_all();
901
641
        } else {
902
0
            RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
903
0
                                           "segcompaction failed when build new rowset");
904
0
        }
905
641
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
906
641
                                       "rename last segments failed when build new rowset");
907
        // segcompaction worker would do file wrier's close function in compact_segments
908
641
        if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer();
909
641
            nullptr != seg_comp_file_writer &&
910
641
            seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) {
911
0
            RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
912
0
                                           "close segment compaction worker failed");
913
0
        }
914
        // process delete bitmap for mow table
915
641
        if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
916
4
            auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
917
            // which means the segment compaction is triggerd
918
4
            if (converted_delete_bitmap != nullptr) {
919
4
                RowsetIdUnorderedSet rowsetids;
920
4
                rowsetids.insert(rowset_id());
921
4
                context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
922
4
                                                                     rowsetids);
923
4
                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
924
4
                                                             {rowset_id(), UINT32_MAX, INT64_MAX});
925
4
                context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
926
4
            }
927
4
        }
928
641
    }
929
641
    return Status::OK();
930
641
}
931
932
969
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
933
969
    if (_calc_delete_bitmap_token != nullptr) {
934
8
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
935
8
    }
936
969
    RETURN_IF_ERROR(_close_file_writers());
937
969
    const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
938
969
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
939
969
                                   "too many segments when build new rowset");
940
969
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
941
969
    if (_is_pending) {
942
27
        _rowset_meta->set_rowset_state(COMMITTED);
943
942
    } else {
944
942
        _rowset_meta->set_rowset_state(VISIBLE);
945
942
    }
946
947
969
    if (_rowset_meta->newest_write_timestamp() == -1) {
948
630
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
949
630
    }
950
951
969
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
952
953
    // If segment compaction occurs, the idx file info will become inaccurate.
954
969
    if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) &&
955
969
        _num_segcompacted == 0) {
956
106
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
957
106
            !idx_files_info.has_value()) [[unlikely]] {
958
0
            LOG(ERROR) << "expected inverted index files info, but none presents: "
959
0
                       << idx_files_info.error();
960
106
        } else {
961
106
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
962
106
        }
963
106
    }
964
965
969
    RETURN_NOT_OK_STATUS_WITH_WARN(
966
969
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
967
969
                                         &rowset),
968
969
            "rowset init failed when build new rowset");
969
969
    _already_built = true;
970
969
    return Status::OK();
971
969
}
972
973
0
int64_t BaseBetaRowsetWriter::_num_seg() const {
974
0
    return _num_segment;
975
0
}
976
977
1.03k
int64_t BetaRowsetWriter::_num_seg() const {
978
1.03k
    return is_segcompacted() ? _num_segcompacted : _num_segment;
979
1.03k
}
980
981
1.03k
Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
982
1.03k
    int64_t num_rows_written = 0;
983
1.03k
    int64_t total_data_size = 0;
984
1.03k
    int64_t total_index_size = 0;
985
1.03k
    std::vector<KeyBoundsPB> segments_encoded_key_bounds;
986
1.03k
    std::vector<uint32_t> segment_rows;
987
1.03k
    {
988
1.03k
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
989
1.50k
        for (const auto& itr : _segid_statistics_map) {
990
1.50k
            num_rows_written += itr.second.row_num;
991
1.50k
            total_data_size += itr.second.data_size;
992
1.50k
            total_index_size += itr.second.index_size;
993
1.50k
            segments_encoded_key_bounds.push_back(itr.second.key_bounds);
994
            // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
995
1.50k
            segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
996
1.50k
        }
997
1.03k
    }
998
1.03k
    if (segment_rows.empty()) {
999
        // vertical compaction and linked schema change will not record segment statistics,
1000
        // it will record segment rows in _segment_num_rows
1001
414
        RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
1002
414
    }
1003
1004
1.18k
    for (auto& key_bound : _segments_encoded_key_bounds) {
1005
1.18k
        segments_encoded_key_bounds.push_back(key_bound);
1006
1.18k
    }
1007
1.03k
    if (_segments_key_bounds_truncated.has_value()) {
1008
9
        rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
1009
9
    }
1010
1.03k
    rowset_meta->set_num_segment_rows(segment_rows);
1011
    // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
1012
    // the overlap property when key bounds are empty.
1013
    // for mow table with cluster keys, the overlap is used for cluster keys,
1014
    // the key_bounds is primary keys
1015
1.03k
    if (!segments_encoded_key_bounds.empty() &&
1016
1.03k
        !is_segment_overlapping(segments_encoded_key_bounds) &&
1017
1.03k
        _context.tablet_schema->cluster_key_uids().empty()) {
1018
556
        rowset_meta->set_segments_overlap(NONOVERLAPPING);
1019
556
    }
1020
1021
1.03k
    auto segment_num = _num_seg();
1022
1.03k
    if (check_segment_num && config::check_segment_when_build_rowset_meta) {
1023
0
        auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size();
1024
0
        if (segments_encoded_key_bounds_size != segment_num) {
1025
0
            return Status::InternalError(
1026
0
                    "segments_encoded_key_bounds_size should  equal to _num_seg, "
1027
0
                    "segments_encoded_key_bounds_size "
1028
0
                    "is: {}, _num_seg is: {}",
1029
0
                    segments_encoded_key_bounds_size, segment_num);
1030
0
        }
1031
0
        if (segment_rows.size() != segment_num) {
1032
0
            return Status::InternalError(
1033
0
                    "segment_rows size should equal to _num_seg, segment_rows size is: {}, "
1034
0
                    "_num_seg is {}, tablet={}, rowset={}, txn={}",
1035
0
                    segment_rows.size(), segment_num, _context.tablet_id,
1036
0
                    _context.rowset_id.to_string(), _context.txn_id);
1037
0
        }
1038
0
    }
1039
1040
1.03k
    rowset_meta->set_num_segments(segment_num);
1041
1.03k
    rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
1042
1.03k
    rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size +
1043
1.03k
                                     _total_index_size);
1044
1.03k
    rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
1045
1.03k
    rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
1046
1.03k
    bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
1047
1.03k
                                !_context.enable_unique_key_merge_on_write;
1048
1.03k
    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds);
1049
    // TODO write zonemap to meta
1050
1.03k
    rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
1051
1.03k
    rowset_meta->set_creation_time(time(nullptr));
1052
1.03k
    return Status::OK();
1053
1.03k
}
1054
1055
63
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
1056
63
    Status status;
1057
63
    std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
1058
63
    tmp_rs_meta->init(_rowset_meta.get());
1059
1060
63
    status = _build_rowset_meta(tmp_rs_meta.get());
1061
63
    if (!status.ok()) {
1062
0
        LOG(WARNING) << "failed to build rowset meta, res=" << status;
1063
0
        return status;
1064
0
    }
1065
1066
63
    status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta,
1067
63
                                          &rowset_ptr);
1068
63
    DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
1069
63
                    { status = Status::InternalError("create rowset failed"); });
1070
63
    if (!status.ok()) {
1071
0
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
1072
0
        return status;
1073
0
    }
1074
63
    return Status::OK();
1075
63
}
1076
1077
Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
1078
                                                 io::FileWriterPtr& file_writer,
1079
2.68k
                                                 bool is_index_file) {
1080
2.68k
    io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file);
1081
2.68k
    Status st = _context.fs()->create_file(path, &file_writer, &opts);
1082
2.68k
    if (!st.ok()) {
1083
0
        LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
1084
0
        return st;
1085
0
    }
1086
1087
2.68k
    DCHECK(file_writer != nullptr);
1088
2.68k
    return Status::OK();
1089
2.68k
}
1090
1091
Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
1092
2.66k
                                                FileType file_type) {
1093
2.66k
    auto segment_path = _context.segment_path(segment_id);
1094
2.66k
    if (file_type == FileType::INVERTED_INDEX_FILE) {
1095
255
        std::string prefix =
1096
255
                std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
1097
255
        std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
1098
255
        return _create_file_writer(index_path, file_writer, true /* is_index_file */);
1099
2.40k
    } else if (file_type == FileType::SEGMENT_FILE) {
1100
2.40k
        return _create_file_writer(segment_path, file_writer, false /* is_index_file */);
1101
2.40k
    }
1102
0
    return Status::Error<ErrorCode::INTERNAL_ERROR>(
1103
0
            fmt::format("failed to create file = {}, file type = {}", segment_path, file_type));
1104
2.66k
}
1105
1106
Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id,
1107
228
                                                      IndexFileWriterPtr* index_file_writer) {
1108
228
    RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer));
1109
    // used for inverted index format v1
1110
228
    (*index_file_writer)
1111
228
            ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */));
1112
228
    return Status::OK();
1113
228
}
1114
1115
Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
1116
12
        std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) {
1117
12
    DCHECK(begin >= 0 && end >= 0);
1118
12
    std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
1119
12
                                                                   _context.rowset_id, begin, end);
1120
12
    io::FileWriterPtr file_writer;
1121
12
    RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */));
1122
1123
12
    IndexFileWriterPtr index_file_writer;
1124
12
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
1125
8
        io::FileWriterPtr idx_file_writer;
1126
8
        std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path));
1127
8
        if (_context.tablet_schema->get_inverted_index_storage_format() !=
1128
8
            InvertedIndexStorageFormatPB::V1) {
1129
8
            std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
1130
8
            RETURN_IF_ERROR(
1131
8
                    _create_file_writer(index_path, idx_file_writer, true /* is_index_file */));
1132
8
        }
1133
8
        index_file_writer = std::make_unique<IndexFileWriter>(
1134
8
                _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted,
1135
8
                _context.tablet_schema->get_inverted_index_storage_format(),
1136
8
                std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id);
1137
8
    }
1138
1139
12
    segment_v2::SegmentWriterOptions writer_options;
1140
12
    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
1141
12
    writer_options.rowset_ctx = &_context;
1142
12
    writer_options.write_type = _context.write_type;
1143
12
    writer_options.write_type = DataWriteType::TYPE_COMPACTION;
1144
12
    writer_options.max_rows_per_segment = _context.max_rows_per_segment;
1145
12
    writer_options.mow_ctx = _context.mow_context;
1146
1147
12
    *writer = std::make_unique<segment_v2::SegmentWriter>(
1148
12
            file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet,
1149
12
            _context.data_dir, writer_options, index_file_writer.get());
1150
12
    if (auto& seg_writer = _segcompaction_worker->get_file_writer();
1151
12
        seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) {
1152
0
        RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
1153
0
    }
1154
12
    _segcompaction_worker->get_file_writer().reset(file_writer.release());
1155
12
    if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
1156
12
        idx_file_writer != nullptr) {
1157
0
        RETURN_IF_ERROR(idx_file_writer->begin_close());
1158
0
        RETURN_IF_ERROR(idx_file_writer->finish_close());
1159
0
    }
1160
12
    _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
1161
12
    return Status::OK();
1162
12
}
1163
1164
0
// Rejects the rowset once it has accumulated more segments than
// config::max_segment_num_per_rowset allows. `segnum` is the prospective
// segment count; the debug point can override it to force the error path.
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (segnum <= config::max_segment_num_per_rowset) {
        return Status::OK();
    }
    return Status::Error<TOO_MANY_SEGMENTS>(
            "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
            "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
            "small or if the data is skewed.",
            _context.tablet_id, _context.rowset_id.to_string(),
            config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
}
1177
1178
2.17k
// Segment-count limit check for the segcompaction-aware writer. Same contract
// as the base-class version, but the error message additionally reports the
// segcompaction progress (_segcompacted_point / _num_segcompacted).
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (segnum <= config::max_segment_num_per_rowset) {
        return Status::OK();
    }
    return Status::Error<TOO_MANY_SEGMENTS>(
            "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
            "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
            "the bucket number is too small or if the data is skewed.",
            _context.tablet_id, _context.rowset_id.to_string(),
            config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
            _num_segcompacted, get_rowset_num_rows());
}
1192
1193
1.20k
// Registers a finished segment's statistics with this writer. Segment ids are
// rowset-global; the per-writer bookkeeping below is indexed relative to
// _segment_start_id. For merge-on-write tables a delete bitmap is generated
// for the new segment before returning.
Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    const uint32_t offset = segment_id - _segment_start_id;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // Each segment id must be registered exactly once.
        CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segment_id, segstat);
        // Grow the row-count table on demand. NOTE(review): the size check uses
        // segment_id while the element is written at `offset`; since
        // offset <= segment_id this only over-allocates, never indexes out of
        // bounds — confirm whether other readers index by segment_id.
        if (segment_id >= _segment_num_rows.size()) {
            _segment_num_rows.resize(segment_id + 1);
        }
        _segment_num_rows[offset] = cast_set<uint32_t>(segstat.row_num);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id
               << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size
               << " index_size:" << segstat.index_size;

    {
        std::lock_guard<std::mutex> lock(_segment_set_mutex);
        _segment_set.add(offset);
        // Segments may complete out of order: advance _num_segment across the
        // contiguous prefix of finished segments.
        for (; _segment_set.contains(_num_segment); ++_num_segment) {
        }
    }

    if (_context.mow_context != nullptr) {
        RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
    }
    return Status::OK();
}
1221
1222
1.20k
// Records the segment via the base-class bookkeeping, then checks whether the
// accumulated segments warrant kicking off a segment compaction.
Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    Status st = BaseBetaRowsetWriter::add_segment(segment_id, segstat);
    if (!st.ok()) {
        return st;
    }
    return _segcompaction_if_necessary();
}
1226
1227
// Finalizes a segment produced by segment compaction: writes the segment
// footer, closes its inverted indexes, records the segment's statistics in
// _segid_statistics_map, and destroys the writer.
//
// @param writer      owning pointer to the segment writer; reset on success
//                    (left intact if finalize_footer fails).
// @param index_size  unused — the inverted index size is taken from
//                    close_inverted_index() instead; kept for interface
//                    stability with existing callers.
// @param key_bounds  min/max key bounds recorded into the segment statistics.
// @return            WRITER_DATA_WRITE_ERROR if the footer cannot be
//                    finalized, otherwise the status of index close / OK.
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer,
        [[maybe_unused]] uint64_t index_size, KeyBoundsPB& key_bounds) {
    uint32_t segid = (*writer)->get_segment_id();
    uint32_t row_num = (*writer)->row_count();
    uint64_t segment_size = 0; // filled by finalize_footer() on success

    auto s = (*writer)->finalize_footer(&segment_size);
    if (!s.ok()) {
        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}",
                                                      s.to_string());
    }
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size));

    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // A compacted segment id must not have been registered before.
        CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segid, segstat);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
               << " data_size:" << PrettyPrinter::print_bytes(segment_size)
               << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size);

    // Release the segment writer now that its statistics are recorded.
    writer->reset();

    return Status::OK();
}
1260
1261
} // namespace doris