Coverage Report

Created: 2026-07-02 23:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/segment_creator.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <cerrno> // IWYU pragma: keep
22
#include <chrono>
23
#include <filesystem>
24
#include <memory>
25
#include <sstream>
26
#include <thread>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/block.h"
36
#include "core/block/columns_with_type_and_name.h"
37
#include "core/column/column.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_variant.h"
41
#include "core/data_type/data_type.h"
42
#include "core/types.h"
43
#include "cpp/sync_point.h"
44
#include "io/fs/file_writer.h"
45
#include "storage/olap_define.h"
46
#include "storage/rowset/beta_rowset_writer.h" // SegmentStatistics
47
#include "storage/segment/row_binlog_segment_writer.h"
48
#include "storage/segment/segment_index_file_cache_loader.h"
49
#include "storage/segment/segment_writer.h"
50
#include "storage/segment/vertical_segment_writer.h"
51
#include "storage/tablet/tablet_schema.h"
52
#include "storage/utils.h"
53
#include "util/debug_points.h"
54
#include "util/json/json_parser.h"
55
#include "util/pretty_printer.h"
56
#include "util/stopwatch.hpp"
57
58
namespace doris {
59
using namespace ErrorCode;
60
61
// Initializes _context, _seg_files and _idx_files from the corresponding arguments.
// No other work is done at construction time.
SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files,
                               InvertedIndexFileCollection& idx_files)
        : _context(context),
          _seg_files(seg_files),
          _idx_files(idx_files) {
}
64
65
1.02k
// Defaulted destructor: this file adds no explicit teardown logic for SegmentFlusher.
SegmentFlusher::~SegmentFlusher() = default;
66
67
// NOLINTNEXTLINE(readability-function-cognitive-complexity)
68
Status SegmentFlusher::flush_single_block(const Block* block, int32_t segment_id,
69
18
                                          int64_t* flush_size) {
70
18
    if (block->rows() == 0) {
71
0
        return Status::OK();
72
0
    }
73
18
    Block flush_block(*block);
74
18
    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
75
18
    bool use_vertical_segment_writer =
76
18
            config::enable_vertical_segment_writer && !_context.write_binlog_opt().enable;
77
18
    if (use_vertical_segment_writer) {
78
16
        std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
79
16
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
80
16
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
81
16
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
82
16
    } else {
83
2
        std::unique_ptr<segment_v2::SegmentWriter> writer;
84
2
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
85
2
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
86
2
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
87
2
    }
88
18
    return Status::OK();
89
18
}
90
91
629
// Closes the segment file collection, then runs the pending segment-index file-cache
// preloads, then finishes closing the index file collection. The first failing step aborts
// the sequence and its status is returned.
Status SegmentFlusher::close() {
    RETURN_IF_ERROR(_seg_files.close());
    RETURN_IF_ERROR(_preload_segment_indexes_to_file_cache());
    return _idx_files.finish_close();
}
97
98
void SegmentFlusher::_record_segment_index_file_cache_preload(
99
1.29k
        uint32_t segment_id, const segment_v2::SegmentIndexFileCacheInfo& info) {
100
1.29k
    std::lock_guard lock(_segment_index_file_cache_preloads_lock);
101
1.29k
    _segment_index_file_cache_preloads.push_back({segment_id, info});
102
1.29k
}
103
104
629
// Takes every queued preload task out of _segment_index_file_cache_preloads (leaving the queue
// empty) and hands them to SegmentIndexFileCacheLoader. The lock is held only while the
// queue is swapped out, not while the loader runs.
Status SegmentFlusher::_preload_segment_indexes_to_file_cache() {
    std::vector<segment_v2::SegmentIndexFileCachePreloadTask> pending;
    {
        std::lock_guard guard(_segment_index_file_cache_preloads_lock);
        _segment_index_file_cache_preloads.swap(pending);
    }
    using Loader = segment_v2::SegmentIndexFileCacheLoader;
    return Loader::preload_segment_indexes_to_file_cache(_context, pending);
}
113
114
// Appends rows [row_pos, row_pos + num_rows) of `block` to a row-oriented segment writer.
// _num_rows_written is advanced by `num_rows` only when the append succeeds.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
                                 const Block* block, size_t row_pos, size_t num_rows) {
    if (auto append_status = segment_writer->append_block(block, row_pos, num_rows);
        !append_status.ok()) {
        return append_status;
    }
    _num_rows_written += num_rows;
    return Status::OK();
}
120
121
// Vertical-writer counterpart of _add_rows: batches rows [row_pos, row_pos + num_rows) of
// `block` and immediately writes the batch. _num_rows_written is advanced by `num_rows` only
// when both steps succeed.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
                                 const Block* block, size_t row_pos, size_t num_rows) {
    if (auto batch_status = segment_writer->batch_block(block, row_pos, num_rows);
        !batch_status.ok()) {
        return batch_status;
    }
    if (auto write_status = segment_writer->write_batch(); !write_status.ok()) {
        return write_status;
    }
    _num_rows_written += num_rows;
    return Status::OK();
}
128
129
// Creates and initializes a row-oriented segment writer for `segment_id`, returned via `writer`.
//
// Steps, in order:
//   1. create the segment file writer, plus an index file writer when the tablet schema has an
//      inverted or ANN index;
//   2. fill SegmentWriterOptions from the rowset context (compression is forced to
//      NO_COMPRESSION when `no_compression` is set);
//   3. construct a RowBinlogSegmentWriter when binlog writing is enabled on the context,
//      otherwise a plain SegmentWriter; the writer receives raw pointers to the file writers;
//   4. move the file writers into _seg_files / _idx_files;
//   5. call init() on the writer; on failure the error is logged, `writer` is reset and the
//      failing status is returned.
Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                              int32_t segment_id, bool no_compression) {
    io::FileWriterPtr data_file_writer;
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, data_file_writer));

    const bool schema_has_index =
            _context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index();

    IndexFileWriterPtr idx_file_writer;
    if (schema_has_index) {
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &idx_file_writer));
    }

    segment_v2::SegmentWriterOptions opts;
    opts.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    opts.rowset_ctx = &_context;
    opts.write_type = _context.write_type;
    opts.max_rows_per_segment = _context.max_rows_per_segment;
    opts.mow_ctx = _context.mow_context;
    if (no_compression) {
        opts.compression_type = NO_COMPRESSION;
    }

    if (!_context.write_binlog_opt().enable) {
        writer = std::make_unique<segment_v2::SegmentWriter>(
                data_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
                _context.data_dir, opts, idx_file_writer.get());
    } else {
        writer = std::make_unique<segment_v2::RowBinlogSegmentWriter>(
                data_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
                _context.data_dir, opts, _context.write_binlog_opt().write_binlog_config());
    }

    // Ownership of the file writers moves to the collections; `writer` keeps the raw pointers
    // it was constructed with.
    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(data_file_writer)));
    if (schema_has_index) {
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(idx_file_writer)));
    }

    if (auto init_status = writer->init(); !init_status.ok()) {
        LOG(WARNING) << "failed to init segment writer: " << init_status.to_string();
        writer.reset();
        return init_status;
    }
    return Status::OK();
}
171
172
// Creates and initializes a VerticalSegmentWriter for `segment_id`, returned via `writer`.
//
// The segment file writer is always created; an index file writer is created only when the
// tablet schema has an inverted or ANN index. The writer is constructed with raw pointers to
// those file writers, after which their ownership is moved into _seg_files / _idx_files.
// If init() fails the error is logged, `writer` is reset and the failing status is returned.
Status SegmentFlusher::_create_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
        bool no_compression) {
    io::FileWriterPtr data_file_writer;
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, data_file_writer));

    const bool schema_has_index =
            _context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index();

    IndexFileWriterPtr idx_file_writer;
    if (schema_has_index) {
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &idx_file_writer));
    }

    segment_v2::VerticalSegmentWriterOptions opts;
    opts.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    opts.rowset_ctx = &_context;
    opts.write_type = _context.write_type;
    opts.mow_ctx = _context.mow_context;
    if (no_compression) {
        opts.compression_type = NO_COMPRESSION;
    }

    writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
            data_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
            _context.data_dir, opts, idx_file_writer.get());

    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(data_file_writer)));
    if (schema_has_index) {
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(idx_file_writer)));
    }

    if (auto init_status = writer->init(); !init_status.ok()) {
        LOG(WARNING) << "failed to init segment writer: " << init_status.to_string();
        writer.reset();
        return init_status;
    }

    VLOG_DEBUG << "create new segment writer, tablet_id:" << _context.tablet_id
               << " segment id: " << segment_id << " filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;
    return Status::OK();
}
211
212
// Finalizes a VerticalSegmentWriter and publishes the resulting segment.
//
// Sequence:
//   1. fold the writer's updated / deleted / new-added / filtered row counters into the
//      flusher's totals (this happens even when the writer holds no rows);
//   2. if the writer reports zero rows written, return OK without finalizing anything;
//      `*flush_size` is left untouched in that case;
//   3. finalize the segment, then close its inverted index;
//   4. capture row count, sizes and min/max encoded keys into a SegmentStatistics;
//   5. reset `writer`, queue the index file-cache preload for close(), and register the
//      statistics with the context's segment collector;
//   6. log a timing breakdown and, when `flush_size` is non-null, store the segment file size.
//
// Any failing step returns its error immediately; a finalize failure is re-wrapped with the
// same error code and a "failed to finalize segment" prefix.
Status SegmentFlusher::_flush_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    // Zero-initialized so no indeterminate value can ever be observed, whatever finalize()
    // chooses to write.
    uint64_t segment_file_size = 0;
    uint64_t common_index_size = 0;
    segment_v2::SegmentIndexFileCacheInfo index_file_cache_info;
    Status s = writer->finalize(&segment_file_size, &common_index_size, &index_file_cache_info);
    finalize_timer.stop();

    if (!s.ok()) {
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
    }

    DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;

    KeyBoundsPB key_bounds;
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    uint32_t segment_id = writer->segment_id();
    TEST_SYNC_POINT_CALLBACK("SegmentFlusher::flush_vertical_segment_writer", &segment_id);
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    // key_bounds is not used again below, so hand it over instead of copying it.
    segstat.key_bounds = std::move(key_bounds);

    // Everything needed from the writer has been captured above; release it before
    // publishing the segment.
    writer.reset();
    _record_segment_index_file_cache_preload(segment_id, index_file_cache_info);

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = static_cast<int64_t>(segment_file_size);
    }
    return Status::OK();
}
292
293
// Finalizes a row-oriented SegmentWriter and publishes the resulting segment.
//
// Sequence:
//   1. fold the writer's updated / deleted / new-added / filtered row counters into the
//      flusher's totals (this happens even when the writer holds no rows);
//   2. if the writer reports zero rows written, return OK without finalizing anything;
//      `*flush_size` is left untouched in that case;
//   3. finalize the segment, then close its inverted index;
//   4. capture row count, sizes and min/max encoded keys into a SegmentStatistics;
//   5. reset `writer`, queue the index file-cache preload for close(), and register the
//      statistics with the context's segment collector;
//   6. log a timing breakdown and, when `flush_size` is non-null, store the segment file size.
//
// Any failing step returns its error immediately; a finalize failure is re-wrapped with the
// same error code and a "failed to finalize segment" prefix.
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                             int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    // Zero-initialized so no indeterminate value can ever be observed, whatever finalize()
    // chooses to write.
    uint64_t segment_file_size = 0;
    uint64_t common_index_size = 0;
    segment_v2::SegmentIndexFileCacheInfo index_file_cache_info;
    Status s = writer->finalize(&segment_file_size, &common_index_size, &index_file_cache_info);
    finalize_timer.stop();

    if (!s.ok()) {
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
    }

    DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing rowset_dir: " << _context.tablet_path
               << " rowset_id:" << _context.rowset_id;

    KeyBoundsPB key_bounds;
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    uint32_t segment_id = writer->get_segment_id();
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    // key_bounds is not used again below, so hand it over instead of copying it.
    segstat.key_bounds = std::move(key_bounds);

    // Everything needed from the writer has been captured above; release it before
    // publishing the segment.
    writer.reset();
    _record_segment_index_file_cache_preload(segment_id, index_file_cache_info);

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = static_cast<int64_t>(segment_file_size);
    }
    return Status::OK();
}
372
373
// Creates a row-oriented segment writer for `segment_id` and wraps it in a
// SegmentFlusher::Writer that is stored into `writer`. On failure `writer` is not modified.
Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer,
                                     uint32_t segment_id) {
    std::unique_ptr<segment_v2::SegmentWriter> inner;
    RETURN_IF_ERROR(_create_segment_writer(inner, segment_id));
    DCHECK(inner != nullptr);
    // The Writer constructor moves `inner` out, so it is empty after this statement.
    writer = std::unique_ptr<SegmentFlusher::Writer>(new SegmentFlusher::Writer(this, inner));
    return Status::OK();
}
381
382
// Stores the owning flusher and takes ownership of `segment_writer`: the unique_ptr is passed
// by reference and moved from, so the caller's pointer is empty afterwards.
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
                               std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
        : _flusher(flusher), _writer(std::move(segment_writer)) {}
385
386
1.28k
// Defaulted destructor: this file adds no explicit teardown logic for Writer.
SegmentFlusher::Writer::~Writer() = default;
387
388
1.28k
// Flushes the wrapped segment writer through the owning SegmentFlusher. No flush-size
// argument is passed, so the flusher's default for that parameter applies.
Status SegmentFlusher::Writer::flush() {
    SegmentFlusher& owner = *_flusher;
    return owner._flush_segment_writer(_writer);
}
391
392
2.16k
int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) {
393
2.16k
    return _writer->max_row_to_add(row_avg_size_in_bytes);
394
2.16k
}
395
396
// Constructs the embedded SegmentFlusher from the three arguments; nothing else is set up here.
SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files,
                               InvertedIndexFileCollection& idx_files)
        : _segment_flusher(context, seg_files, idx_files) {
}
399
400
1.45k
// Appends all rows of `block` to the current segment, rolling over to new segments as needed.
//
// An empty block is a no-op. Otherwise the average row size (at least 1 byte) is used to ask
// the current writer how many rows it can still take. When it cannot take even one row, the
// current segment is flushed and a new writer is created under a freshly allocated segment id.
// Rows are then added in chunks until the whole block has been consumed.
//
// Returns an error if a step fails, or if a freshly created writer reports that it cannot
// accept a single row. That second case used to be guarded only by a DCHECK, which is
// compiled out in release builds: a reported capacity of 0 would make the loop create empty
// writers forever, and a negative one would wrap to a huge value in the size_t conversion.
Status SegmentCreator::add_block(const Block* block) {
    const size_t block_row_num = block->rows();
    if (block_row_num == 0) {
        return Status::OK();
    }

    const size_t block_size_in_bytes = block->bytes();
    const size_t row_avg_size_in_bytes =
            std::max<size_t>(1, block_size_in_bytes / block_row_num);
    size_t row_offset = 0;

    if (_flush_writer == nullptr) {
        RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
    }

    do {
        auto max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
        if (UNLIKELY(max_row_add < 1)) {
            // no space for another single row, need flush now
            RETURN_IF_ERROR(flush());
            RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
            max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
            DCHECK(max_row_add > 0);
            if (UNLIKELY(max_row_add < 1)) {
                // Fail instead of spinning: no progress is possible with this writer.
                return Status::InternalError(
                        "new segment writer cannot accept any row, row_avg_size_in_bytes={}, "
                        "max_row_add={}",
                        row_avg_size_in_bytes, max_row_add);
            }
        }
        const size_t input_row_num =
                std::min(block_row_num - row_offset, static_cast<size_t>(max_row_add));
        RETURN_IF_ERROR(_flush_writer->add_rows(block, row_offset, input_row_num));
        row_offset += input_row_num;
    } while (row_offset < block_row_num);

    return Status::OK();
}
430
431
2.11k
// Flushes the current segment writer, if there is one, and then drops it so that the next
// add_block() starts a new segment. With no active writer this is a no-op returning OK.
// If the flush fails the writer is kept and the error is returned.
Status SegmentCreator::flush() {
    if (_flush_writer != nullptr) {
        RETURN_IF_ERROR(_flush_writer->flush());
        _flush_writer.reset();
    }
    return Status::OK();
}
439
440
// Writes `block` as a standalone segment with the caller-supplied `segment_id` by delegating
// to the SegmentFlusher. An empty block is skipped and OK is returned.
Status SegmentCreator::flush_single_block(const Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    const bool nothing_to_write = (block->rows() == 0);
    if (nothing_to_write) {
        return Status::OK();
    }
    return _segment_flusher.flush_single_block(block, segment_id, flush_size);
}
448
449
629
// Flushes any segment still being written, then closes the underlying SegmentFlusher.
// The flusher is not closed if the flush fails.
Status SegmentCreator::close() {
    RETURN_IF_ERROR(flush());
    return _segment_flusher.close();
}
454
455
} // namespace doris