Coverage Report

Created: 2026-03-30 21:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/beta_rowset_writer.h"
19
20
#include <assert.h>
21
// IWYU pragma: no_include <bthread/errno.h>
22
#include <errno.h> // IWYU pragma: keep
23
#include <fmt/format.h>
24
#include <stdio.h>
25
26
#include <ctime> // time
27
#include <filesystem>
28
#include <memory>
29
#include <mutex>
30
#include <sstream>
31
#include <utility>
32
#include <vector>
33
34
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
35
#include "common/cast_set.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/logging.h"
39
#include "common/status.h"
40
#include "core/block/block.h"
41
#include "core/column/column.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "io/fs/file_reader.h"
44
#include "io/fs/file_system.h"
45
#include "io/fs/file_writer.h"
46
#include "runtime/thread_context.h"
47
#include "storage/index/inverted/inverted_index_cache.h"
48
#include "storage/index/inverted/inverted_index_desc.h"
49
#include "storage/olap_define.h"
50
#include "storage/rowset/beta_rowset.h"
51
#include "storage/rowset/rowset_factory.h"
52
#include "storage/rowset/rowset_writer.h"
53
#include "storage/rowset/segcompaction.h"
54
#include "storage/schema_change/schema_change.h"
55
#include "storage/segment/segment.h"
56
#include "storage/segment/segment_writer.h"
57
#include "storage/storage_engine.h"
58
#include "storage/tablet/tablet_schema.h"
59
#include "util/debug_points.h"
60
#include "util/pretty_printer.h"
61
#include "util/slice.h"
62
#include "util/stopwatch.hpp"
63
#include "util/time.h"
64
65
namespace doris {
66
#include "common/compile_check_begin.h"
67
using namespace ErrorCode;
68
69
namespace {
70
71
693
bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) {
72
693
    std::string_view last;
73
3.41k
    for (auto&& segment_encode_key : segments_encoded_key_bounds) {
74
3.41k
        auto&& cur_min = segment_encode_key.min_key();
75
3.41k
        auto&& cur_max = segment_encode_key.max_key();
76
3.41k
        if (cur_min <= last) {
77
140
            return true;
78
140
        }
79
3.27k
        last = cur_max;
80
3.27k
    }
81
553
    return false;
82
693
}
83
84
void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
85
24
                                       const RowsetMeta& spec_rowset_meta) {
86
24
    rowset_meta.set_num_rows(spec_rowset_meta.num_rows());
87
24
    rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size());
88
24
    rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size());
89
24
    rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size());
90
    // TODO write zonemap to meta
91
24
    rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0);
92
24
    rowset_meta.set_creation_time(time(nullptr));
93
24
    rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
94
24
    rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
95
24
    rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
96
24
    rowset_meta.set_segments_key_bounds_truncated(
97
24
            spec_rowset_meta.is_segments_key_bounds_truncated());
98
24
    std::vector<KeyBoundsPB> segments_key_bounds;
99
24
    spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
100
24
    rowset_meta.set_segments_key_bounds(segments_key_bounds);
101
24
    std::vector<uint32_t> num_segment_rows;
102
24
    spec_rowset_meta.get_num_segment_rows(&num_segment_rows);
103
24
    rowset_meta.set_num_segment_rows(num_segment_rows);
104
24
}
105
106
} // namespace
107
108
990
// Defaulted destructor: the owned file writers in _file_writers release
// themselves when the map is destroyed.
SegmentFileCollection::~SegmentFileCollection() = default;
109
110
5.19k
// Registers the file writer for segment `seg_id`.
// Returns InternalError if the collection was already closed or if a writer
// for this seg_id was already registered (previously a duplicate was silently
// dropped by the no-op emplace; now rejected, consistent with
// InvertedIndexFileCollection::add).
Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) {
    std::lock_guard lock(_lock);
    if (_closed) [[unlikely]] {
        DCHECK(false) << writer->path();
        return Status::InternalError("add to closed SegmentFileCollection");
    }
    if (_file_writers.find(seg_id) != _file_writers.end()) [[unlikely]] {
        DCHECK(false) << writer->path();
        return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id);
    }

    _file_writers.emplace(seg_id, std::move(writer));
    return Status::OK();
}
120
121
63
// Returns the raw (non-owning) writer for `seg_id`, or nullptr if none is
// registered. Thread-safe.
io::FileWriter* SegmentFileCollection::get(int seg_id) const {
    std::lock_guard guard(_lock);
    auto iter = _file_writers.find(seg_id);
    return iter != _file_writers.end() ? iter->second.get() : nullptr;
}
129
130
957
// Marks the collection closed and closes every writer that is not already
// closed. Returns the first close error, if any. Double close is rejected.
Status SegmentFileCollection::close() {
    {
        std::lock_guard guard(_lock);
        if (_closed) [[unlikely]] {
            DCHECK(false);
            return Status::InternalError("double close SegmentFileCollection");
        }
        _closed = true;
    }

    // After _closed is set no add() can succeed, so the map is stable and can
    // be iterated outside the lock (matching the original locking scheme).
    for (auto&& [seg_id, file_writer] : _file_writers) {
        if (file_writer->state() != io::FileWriter::State::CLOSED) {
            RETURN_IF_ERROR(file_writer->close());
        }
    }

    return Status::OK();
}
148
149
0
// Returns the file size of each segment, indexed by (seg_id - seg_id_offset).
// Must be called after close(). Fails if a seg_id falls outside the expected
// range, appears twice, or has a zero file size.
Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) {
    std::lock_guard lock(_lock);
    if (!_closed) [[unlikely]] {
        DCHECK(false);
        return ResultError(Status::InternalError("get segments file size without closed"));
    }

    Status st;
    std::vector<size_t> seg_file_size(_file_writers.size(), 0);
    bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) {
        auto&& [seg_id, writer] = it;

        int idx = seg_id - seg_id_offset;
        // Check both bounds explicitly. The original `idx >= size()` only caught
        // a negative idx via signed->unsigned wraparound.
        if (idx < 0 || static_cast<size_t>(idx) >= seg_file_size.size()) [[unlikely]] {
            auto err_msg = fmt::format(
                    "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id,
                    seg_file_size.size(), seg_id_offset, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        auto& fsize = seg_file_size[idx];
        if (fsize != 0) {
            // File size should not been set
            auto err_msg =
                    fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        fsize = writer->bytes_appended();
        // fsize is size_t, so `<= 0` was really `== 0`; say what we mean.
        if (fsize == 0) {
            auto err_msg =
                    fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native());
            DCHECK(false) << err_msg;
            st = Status::InternalError(err_msg);
            return false;
        }

        return true;
    });

    if (succ) {
        return seg_file_size;
    }

    return ResultError(st);
}
199
200
990
// Defaulted destructor: owned index file writers in
// _inverted_index_file_writers release themselves with the map.
InvertedIndexFileCollection::~InvertedIndexFileCollection() = default;
201
202
265
// Registers the inverted index file writer for segment `seg_id`.
// Rejects a seg_id that was already registered. Thread-safe.
Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) {
    std::lock_guard guard(_lock);
    if (_inverted_index_file_writers.count(seg_id) != 0) [[unlikely]] {
        DCHECK(false);
        return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id);
    }
    _inverted_index_file_writers.emplace(seg_id, std::move(index_writer));
    return Status::OK();
}
212
213
322
// Starts closing every registered index writer and accumulates the total
// index file size into _total_size. Stops at the first error.
Status InvertedIndexFileCollection::begin_close() {
    std::lock_guard guard(_lock);
    for (auto&& [seg_id, index_writer] : _inverted_index_file_writers) {
        RETURN_IF_ERROR(index_writer->begin_close());
        _total_size += index_writer->get_index_file_total_size();
    }

    return Status::OK();
}
222
223
957
// Completes the two-phase close started by begin_close() for every registered
// index writer. Stops at the first error.
Status InvertedIndexFileCollection::finish_close() {
    std::lock_guard guard(_lock);
    for (auto&& [seg_id, index_writer] : _inverted_index_file_writers) {
        RETURN_IF_ERROR(index_writer->finish_close());
    }
    return Status::OK();
}
230
231
// Returns each segment's inverted index file info, indexed by
// (seg_id - seg_id_offset). Fails if a seg_id maps outside the expected range.
Result<std::vector<const InvertedIndexFileInfo*>>
InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) {
    std::lock_guard lock(_lock);

    Status st;
    std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size());
    bool succ = std::all_of(
            _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(),
            [&](auto&& it) {
                auto&& [seg_id, writer] = it;

                int idx = seg_id - seg_id_offset;
                // Explicit lower-bound check instead of relying on signed->unsigned
                // wraparound in the `>=` comparison.
                if (idx < 0 || static_cast<size_t>(idx) >= idx_file_info.size()) [[unlikely]] {
                    auto err_msg =
                            fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}",
                                        seg_id, idx_file_info.size(), seg_id_offset);
                    DCHECK(false) << err_msg;
                    st = Status::InternalError(err_msg);
                    return false;
                }
                // Use the writer from the current map entry directly; the previous
                // `_inverted_index_file_writers[seg_id]` performed a redundant second
                // lookup (and map operator[] default-inserts on a missing key).
                idx_file_info[idx] = writer->get_index_file_info();
                return true;
            });

    if (succ) {
        return idx_file_info;
    }

    return ResultError(st);
}
261
262
// Zero-initializes the bookkeeping counters and wires the segment creator to
// this writer's context and its segment / inverted-index file collections.
BaseBetaRowsetWriter::BaseBetaRowsetWriter()
        : _num_segment(0),
          _segment_start_id(0),
          _num_rows_written(0),
          _total_data_size(0),
          _total_index_size(0),
          _segment_creator(_context, _seg_files, _idx_files) {}
269
270
// Binds the writer to the storage engine and creates its segcompaction worker,
// which keeps a back-pointer to this writer.
BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
        : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
272
273
990
BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
    // Abnormal exit (the rowset was never built): best-effort removal of every
    // local segment file this writer produced. Deletion errors are only logged,
    // since leftovers are reclaimed by the background GC anyway.
    if (!_already_built && _rowset_meta->is_local()) {
        auto& fs = io::global_local_filesystem();
        for (int seg_id = _segment_start_id; seg_id < _segment_creator.next_segment_id();
             ++seg_id) {
            std::string seg_path = local_segment_path(_context.tablet_path,
                                                      _context.rowset_id.to_string(), seg_id);
            WARN_IF_ERROR(fs->delete_file(seg_path),
                          fmt::format("Failed to delete file={}", seg_path));
        }
    }
    // Stop any pending delete-bitmap calculation tasks.
    if (_calc_delete_bitmap_token) {
        _calc_delete_bitmap_token->cancel();
    }
}
291
292
989
BetaRowsetWriter::~BetaRowsetWriter() {
    /* Note that segcompaction is async and in parallel with load job. So we should handle carefully
     * when the job is cancelled. Although it is meaningless to continue segcompaction when the job
     * is cancelled, the objects involved in the job should be preserved during segcompaction to
     * avoid crashes for memory issues. */
    // Wait for any in-flight segcompaction to drain before members are
    // destroyed; a failure here is only logged.
    WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
}
299
300
989
// Captures the writer context and builds a fresh RowsetMeta from it, then
// installs the segment collector / file-writer creator hooks back into the
// context so the segment machinery can call into this writer.
Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    _context = rowset_writer_context;
    _rowset_meta.reset(new RowsetMeta);
    auto& meta = *_rowset_meta;

    if (_context.storage_resource) {
        meta.set_remote_storage_resource(*_context.storage_resource);
    }

    // Identity and basic layout.
    meta.set_rowset_id(_context.rowset_id);
    meta.set_partition_id(_context.partition_id);
    meta.set_tablet_id(_context.tablet_id);
    meta.set_tablet_schema_hash(_context.tablet_schema_hash);
    meta.set_rowset_type(_context.rowset_type);
    meta.set_rowset_state(_context.rowset_state);
    meta.set_segments_overlap(_context.segments_overlap);

    const bool pending_state =
            _context.rowset_state == PREPARED || _context.rowset_state == COMMITTED;
    if (pending_state) {
        // Not yet published: identified by its load transaction.
        _is_pending = true;
        meta.set_txn_id(_context.txn_id);
        meta.set_load_id(_context.load_id);
    } else {
        // Published path: carries version info instead.
        meta.set_version(_context.version);
        meta.set_newest_write_timestamp(_context.newest_write_timestamp);
    }
    meta.set_tablet_uid(_context.tablet_uid);
    meta.set_tablet_schema(_context.tablet_schema);

    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
    return Status::OK();
}
327
328
1.36k
// Appends a block of rows to the rowset by delegating to the segment creator.
Status BaseBetaRowsetWriter::add_block(const Block* block) {
    return _segment_creator.add_block(block);
}
331
332
63
// Asynchronously computes the merge-on-write delete bitmap for one flushed
// segment. No-op unless the tablet is unique-key MoW and this is not a partial
// update. The heavy work (closing the file writer, building a tmp rowset,
// loading the segment, calculating the bitmap) is submitted to the
// delete-bitmap executor so the memtable flush thread is not blocked.
// Returns the submit status; the task itself reports failures via its own
// Status.
Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
    SCOPED_RAW_TIMER(&_delete_bitmap_ns);
    if (!_context.tablet->enable_unique_key_merge_on_write() ||
        (_context.partial_update_info && _context.partial_update_info->is_partial_update())) {
        return Status::OK();
    }
    // Snapshot the rowsets referenced by the MoW context under the header lock.
    std::vector<RowsetSharedPtr> specified_rowsets;
    {
        std::shared_lock meta_rlock(_context.tablet->get_header_lock());
        specified_rowsets =
                _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get());
    }

    // Submit the entire delete bitmap calculation process to thread pool for async execution
    // This avoids blocking memtable flush thread while waiting for file upload to complete
    // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap
    return _calc_delete_bitmap_token->submit_func(
            [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status {
                Status st = Status::OK();
                // Step 1: Close file_writer (must be done before load_segments)
                auto* file_writer = _seg_files.get(segment_id);
                if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) {
                    MonotonicStopWatch close_timer;
                    close_timer.start();
                    st = file_writer->close();
                    close_timer.stop();

                    auto close_time_ms = close_timer.elapsed_time_milliseconds();
                    if (close_time_ms > 1000) {
                        LOG(INFO) << "file_writer->close() took " << close_time_ms
                                  << "ms for segment_id=" << segment_id
                                  << ", tablet_id=" << _context.tablet_id
                                  << ", rowset_id=" << _context.rowset_id;
                    }
                    if (!st.ok()) {
                        return st;
                    }
                }

                OlapStopWatch watch;
                // Step 2: Build tmp rowset (needs file_writer to be closed)
                RowsetSharedPtr rowset_ptr;
                st = _build_tmp(rowset_ptr);
                if (!st.ok()) {
                    return st;
                }

                // Step 3: Load segments (needs file_writer to be closed and rowset to be built)
                auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
                std::vector<segment_v2::SegmentSharedPtr> segments;
                st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments);
                if (!st.ok()) {
                    return st;
                }

                // Step 4: Calculate delete bitmap
                st = BaseTablet::calc_delete_bitmap(
                        _context.tablet, rowset_ptr, segments, specified_rowsets,
                        _context.mow_context->delete_bitmap, _context.mow_context->max_version,
                        nullptr, nullptr, nullptr);
                if (!st.ok()) {
                    return st;
                }

                // Use a size_t initial value: the previous literal `0` made the
                // accumulator an int, truncating the total on every step.
                size_t total_rows =
                        std::accumulate(segments.begin(), segments.end(), size_t(0),
                                        [](size_t sum, const segment_v2::SegmentSharedPtr& s) {
                                            return sum + s->num_rows();
                                        });
                LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: "
                          << _context.tablet->tablet_id()
                          << ", rowset_ids: " << _context.mow_context->rowset_ids->size()
                          << ", cur max_version: " << _context.mow_context->max_version
                          << ", transaction_id: " << _context.mow_context->txn_id
                          << ", delete_bitmap_count: "
                          << _context.mow_context->delete_bitmap->get_delete_bitmap_count()
                          << ", delete_bitmap_cardinality: "
                          << _context.mow_context->delete_bitmap->cardinality()
                          << ", cost: " << watch.get_elapse_time_us()
                          << "(us), total rows: " << total_rows;
                return Status::OK();
            });
}
415
416
989
// Initializes the base writer, then sets up engine-specific pieces: the
// segcompaction worker's mem tracker and, for merge-on-write loads, a token
// on the engine's delete-bitmap executor.
Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
    if (_segcompaction_worker) {
        _segcompaction_worker->init_mem_tracker(rowset_writer_context);
    }
    // A MoW context means delete bitmaps will be computed for this load.
    if (_context.mow_context) {
        _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token();
    }
    return Status::OK();
}
426
427
// Opens the not-yet-compacted local segment file `segment_id` of the rowset
// being written, storing the result in `segment`.
Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
                                                    int32_t segment_id) {
    DCHECK(_rowset_meta->is_local());
    auto fs = _rowset_meta->fs();
    if (!fs) {
        return Status::Error<INIT_FAILED>(
                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
    }
    const auto path =
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id);
    // Read through the file block cache only when this writer was asked to
    // populate it AND the cache is enabled globally.
    const bool use_block_cache = _context.write_file_cache && config::enable_file_cache;
    io::FileReaderOptions reader_options {
            .cache_type = use_block_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
                                          : io::FileCachePolicy::NO_CACHE,
            .is_doris_table = true,
            .cache_base_path {},
            .tablet_id = _rowset_meta->tablet_id(),
    };
    auto st = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id,
                                        rowset_id(), _context.tablet_schema, reader_options,
                                        &segment);
    if (!st.ok()) {
        LOG(WARNING) << "failed to open segment. " << path << ":" << st;
        return st;
    }
    return Status::OK();
}
455
456
/* policy of segcompaction target selection:
457
 *  1. skip big segments
458
 *  2. if the consecutive smalls end up with a big, compact the smalls, except
459
 *     single small
460
 *  3. if the consecutive smalls end up with small, compact the smalls if the
461
 *     length is beyond (config::segcompaction_batch_size / 2)
462
 */
463
// Collects the next run of consecutive "small" segments (starting at
// _segcompacted_point, excluding the last, possibly still-active segment) as
// segcompaction candidates, respecting the per-task row/byte limits.
// Leading large segments are renamed in place (skipped); a lone small segment
// is likewise passed through rather than compacted.
// On return `segments` holds the candidates (possibly empty).
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
        SegCompactionCandidatesSharedPtr& segments) {
    segments = std::make_shared<SegCompactionCandidates>();
    // skip last (maybe active) segment
    int32_t last_segment = _num_segment - 1;
    size_t task_bytes = 0;
    uint32_t task_rows = 0;
    int32_t segid;
    for (segid = _segcompacted_point;
         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
        segment_v2::SegmentSharedPtr segment;
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
        const auto segment_rows = segment->num_rows();
        const auto segment_bytes = segment->file_reader()->size();
        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
                                segment_bytes > config::segcompaction_candidate_max_bytes;
        if (is_large_segment) {
            if (segid == _segcompacted_point) {
                // skip large segments at the front
                auto dst_seg_id = _num_segcompacted.load();
                RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
                if (_segcompaction_worker->need_convert_delete_bitmap()) {
                    _segcompaction_worker->convert_segment_delete_bitmap(
                            _context.mow_context->delete_bitmap, segid, dst_seg_id);
                }
                continue;
            } else {
                // stop because we need consecutive segments
                break;
            }
        }
        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
        if (is_task_full) {
            break;
        }
        segments->push_back(segment);
        // Reuse the sizes fetched above instead of querying the segment again.
        task_rows += segment_rows;
        task_bytes += segment_bytes;
    }
    size_t s = segments->size();
    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
        // we didn't collect enough segments, better to do it in next
        // round to compact more at once
        segments->clear();
        return Status::OK();
    }
    if (s == 1) { // poor bachelor, let it go
        VLOG_DEBUG << "only one candidate segment";
        auto src_seg_id = _segcompacted_point.load();
        auto dst_seg_id = _num_segcompacted.load();
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
            _segcompaction_worker->convert_segment_delete_bitmap(
                    _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
        }
        segments->clear();
        return Status::OK();
    }
    if (VLOG_DEBUG_IS_ON) {
        vlog_buffer.clear();
        for (auto& segment : (*segments.get())) {
            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
        }
        VLOG_DEBUG << "candidate segments num:" << s
                   << " list of candidates:" << fmt::to_string(vlog_buffer);
    }
    return Status::OK();
}
532
533
11
// Renames the segcompacted output file covering segments [begin, end] to the
// regular segment file name for slot _num_segcompacted, refreshes caches,
// renames the associated inverted index files, then bumps _num_segcompacted.
Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) {
    auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
                                                                    _context.rowset_id, begin, end);
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
                                           _num_segcompacted);
    int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
    if (ret != 0) {
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
                errno);
    }
    // Invalidate any cached footer for the destination segment id/path.
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));

    // rename inverted index files
    RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0));

    _num_segcompacted++;
    return Status::OK();
}
553
554
void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin,
555
43
                                                                      uint32_t end) {
556
43
    VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end;
557
134
    for (uint32_t i = begin; i <= end; ++i) {
558
91
        _segid_statistics_map.erase(i);
559
91
    }
560
43
}
561
562
42
Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) {
563
42
    if (seg_id == _num_segcompacted) {
564
10
        ++_num_segcompacted;
565
10
        return Status::OK();
566
10
    }
567
568
32
    auto src_seg_path =
569
32
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id);
570
32
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
571
32
                                           _num_segcompacted);
572
32
    VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
573
0
               << dst_seg_path;
574
32
    {
575
32
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
576
32
        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
577
32
        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
578
32
                  true);
579
32
        auto org = _segid_statistics_map[seg_id];
580
32
        _segid_statistics_map.emplace(_num_segcompacted, org);
581
32
        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
582
32
    }
583
32
    int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
584
32
    if (ret) {
585
0
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
586
0
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
587
0
                errno);
588
0
    }
589
590
32
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
591
    // rename remaining inverted index files
592
32
    RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id));
593
594
32
    ++_num_segcompacted;
595
32
    return Status::OK();
596
32
}
597
598
Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id,
599
43
                                                      const std::string& segment_path) {
600
43
    auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache();
601
43
    if (!footer_page_cache) {
602
0
        return Status::OK();
603
0
    }
604
605
43
    auto fs = _rowset_meta->fs();
606
43
    bool exists = false;
607
43
    RETURN_IF_ERROR(fs->exists(segment_path, &exists));
608
43
    if (exists) {
609
43
        io::FileReaderSPtr file_reader;
610
43
        io::FileReaderOptions reader_options {
611
43
                .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
612
43
                                                        : io::FileCachePolicy::NO_CACHE,
613
43
                .is_doris_table = true,
614
43
                .cache_base_path = "",
615
43
                .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)),
616
43
                .tablet_id = _rowset_meta->tablet_id(),
617
43
        };
618
43
        RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options));
619
43
        DCHECK(file_reader != nullptr);
620
43
        auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader);
621
43
        footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE);
622
43
    }
623
43
    return Status::OK();
624
43
}
625
626
43
Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) {
627
43
    int ret;
628
629
43
    auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path,
630
32
                                                       _context.rowset_id.to_string(), seg_id)
631
43
                                  : BetaRowset::local_segment_path_segcompacted(
632
11
                                            _context.tablet_path, _context.rowset_id, begin, end);
633
43
    auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path);
634
43
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
635
43
                                           _num_segcompacted);
636
43
    auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path);
637
638
43
    if (_context.tablet_schema->get_inverted_index_storage_format() >=
639
43
        InvertedIndexStorageFormatPB::V2) {
640
22
        if (_context.tablet_schema->has_inverted_index() ||
641
22
            _context.tablet_schema->has_ann_index()) {
642
22
            auto src_idx_path =
643
22
                    InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix);
644
22
            auto dst_idx_path =
645
22
                    InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix);
646
647
22
            ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
648
22
            if (ret) {
649
0
                return Status::Error<ROWSET_RENAME_FILE_FAILED>(
650
0
                        "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path,
651
0
                        ret, errno);
652
0
            }
653
22
        }
654
22
    }
655
    // rename remaining inverted index files
656
150
    for (auto column : _context.tablet_schema->columns()) {
657
150
        auto index_infos = _context.tablet_schema->inverted_indexs(*column);
658
150
        for (const auto& index_info : index_infos) {
659
44
            auto index_id = index_info->index_id();
660
44
            if (_context.tablet_schema->get_inverted_index_storage_format() ==
661
44
                InvertedIndexStorageFormatPB::V1) {
662
0
                auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
663
0
                        src_index_path_prefix, index_id, index_info->get_index_suffix());
664
0
                auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
665
0
                        dst_index_path_prefix, index_id, index_info->get_index_suffix());
666
0
                VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to "
667
0
                           << dst_idx_path;
668
0
                ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
669
0
                if (ret) {
670
0
                    return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>(
671
0
                            "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path,
672
0
                            dst_idx_path, ret, errno);
673
0
                }
674
0
            }
675
            // Erase the origin index file cache
676
44
            auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
677
44
                    src_index_path_prefix, index_id, index_info->get_index_suffix());
678
44
            auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
679
44
                    dst_index_path_prefix, index_id, index_info->get_index_suffix());
680
44
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key));
681
44
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key));
682
44
        }
683
150
    }
684
43
    return Status::OK();
685
43
}
686
687
1.19k
Status BetaRowsetWriter::_segcompaction_if_necessary() {
688
1.19k
    Status status = Status::OK();
689
    // if not doing segcompaction, just check segment number
690
1.19k
    if (!config::enable_segcompaction || !_context.enable_segcompaction ||
691
1.19k
        _context.tablet_schema->num_variant_columns() > 0) {
692
1.07k
        return _check_segment_number_limit(_num_segment);
693
1.07k
    }
694
    // leave _is_doing_segcompaction as the last condition
695
    // otherwise _segcompacting_cond will never get notified
696
121
    if (_is_doing_segcompaction.exchange(true)) {
697
0
        return status;
698
0
    }
699
121
    if (_segcompaction_status.load() != OK) {
700
0
        status = Status::Error<SEGCOMPACTION_FAILED>(
701
0
                "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
702
0
                _segcompaction_status.load());
703
121
    } else {
704
121
        status = _check_segment_number_limit(_num_segcompacted);
705
121
    }
706
121
    if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
707
15
        SegCompactionCandidatesSharedPtr segments;
708
15
        status = _find_longest_consecutive_small_segment(segments);
709
15
        if (LIKELY(status.ok()) && (!segments->empty())) {
710
11
            LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
711
11
                      << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
712
11
                      << ", segcompacted_point:" << _segcompacted_point;
713
11
            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
714
11
            if (status.ok()) {
715
11
                return status;
716
11
            }
717
11
        }
718
15
    }
719
110
    {
720
110
        std::lock_guard lk(_is_doing_segcompaction_lock);
721
110
        _is_doing_segcompaction = false;
722
110
        _segcompacting_cond.notify_all();
723
110
    }
724
110
    return status;
725
121
}
726
727
635
Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
728
635
    DCHECK_EQ(_is_doing_segcompaction, false);
729
635
    if (!config::enable_segcompaction) {
730
144
        return Status::OK();
731
144
    }
732
491
    if (_segcompaction_status.load() != OK) {
733
0
        return Status::Error<SEGCOMPACTION_FAILED>(
734
0
                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error "
735
0
                "code: {}",
736
0
                _segcompaction_status.load());
737
0
    }
738
491
    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
739
        // no need if never segcompact before or all segcompacted
740
482
        return Status::OK();
741
482
    }
742
    // currently we only rename remaining segments to reduce wait time
743
    // so that transaction can be committed ASAP
744
9
    VLOG_DEBUG << "segcompaction last few segments";
745
39
    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
746
30
        auto dst_segid = _num_segcompacted.load();
747
30
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
748
30
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
749
16
            _segcompaction_worker->convert_segment_delete_bitmap(
750
16
                    _context.mow_context->delete_bitmap, segid, dst_segid);
751
16
        }
752
30
    }
753
9
    return Status::OK();
754
9
}
755
756
9
Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
757
9
    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
758
9
    RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
759
9
    _num_rows_written += rowset->num_rows();
760
9
    const auto& rowset_meta = rowset->rowset_meta();
761
9
    auto index_size = rowset_meta->index_disk_size();
762
9
    auto total_size = rowset_meta->total_disk_size();
763
9
    auto data_size = rowset_meta->data_disk_size();
764
    // corrupted index size caused by bug before 2.1.5 or 3.0.0 version
765
    // try to get real index size from disk.
766
9
    if (index_size < 0 || index_size > total_size * 2) {
767
0
        LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
768
0
                   << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
769
0
                   << " rowset:" << rowset_meta->rowset_id();
770
0
        index_size = 0;
771
0
        auto st = rowset->get_inverted_index_size(&index_size);
772
0
        if (!st.ok()) {
773
0
            if (!st.is<NOT_FOUND>()) {
774
0
                LOG(ERROR) << "failed to get inverted index size. res=" << st;
775
0
                return st;
776
0
            }
777
0
        }
778
0
    }
779
9
    _total_data_size += data_size;
780
9
    _total_index_size += index_size;
781
9
    _num_segment += cast_set<int32_t>(rowset->num_segments());
782
    // append key_bounds to current rowset
783
9
    RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
784
9
    rowset->get_num_segment_rows(&_segment_num_rows);
785
9
    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();
786
787
    // TODO update zonemap
788
9
    if (rowset->rowset_meta()->has_delete_predicate()) {
789
0
        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
790
0
    }
791
    // Update the tablet schema in the rowset metadata if the tablet schema contains a variant.
792
    // During the build process, _context.tablet_schema will be used as the rowset schema.
793
    // This situation may arise in the event of a linked schema change. If this schema is not set,
794
    // the subcolumns of the variant will be lost.
795
9
    if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) {
796
0
        _context.tablet_schema = rowset->tablet_schema();
797
0
    }
798
9
    return Status::OK();
799
9
}
800
801
0
Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) {
802
    // TODO use schema_mapping to transfer zonemap
803
0
    return add_rowset(rowset);
804
0
}
805
806
1.13k
Status BaseBetaRowsetWriter::flush() {
807
1.13k
    return _segment_creator.flush();
808
1.13k
}
809
810
12
Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) {
811
12
    if (block->rows() == 0) {
812
0
        return Status::OK();
813
0
    }
814
815
12
    {
816
12
        SCOPED_RAW_TIMER(&_segment_writer_ns);
817
12
        RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size));
818
12
    }
819
12
    return Status::OK();
820
12
}
821
822
0
Status BaseBetaRowsetWriter::flush_single_block(const Block* block) {
823
0
    return _segment_creator.flush_single_block(block);
824
0
}
825
826
989
Status BetaRowsetWriter::_wait_flying_segcompaction() {
827
989
    std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
828
989
    uint64_t begin_wait = GetCurrentTimeMicros();
829
989
    while (_is_doing_segcompaction) {
830
        // change sync wait to async?
831
0
        _segcompacting_cond.wait(l);
832
0
    }
833
989
    uint64_t elapsed = GetCurrentTimeMicros() - begin_wait;
834
989
    if (elapsed >= MICROS_PER_SEC) {
835
0
        LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us";
836
0
    }
837
989
    if (_segcompaction_status.load() != OK) {
838
0
        return Status::Error<SEGCOMPACTION_FAILED>(
839
0
                "BetaRowsetWriter meet invalid state, error code: {}",
840
0
                _segcompaction_status.load());
841
0
    }
842
989
    return Status::OK();
843
989
}
844
845
24
RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) {
846
24
    if (_rowset_meta->newest_write_timestamp() == -1) {
847
0
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
848
0
    }
849
850
24
    build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta);
851
24
    RowsetSharedPtr rowset;
852
24
    auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path,
853
24
                                               _rowset_meta, &rowset);
854
24
    if (!status.ok()) {
855
0
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
856
0
        return nullptr;
857
0
    }
858
24
    _already_built = true;
859
24
    return rowset;
860
24
}
861
862
635
Status BaseBetaRowsetWriter::_close_file_writers() {
863
    // Flush and close segment files
864
635
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
865
635
                                   "failed to close segment creator when build new rowset");
866
635
    return Status::OK();
867
635
}
868
869
635
Status BetaRowsetWriter::_close_file_writers() {
870
635
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
871
    // if _segment_start_id is not zero, that means it's a transient rowset writer for
872
    // MoW partial update, don't need to do segment compaction.
873
635
    if (_segment_start_id == 0) {
874
635
        if (_segcompaction_worker->cancel()) {
875
635
            std::lock_guard lk(_is_doing_segcompaction_lock);
876
635
            _is_doing_segcompaction = false;
877
635
            _segcompacting_cond.notify_all();
878
635
        } else {
879
0
            RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
880
0
                                           "segcompaction failed when build new rowset");
881
0
        }
882
635
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
883
635
                                       "rename last segments failed when build new rowset");
884
        // segcompaction worker would do file wrier's close function in compact_segments
885
635
        if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer();
886
635
            nullptr != seg_comp_file_writer &&
887
635
            seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) {
888
0
            RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
889
0
                                           "close segment compaction worker failed");
890
0
        }
891
        // process delete bitmap for mow table
892
635
        if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
893
4
            auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
894
            // which means the segment compaction is triggerd
895
4
            if (converted_delete_bitmap != nullptr) {
896
4
                RowsetIdUnorderedSet rowsetids;
897
4
                rowsetids.insert(rowset_id());
898
4
                context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
899
4
                                                                     rowsetids);
900
4
                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
901
4
                                                             {rowset_id(), UINT32_MAX, INT64_MAX});
902
4
                context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
903
4
            }
904
4
        }
905
635
    }
906
635
    return Status::OK();
907
635
}
908
909
957
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
910
957
    if (_calc_delete_bitmap_token != nullptr) {
911
8
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
912
8
    }
913
957
    RETURN_IF_ERROR(_close_file_writers());
914
957
    const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
915
957
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
916
957
                                   "too many segments when build new rowset");
917
957
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
918
957
    if (_is_pending) {
919
25
        _rowset_meta->set_rowset_state(COMMITTED);
920
932
    } else {
921
932
        _rowset_meta->set_rowset_state(VISIBLE);
922
932
    }
923
924
957
    if (_rowset_meta->newest_write_timestamp() == -1) {
925
620
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
926
620
    }
927
928
957
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
929
930
    // If segment compaction occurs, the idx file info will become inaccurate.
931
957
    if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) &&
932
957
        _num_segcompacted == 0) {
933
106
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
934
106
            !idx_files_info.has_value()) [[unlikely]] {
935
0
            LOG(ERROR) << "expected inverted index files info, but none presents: "
936
0
                       << idx_files_info.error();
937
106
        } else {
938
106
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
939
106
        }
940
106
    }
941
942
957
    RETURN_NOT_OK_STATUS_WITH_WARN(
943
957
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
944
957
                                         &rowset),
945
957
            "rowset init failed when build new rowset");
946
957
    _already_built = true;
947
957
    return Status::OK();
948
957
}
949
950
0
int64_t BaseBetaRowsetWriter::_num_seg() const {
951
0
    return _num_segment;
952
0
}
953
954
1.02k
int64_t BetaRowsetWriter::_num_seg() const {
955
1.02k
    return is_segcompacted() ? _num_segcompacted : _num_segment;
956
1.02k
}
957
958
1.02k
Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
959
1.02k
    int64_t num_rows_written = 0;
960
1.02k
    int64_t total_data_size = 0;
961
1.02k
    int64_t total_index_size = 0;
962
1.02k
    std::vector<KeyBoundsPB> segments_encoded_key_bounds;
963
1.02k
    std::vector<uint32_t> segment_rows;
964
1.02k
    {
965
1.02k
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
966
1.49k
        for (const auto& itr : _segid_statistics_map) {
967
1.49k
            num_rows_written += itr.second.row_num;
968
1.49k
            total_data_size += itr.second.data_size;
969
1.49k
            total_index_size += itr.second.index_size;
970
1.49k
            segments_encoded_key_bounds.push_back(itr.second.key_bounds);
971
            // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
972
1.49k
            segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
973
1.49k
        }
974
1.02k
    }
975
1.02k
    if (segment_rows.empty()) {
976
        // vertical compaction and linked schema change will not record segment statistics,
977
        // it will record segment rows in _segment_num_rows
978
408
        RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
979
408
    }
980
981
4.00k
    for (auto& key_bound : _segments_encoded_key_bounds) {
982
4.00k
        segments_encoded_key_bounds.push_back(key_bound);
983
4.00k
    }
984
1.02k
    if (_segments_key_bounds_truncated.has_value()) {
985
9
        rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
986
9
    }
987
1.02k
    rowset_meta->set_num_segment_rows(segment_rows);
988
    // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
989
    // the overlap property when key bounds are empty.
990
    // for mow table with cluster keys, the overlap is used for cluster keys,
991
    // the key_bounds is primary keys
992
1.02k
    if (!segments_encoded_key_bounds.empty() &&
993
1.02k
        !is_segment_overlapping(segments_encoded_key_bounds) &&
994
1.02k
        _context.tablet_schema->cluster_key_uids().empty()) {
995
551
        rowset_meta->set_segments_overlap(NONOVERLAPPING);
996
551
    }
997
998
1.02k
    auto segment_num = _num_seg();
999
1.02k
    if (check_segment_num && config::check_segment_when_build_rowset_meta) {
1000
0
        auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size();
1001
0
        if (segments_encoded_key_bounds_size != segment_num) {
1002
0
            return Status::InternalError(
1003
0
                    "segments_encoded_key_bounds_size should  equal to _num_seg, "
1004
0
                    "segments_encoded_key_bounds_size "
1005
0
                    "is: {}, _num_seg is: {}",
1006
0
                    segments_encoded_key_bounds_size, segment_num);
1007
0
        }
1008
0
        if (segment_rows.size() != segment_num) {
1009
0
            return Status::InternalError(
1010
0
                    "segment_rows size should equal to _num_seg, segment_rows size is: {}, "
1011
0
                    "_num_seg is {}, tablet={}, rowset={}, txn={}",
1012
0
                    segment_rows.size(), segment_num, _context.tablet_id,
1013
0
                    _context.rowset_id.to_string(), _context.txn_id);
1014
0
        }
1015
0
    }
1016
1017
1.02k
    rowset_meta->set_num_segments(segment_num);
1018
1.02k
    rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
1019
1.02k
    rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size +
1020
1.02k
                                     _total_index_size);
1021
1.02k
    rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
1022
1.02k
    rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
1023
1.02k
    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
1024
    // TODO write zonemap to meta
1025
1.02k
    rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
1026
1.02k
    rowset_meta->set_creation_time(time(nullptr));
1027
1.02k
    return Status::OK();
1028
1.02k
}
1029
1030
63
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
1031
63
    Status status;
1032
63
    std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
1033
63
    tmp_rs_meta->init(_rowset_meta.get());
1034
1035
63
    status = _build_rowset_meta(tmp_rs_meta.get());
1036
63
    if (!status.ok()) {
1037
0
        LOG(WARNING) << "failed to build rowset meta, res=" << status;
1038
0
        return status;
1039
0
    }
1040
1041
63
    status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta,
1042
63
                                          &rowset_ptr);
1043
63
    DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
1044
63
                    { status = Status::InternalError("create rowset failed"); });
1045
63
    if (!status.ok()) {
1046
0
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
1047
0
        return status;
1048
0
    }
1049
63
    return Status::OK();
1050
63
}
1051
1052
Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path,
1053
                                                 io::FileWriterPtr& file_writer,
1054
5.49k
                                                 bool is_index_file) {
1055
5.49k
    io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file);
1056
5.49k
    Status st = _context.fs()->create_file(path, &file_writer, &opts);
1057
5.49k
    if (!st.ok()) {
1058
0
        LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st;
1059
0
        return st;
1060
0
    }
1061
1062
5.49k
    DCHECK(file_writer != nullptr);
1063
5.49k
    return Status::OK();
1064
5.49k
}
1065
1066
Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer,
1067
5.47k
                                                FileType file_type) {
1068
5.47k
    auto segment_path = _context.segment_path(segment_id);
1069
5.47k
    if (file_type == FileType::INVERTED_INDEX_FILE) {
1070
255
        std::string prefix =
1071
255
                std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)};
1072
255
        std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
1073
255
        return _create_file_writer(index_path, file_writer, true /* is_index_file */);
1074
5.22k
    } else if (file_type == FileType::SEGMENT_FILE) {
1075
5.22k
        return _create_file_writer(segment_path, file_writer, false /* is_index_file */);
1076
5.22k
    }
1077
0
    return Status::Error<ErrorCode::INTERNAL_ERROR>(
1078
0
            fmt::format("failed to create file = {}, file type = {}", segment_path, file_type));
1079
5.47k
}
1080
1081
Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id,
1082
228
                                                      IndexFileWriterPtr* index_file_writer) {
1083
228
    RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer));
1084
    // used for inverted index format v1
1085
228
    (*index_file_writer)
1086
228
            ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */));
1087
228
    return Status::OK();
1088
228
}
1089
1090
Status BetaRowsetWriter::create_segment_writer_for_segcompaction(
1091
12
        std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) {
1092
12
    DCHECK(begin >= 0 && end >= 0);
1093
12
    std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path,
1094
12
                                                                   _context.rowset_id, begin, end);
1095
12
    io::FileWriterPtr file_writer;
1096
12
    RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */));
1097
1098
12
    IndexFileWriterPtr index_file_writer;
1099
12
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
1100
8
        io::FileWriterPtr idx_file_writer;
1101
8
        std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path));
1102
8
        if (_context.tablet_schema->get_inverted_index_storage_format() !=
1103
8
            InvertedIndexStorageFormatPB::V1) {
1104
8
            std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix);
1105
8
            RETURN_IF_ERROR(
1106
8
                    _create_file_writer(index_path, idx_file_writer, true /* is_index_file */));
1107
8
        }
1108
8
        index_file_writer = std::make_unique<IndexFileWriter>(
1109
8
                _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted,
1110
8
                _context.tablet_schema->get_inverted_index_storage_format(),
1111
8
                std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id);
1112
8
    }
1113
1114
12
    segment_v2::SegmentWriterOptions writer_options;
1115
12
    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
1116
12
    writer_options.rowset_ctx = &_context;
1117
12
    writer_options.write_type = _context.write_type;
1118
12
    writer_options.write_type = DataWriteType::TYPE_COMPACTION;
1119
12
    writer_options.max_rows_per_segment = _context.max_rows_per_segment;
1120
12
    writer_options.mow_ctx = _context.mow_context;
1121
1122
12
    *writer = std::make_unique<segment_v2::SegmentWriter>(
1123
12
            file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet,
1124
12
            _context.data_dir, writer_options, index_file_writer.get());
1125
12
    if (auto& seg_writer = _segcompaction_worker->get_file_writer();
1126
12
        seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) {
1127
0
        RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
1128
0
    }
1129
12
    _segcompaction_worker->get_file_writer().reset(file_writer.release());
1130
12
    if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer();
1131
12
        idx_file_writer != nullptr) {
1132
0
        RETURN_IF_ERROR(idx_file_writer->begin_close());
1133
0
        RETURN_IF_ERROR(idx_file_writer->finish_close());
1134
0
    }
1135
12
    _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release());
1136
12
    return Status::OK();
1137
12
}
1138
1139
0
// Verify that this rowset has not accumulated more segments than the
// configured cap (config::max_segment_num_per_rowset).
//
// @param segnum current number of segments in the rowset (may be overridden
//               by the debug point below for fault-injection tests)
// @return Status::OK() when under the limit, TOO_MANY_SEGMENTS otherwise.
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    // Common case: limit not reached.
    if (LIKELY(segnum <= config::max_segment_num_per_rowset)) {
        return Status::OK();
    }
    return Status::Error<TOO_MANY_SEGMENTS>(
            "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
            "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
            "small or if the data is skewed.",
            _context.tablet_id, _context.rowset_id.to_string(),
            config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
}
1152
1153
2.15k
// Segment-count limit check for the segcompaction-capable writer. Same cap as
// the base class, but the error message additionally reports segcompaction
// progress (_segcompacted_point / _num_segcompacted) for diagnosis.
//
// @param segnum current number of segments in the rowset (may be overridden
//               by the debug point below for fault-injection tests)
// @return Status::OK() when under the limit, TOO_MANY_SEGMENTS otherwise.
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    // Common case: limit not reached.
    if (LIKELY(segnum <= config::max_segment_num_per_rowset)) {
        return Status::OK();
    }
    return Status::Error<TOO_MANY_SEGMENTS>(
            "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
            "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
            "the bucket number is too small or if the data is skewed.",
            _context.tablet_id, _context.rowset_id.to_string(),
            config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
            _num_segcompacted, get_rowset_num_rows());
}
1167
1168
1.19k
// Record the statistics of a finished segment and advance the writer's
// contiguous-segment counter. Segments may complete out of order, so
// _num_segment only advances over the contiguous prefix of completed
// segments tracked in _segment_set.
//
// @param segment_id global segment id (offset by _segment_start_id within
//                   this writer)
// @param segstat    row count / data size / index size / key bounds of the
//                   flushed segment
// @return Status::OK() on success; propagates delete-bitmap generation errors
//         for merge-on-write tables.
Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    // Position of this segment within the current rowset.
    uint32_t segid_offset = segment_id - _segment_start_id;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // Each segment id must be reported exactly once.
        CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segment_id, segstat);
        // NOTE(review): the size check/resize below uses segment_id while the
        // store indexes by segid_offset. When _segment_start_id > 0 this
        // over-sizes the vector (never out of bounds, since
        // segid_offset <= segment_id) — confirm whether segid_offset was the
        // intended bound.
        if (segment_id >= _segment_num_rows.size()) {
            _segment_num_rows.resize(segment_id + 1);
        }
        _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id
               << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size
               << " index_size:" << segstat.index_size;

    {
        std::lock_guard<std::mutex> lock(_segment_set_mutex);
        _segment_set.add(segid_offset);
        // Advance _num_segment across every already-completed segment that is
        // now contiguous with the prefix.
        while (_segment_set.contains(_num_segment)) {
            _num_segment++;
        }
    }

    if (_context.mow_context != nullptr) {
        // Merge-on-write tables need a delete bitmap computed per new segment.
        RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
    }
    return Status::OK();
}
1196
1197
1.19k
// Record the finished segment via the base implementation, then give the
// segcompaction machinery a chance to run if enough segments accumulated.
Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    auto status = BaseBetaRowsetWriter::add_segment(segment_id, segstat);
    if (!status.ok()) {
        return status;
    }
    return _segcompaction_if_necessary();
}
1201
1202
// Finalize a segcompaction output segment: write the segment footer, close
// any inverted indexes, record the segment's statistics, and release the
// writer.
//
// @param writer     segment writer to finalize; reset (destroyed) on success
// @param index_size appears unused in this function — presumably kept for
//                   interface compatibility
// @param key_bounds min/max key bounds recorded into the segment statistics
// @return Status::OK() on success, WRITER_DATA_WRITE_ERROR if the footer
//         cannot be finalized, or any error from closing inverted indexes.
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
        KeyBoundsPB& key_bounds) {
    const uint32_t segment_id = (*writer)->get_segment_id();
    const uint32_t segment_rows = (*writer)->row_count();

    uint64_t segment_size = 0;
    if (auto st = (*writer)->finalize_footer(&segment_size); !st.ok()) {
        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}",
                                                      st.to_string());
    }
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size));

    SegmentStatistics stats;
    stats.row_num = segment_rows;
    stats.data_size = segment_size;
    stats.index_size = inverted_index_file_size;
    stats.key_bounds = key_bounds;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // Each segment id must be recorded exactly once.
        CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segment_id, stats);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segment_id
               << " row_num:" << segment_rows
               << " data_size:" << PrettyPrinter::print_bytes(segment_size)
               << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size);

    // The writer is fully flushed; drop it.
    writer->reset();

    return Status::OK();
}
1235
1236
#include "common/compile_check_end.h"
1237
} // namespace doris