Coverage Report

Created: 2026-05-20 21:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/segment_creator.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <cerrno> // IWYU pragma: keep
22
#include <chrono>
23
#include <filesystem>
24
#include <memory>
25
#include <sstream>
26
#include <thread>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/block.h"
36
#include "core/block/columns_with_type_and_name.h"
37
#include "core/column/column.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_variant.h"
41
#include "core/data_type/data_type.h"
42
#include "core/types.h"
43
#include "io/fs/file_writer.h"
44
#include "storage/olap_define.h"
45
#include "storage/rowset/beta_rowset_writer.h" // SegmentStatistics
46
#include "storage/segment/row_binlog_segment_writer.h"
47
#include "storage/segment/segment_writer.h"
48
#include "storage/segment/vertical_segment_writer.h"
49
#include "storage/tablet/tablet_schema.h"
50
#include "storage/utils.h"
51
#include "util/debug_points.h"
52
#include "util/json/json_parser.h"
53
#include "util/pretty_printer.h"
54
#include "util/stopwatch.hpp"
55
56
namespace doris {
57
using namespace ErrorCode;
58
59
SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files,
60
                               InvertedIndexFileCollection& idx_files)
61
1.01k
        : _context(context), _seg_files(seg_files), _idx_files(idx_files) {}
62
63
1.01k
SegmentFlusher::~SegmentFlusher() = default;
64
65
// NOLINTNEXTLINE(readability-function-cognitive-complexity)
66
Status SegmentFlusher::flush_single_block(const Block* block, int32_t segment_id,
67
16
                                          int64_t* flush_size) {
68
16
    if (block->rows() == 0) {
69
0
        return Status::OK();
70
0
    }
71
16
    Block flush_block(*block);
72
16
    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
73
16
    bool use_vertical_segment_writer =
74
16
            config::enable_vertical_segment_writer && !_context.write_binlog_opt().enable;
75
16
    if (use_vertical_segment_writer) {
76
14
        std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
77
14
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
78
14
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
79
14
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
80
14
    } else {
81
2
        std::unique_ptr<segment_v2::SegmentWriter> writer;
82
2
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
83
2
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
84
2
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
85
2
    }
86
16
    return Status::OK();
87
16
}
88
89
654
Status SegmentFlusher::close() {
90
654
    RETURN_IF_ERROR(_seg_files.close());
91
654
    RETURN_IF_ERROR(_idx_files.finish_close());
92
654
    return Status::OK();
93
654
}
94
95
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
96
1.72k
                                 const Block* block, size_t row_pos, size_t num_rows) {
97
1.72k
    RETURN_IF_ERROR(segment_writer->append_block(block, row_pos, num_rows));
98
1.72k
    _num_rows_written += num_rows;
99
1.72k
    return Status::OK();
100
1.72k
}
101
102
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
103
14
                                 const Block* block, size_t row_pos, size_t num_rows) {
104
14
    RETURN_IF_ERROR(segment_writer->batch_block(block, row_pos, num_rows));
105
14
    RETURN_IF_ERROR(segment_writer->write_batch());
106
14
    _num_rows_written += num_rows;
107
14
    return Status::OK();
108
14
}
109
110
Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
111
1.19k
                                              int32_t segment_id, bool no_compression) {
112
1.19k
    io::FileWriterPtr segment_file_writer;
113
1.19k
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer));
114
115
1.19k
    IndexFileWriterPtr index_file_writer;
116
1.19k
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
117
228
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &index_file_writer));
118
228
    }
119
120
1.19k
    segment_v2::SegmentWriterOptions writer_options;
121
1.19k
    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
122
1.19k
    writer_options.rowset_ctx = &_context;
123
1.19k
    writer_options.write_type = _context.write_type;
124
1.19k
    writer_options.max_rows_per_segment = _context.max_rows_per_segment;
125
1.19k
    writer_options.mow_ctx = _context.mow_context;
126
1.19k
    if (no_compression) {
127
2
        writer_options.compression_type = NO_COMPRESSION;
128
2
    }
129
130
1.19k
    if (_context.write_binlog_opt().enable) {
131
2
        writer = std::make_unique<segment_v2::RowBinlogSegmentWriter>(
132
2
                segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
133
2
                _context.data_dir, writer_options,
134
2
                _context.write_binlog_opt().write_binlog_config());
135
1.19k
    } else {
136
1.19k
        writer = std::make_unique<segment_v2::SegmentWriter>(
137
1.19k
                segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
138
1.19k
                _context.data_dir, writer_options, index_file_writer.get());
139
1.19k
    }
140
1.19k
    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer)));
141
1.19k
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
142
228
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(index_file_writer)));
143
228
    }
144
1.19k
    auto s = writer->init();
145
1.19k
    if (!s.ok()) {
146
0
        LOG(WARNING) << "failed to init segment writer: " << s.to_string();
147
0
        writer.reset();
148
0
        return s;
149
0
    }
150
1.19k
    return Status::OK();
151
1.19k
}
152
153
Status SegmentFlusher::_create_segment_writer(
154
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
155
14
        bool no_compression) {
156
14
    io::FileWriterPtr segment_file_writer;
157
14
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, segment_file_writer));
158
159
14
    IndexFileWriterPtr index_file_writer;
160
14
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
161
0
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &index_file_writer));
162
0
    }
163
164
14
    segment_v2::VerticalSegmentWriterOptions writer_options;
165
14
    writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
166
14
    writer_options.rowset_ctx = &_context;
167
14
    writer_options.write_type = _context.write_type;
168
14
    writer_options.mow_ctx = _context.mow_context;
169
14
    if (no_compression) {
170
14
        writer_options.compression_type = NO_COMPRESSION;
171
14
    }
172
173
14
    writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
174
14
            segment_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
175
14
            _context.data_dir, writer_options, index_file_writer.get());
176
14
    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(segment_file_writer)));
177
14
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
178
0
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(index_file_writer)));
179
0
    }
180
14
    auto s = writer->init();
181
14
    if (!s.ok()) {
182
0
        LOG(WARNING) << "failed to init segment writer: " << s.to_string();
183
0
        writer.reset();
184
0
        return s;
185
0
    }
186
187
14
    VLOG_DEBUG << "create new segment writer, tablet_id:" << _context.tablet_id
188
0
               << " segment id: " << segment_id << " filename: " << writer->data_dir_path()
189
0
               << " rowset_id:" << _context.rowset_id;
190
14
    return Status::OK();
191
14
}
192
193
Status SegmentFlusher::_flush_segment_writer(
194
14
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
195
14
    MonotonicStopWatch total_timer;
196
14
    total_timer.start();
197
198
14
    uint32_t row_num = writer->num_rows_written();
199
14
    _num_rows_updated += writer->num_rows_updated();
200
14
    _num_rows_deleted += writer->num_rows_deleted();
201
14
    _num_rows_new_added += writer->num_rows_new_added();
202
14
    _num_rows_filtered += writer->num_rows_filtered();
203
204
14
    if (row_num == 0) {
205
0
        return Status::OK();
206
0
    }
207
208
14
    MonotonicStopWatch finalize_timer;
209
14
    finalize_timer.start();
210
14
    uint64_t segment_file_size;
211
14
    uint64_t common_index_size;
212
14
    Status s = writer->finalize(&segment_file_size, &common_index_size);
213
14
    finalize_timer.stop();
214
215
14
    if (!s.ok()) {
216
0
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
217
0
    }
218
219
14
    DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
220
14
                    { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
221
222
14
    MonotonicStopWatch inverted_index_timer;
223
14
    inverted_index_timer.start();
224
14
    int64_t inverted_index_file_size = 0;
225
14
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
226
14
    inverted_index_timer.stop();
227
228
14
    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
229
0
               << " flushing filename: " << writer->data_dir_path()
230
0
               << " rowset_id:" << _context.rowset_id;
231
232
14
    KeyBoundsPB key_bounds;
233
14
    Slice min_key = writer->min_encoded_key();
234
14
    Slice max_key = writer->max_encoded_key();
235
14
    DCHECK_LE(min_key.compare(max_key), 0);
236
14
    key_bounds.set_min_key(min_key.to_string());
237
14
    key_bounds.set_max_key(max_key.to_string());
238
239
14
    uint32_t segment_id = writer->segment_id();
240
14
    SegmentStatistics segstat;
241
14
    segstat.row_num = row_num;
242
14
    segstat.data_size = segment_file_size;
243
14
    segstat.index_size = inverted_index_file_size;
244
14
    segstat.key_bounds = key_bounds;
245
246
14
    writer.reset();
247
248
14
    MonotonicStopWatch collector_timer;
249
14
    collector_timer.start();
250
14
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
251
14
    collector_timer.stop();
252
253
14
    total_timer.stop();
254
255
14
    LOG(INFO) << "tablet_id:" << _context.tablet_id
256
14
              << ", flushing rowset_dir: " << _context.tablet_path
257
14
              << ", rowset_id:" << _context.rowset_id
258
14
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
259
14
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
260
14
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
261
14
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
262
14
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
263
14
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";
264
265
14
    if (flush_size) {
266
12
        *flush_size = segment_file_size;
267
12
    }
268
14
    return Status::OK();
269
14
}
270
271
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
272
1.19k
                                             int64_t* flush_size) {
273
1.19k
    MonotonicStopWatch total_timer;
274
1.19k
    total_timer.start();
275
276
1.19k
    uint32_t row_num = writer->num_rows_written();
277
1.19k
    _num_rows_updated += writer->num_rows_updated();
278
1.19k
    _num_rows_deleted += writer->num_rows_deleted();
279
1.19k
    _num_rows_new_added += writer->num_rows_new_added();
280
1.19k
    _num_rows_filtered += writer->num_rows_filtered();
281
282
1.19k
    if (row_num == 0) {
283
0
        return Status::OK();
284
0
    }
285
286
1.19k
    MonotonicStopWatch finalize_timer;
287
1.19k
    finalize_timer.start();
288
1.19k
    uint64_t segment_file_size;
289
1.19k
    uint64_t common_index_size;
290
1.19k
    Status s = writer->finalize(&segment_file_size, &common_index_size);
291
1.19k
    finalize_timer.stop();
292
293
1.19k
    if (!s.ok()) {
294
0
        return Status::Error(s.code(), "failed to finalize segment: {}", s.to_string());
295
0
    }
296
297
1.19k
    DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
298
1.19k
                    { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
299
300
1.19k
    MonotonicStopWatch inverted_index_timer;
301
1.19k
    inverted_index_timer.start();
302
1.19k
    int64_t inverted_index_file_size = 0;
303
1.19k
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
304
1.19k
    inverted_index_timer.stop();
305
306
1.19k
    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
307
0
               << " flushing rowset_dir: " << _context.tablet_path
308
0
               << " rowset_id:" << _context.rowset_id;
309
310
1.19k
    KeyBoundsPB key_bounds;
311
1.19k
    Slice min_key = writer->min_encoded_key();
312
1.19k
    Slice max_key = writer->max_encoded_key();
313
1.19k
    DCHECK_LE(min_key.compare(max_key), 0);
314
1.19k
    key_bounds.set_min_key(min_key.to_string());
315
1.19k
    key_bounds.set_max_key(max_key.to_string());
316
317
1.19k
    uint32_t segment_id = writer->get_segment_id();
318
1.19k
    SegmentStatistics segstat;
319
1.19k
    segstat.row_num = row_num;
320
1.19k
    segstat.data_size = segment_file_size;
321
1.19k
    segstat.index_size = inverted_index_file_size;
322
1.19k
    segstat.key_bounds = key_bounds;
323
324
1.19k
    writer.reset();
325
326
1.19k
    MonotonicStopWatch collector_timer;
327
1.19k
    collector_timer.start();
328
1.19k
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
329
1.19k
    collector_timer.stop();
330
331
1.19k
    total_timer.stop();
332
333
1.19k
    LOG(INFO) << "tablet_id:" << _context.tablet_id
334
1.19k
              << ", flushing rowset_dir: " << _context.tablet_path
335
1.19k
              << ", rowset_id:" << _context.rowset_id
336
1.19k
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
337
1.19k
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
338
1.19k
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
339
1.19k
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
340
1.19k
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
341
1.19k
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";
342
343
1.19k
    if (flush_size) {
344
0
        *flush_size = segment_file_size;
345
0
    }
346
1.19k
    return Status::OK();
347
1.19k
}
348
349
Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer,
350
1.19k
                                     uint32_t segment_id) {
351
1.19k
    std::unique_ptr<segment_v2::SegmentWriter> segment_writer;
352
1.19k
    RETURN_IF_ERROR(_create_segment_writer(segment_writer, segment_id));
353
1.19k
    DCHECK(segment_writer != nullptr);
354
1.19k
    writer.reset(new SegmentFlusher::Writer(this, segment_writer));
355
1.19k
    return Status::OK();
356
1.19k
}
357
358
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
359
                               std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
360
1.19k
        : _flusher(flusher), _writer(std::move(segment_writer)) {};
361
362
1.19k
SegmentFlusher::Writer::~Writer() = default;
363
364
1.19k
Status SegmentFlusher::Writer::flush() {
365
1.19k
    return _flusher->_flush_segment_writer(_writer);
366
1.19k
}
367
368
2.07k
int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) {
369
2.07k
    return _writer->max_row_to_add(row_avg_size_in_bytes);
370
2.07k
}
371
372
SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files,
373
                               InvertedIndexFileCollection& idx_files)
374
1.01k
        : _segment_flusher(context, seg_files, idx_files) {}
375
376
1.36k
Status SegmentCreator::add_block(const Block* block) {
377
1.36k
    if (block->rows() == 0) {
378
0
        return Status::OK();
379
0
    }
380
381
1.36k
    size_t block_size_in_bytes = block->bytes();
382
1.36k
    size_t block_row_num = block->rows();
383
1.36k
    size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
384
1.36k
    size_t row_offset = 0;
385
386
1.36k
    if (_flush_writer == nullptr) {
387
838
        RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
388
838
    }
389
390
1.72k
    do {
391
1.72k
        auto max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
392
1.72k
        if (UNLIKELY(max_row_add < 1)) {
393
            // no space for another single row, need flush now
394
354
            RETURN_IF_ERROR(flush());
395
354
            RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
396
354
            max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
397
354
            DCHECK(max_row_add > 0);
398
354
        }
399
1.72k
        size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
400
1.72k
        RETURN_IF_ERROR(_flush_writer->add_rows(block, row_offset, input_row_num));
401
1.72k
        row_offset += input_row_num;
402
1.72k
    } while (row_offset < block_row_num);
403
404
1.36k
    return Status::OK();
405
1.36k
}
406
407
2.16k
Status SegmentCreator::flush() {
408
2.16k
    if (_flush_writer == nullptr) {
409
972
        return Status::OK();
410
972
    }
411
1.19k
    RETURN_IF_ERROR(_flush_writer->flush());
412
1.19k
    _flush_writer.reset();
413
1.19k
    return Status::OK();
414
1.19k
}
415
416
Status SegmentCreator::flush_single_block(const Block* block, int32_t segment_id,
417
16
                                          int64_t* flush_size) {
418
16
    if (block->rows() == 0) {
419
0
        return Status::OK();
420
0
    }
421
16
    RETURN_IF_ERROR(_segment_flusher.flush_single_block(block, segment_id, flush_size));
422
16
    return Status::OK();
423
16
}
424
425
654
Status SegmentCreator::close() {
426
654
    RETURN_IF_ERROR(flush());
427
654
    RETURN_IF_ERROR(_segment_flusher.close());
428
654
    return Status::OK();
429
654
}
430
431
} // namespace doris