Coverage Report

Created: 2026-03-15 22:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/segment_creator.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <cerrno> // IWYU pragma: keep
22
#include <filesystem>
23
#include <memory>
24
#include <sstream>
25
#include <utility>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "common/exception.h"
30
#include "common/logging.h"
31
#include "common/status.h"
32
#include "core/assert_cast.h"
33
#include "core/block/block.h"
34
#include "core/block/columns_with_type_and_name.h"
35
#include "core/column/column.h"
36
#include "core/column/column_nullable.h"
37
#include "core/column/column_string.h"
38
#include "core/column/column_variant.h"
39
#include "core/data_type/data_type.h"
40
#include "core/types.h"
41
#include "io/fs/file_writer.h"
42
#include "storage/olap_define.h"
43
#include "storage/rowset/beta_rowset_writer.h" // SegmentStatistics
44
#include "storage/segment/segment_writer.h"
45
#include "storage/segment/vertical_segment_writer.h"
46
#include "storage/tablet/tablet_schema.h"
47
#include "storage/utils.h"
48
#include "util/json/json_parser.h"
49
#include "util/pretty_printer.h"
50
#include "util/stopwatch.hpp"
51
52
namespace doris {
53
using namespace ErrorCode;
54
55
// Initialises the flusher's members from the rowset writer context and from
// the two collections that later receive the segment file writers and index
// file writers created by this flusher.
SegmentFlusher::SegmentFlusher(RowsetWriterContext& context,
                               SegmentFileCollection& seg_files,
                               InvertedIndexFileCollection& idx_files)
        : _context(context),
          _seg_files(seg_files),
          _idx_files(idx_files) {}

// Nothing to release explicitly; member destructors do the work.
SegmentFlusher::~SegmentFlusher() = default;
60
61
// NOLINTNEXTLINE(readability-function-cognitive-complexity)
62
Status SegmentFlusher::flush_single_block(const Block* block, int32_t segment_id,
63
12
                                          int64_t* flush_size) {
64
12
    if (block->rows() == 0) {
65
0
        return Status::OK();
66
0
    }
67
12
    Block flush_block(*block);
68
12
    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
69
12
    if (config::enable_vertical_segment_writer) {
70
12
        std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
71
12
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
72
12
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
73
12
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
74
12
    } else {
75
0
        std::unique_ptr<segment_v2::SegmentWriter> writer;
76
0
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
77
0
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
78
0
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
79
0
    }
80
12
    return Status::OK();
81
12
}
82
83
631
// Closes the collected segment files, then finishes closing the index files.
// The index step is skipped when closing the segment files fails.
Status SegmentFlusher::close() {
    RETURN_IF_ERROR(_seg_files.close());
    return _idx_files.finish_close();
}
88
89
// Appends rows [row_pos, row_pos + num_rows) of `block` to the segment writer.
// _num_rows_written is advanced only when the append succeeds.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
                                 const Block* block, size_t row_pos, size_t num_rows) {
    Status append_status = segment_writer->append_block(block, row_pos, num_rows);
    if (append_status.ok()) {
        _num_rows_written += num_rows;
    }
    return append_status;
}
95
96
// Vertical-writer variant: queues rows [row_pos, row_pos + num_rows) of
// `block` with batch_block() and immediately writes the batch out.
// _num_rows_written is advanced only when both steps succeed.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
                                 const Block* block, size_t row_pos, size_t num_rows) {
    RETURN_IF_ERROR(segment_writer->batch_block(block, row_pos, num_rows));
    Status write_status = segment_writer->write_batch();
    if (write_status.ok()) {
        _num_rows_written += num_rows;
    }
    return write_status;
}
103
104
// Creates and initialises a SegmentWriter for `segment_id`.
//
// Steps: create the data file writer; create an index file writer when the
// tablet schema has an inverted or ANN index; build the writer on the raw
// file-writer pointers; hand ownership of the file writers to _seg_files /
// _idx_files; finally call init() on the new writer.
//
// @param writer          receives the new writer; reset to null if init() fails.
// @param segment_id      id of the segment being created.
// @param no_compression  when true the writer is configured with NO_COMPRESSION.
Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                              int32_t segment_id, bool no_compression) {
    io::FileWriterPtr data_file;
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, data_file));

    const bool needs_index_file =
            _context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index();
    IndexFileWriterPtr index_file;
    if (needs_index_file) {
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &index_file));
    }

    segment_v2::SegmentWriterOptions opts;
    opts.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    opts.rowset_ctx = &_context;
    opts.write_type = _context.write_type;
    opts.max_rows_per_segment = _context.max_rows_per_segment;
    opts.mow_ctx = _context.mow_context;
    if (no_compression) {
        opts.compression_type = NO_COMPRESSION;
    }

    // The writer is given the raw pointers before ownership moves to the collections below.
    writer = std::make_unique<segment_v2::SegmentWriter>(
            data_file.get(), segment_id, _context.tablet_schema, _context.tablet,
            _context.data_dir, opts, index_file.get());

    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(data_file)));
    if (needs_index_file) {
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(index_file)));
    }

    auto init_status = writer->init();
    if (init_status.ok()) {
        return Status::OK();
    }
    LOG(WARNING) << "failed to init segment writer: " << init_status.to_string();
    writer.reset();
    return init_status;
}
139
140
// Creates and initialises a VerticalSegmentWriter for `segment_id`.
//
// Steps: create the data file writer; create an index file writer when the
// tablet schema has an inverted or ANN index; build the writer on the raw
// file-writer pointers; hand ownership of the file writers to _seg_files /
// _idx_files; finally call init() on the new writer.
//
// @param writer          receives the new writer; reset to null if init() fails.
// @param segment_id      id of the segment being created.
// @param no_compression  when true the writer is configured with NO_COMPRESSION.
Status SegmentFlusher::_create_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
        bool no_compression) {
    io::FileWriterPtr data_file;
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, data_file));

    const bool needs_index_file =
            _context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index();
    IndexFileWriterPtr index_file;
    if (needs_index_file) {
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &index_file));
    }

    segment_v2::VerticalSegmentWriterOptions opts;
    opts.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    opts.rowset_ctx = &_context;
    opts.write_type = _context.write_type;
    opts.mow_ctx = _context.mow_context;
    if (no_compression) {
        opts.compression_type = NO_COMPRESSION;
    }

    // The writer is given the raw pointers before ownership moves to the collections below.
    writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
            data_file.get(), segment_id, _context.tablet_schema, _context.tablet,
            _context.data_dir, opts, index_file.get());

    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(data_file)));
    if (needs_index_file) {
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(index_file)));
    }

    auto init_status = writer->init();
    if (!init_status.ok()) {
        LOG(WARNING) << "failed to init segment writer: " << init_status.to_string();
        writer.reset();
        return init_status;
    }

    VLOG_DEBUG << "create new segment writer, tablet_id:" << _context.tablet_id
               << " segment id: " << segment_id << " filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;
    return Status::OK();
}
179
180
// Finalizes a VerticalSegmentWriter and registers the finished segment.
//
// Order of work:
//   1. fold the writer's updated/deleted/new-added/filtered row counters into
//      this flusher's totals (done even when the writer holds no rows);
//   2. return OK right away if the writer wrote zero rows;
//   3. finalize the segment, then close its inverted index;
//   4. capture the key bounds and segment id, destroy the writer, and pass
//      the statistics to _context.segment_collector;
//   5. log sizes and a per-phase timing breakdown.
//
// @param writer      writer to flush; reset to null once its statistics have
//                    been captured (left untouched on the early-return paths).
// @param flush_size  if non-null, receives the segment file size reported by
//                    finalize().
Status SegmentFlusher::_flush_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    const uint32_t rows_in_segment = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    if (rows_in_segment == 0) {
        return Status::OK();
    }

    // Phase: finalize. common_index_size is filled by finalize() but not used afterwards.
    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    uint64_t segment_file_size;
    uint64_t common_index_size;
    Status finalize_status = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();
    if (!finalize_status.ok()) {
        return Status::Error(finalize_status.code(), "failed to finalize segment: {}",
                             finalize_status.to_string());
    }

    // Phase: close the inverted index.
    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;

    // Everything needed from the writer is copied out before it is destroyed.
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    KeyBoundsPB key_bounds;
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    const uint32_t segment_id = writer->segment_id();

    SegmentStatistics segstat;
    segstat.row_num = rows_in_segment;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;

    writer.reset();

    // Phase: hand the statistics to the collector.
    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = segment_file_size;
    }
    return Status::OK();
}
254
255
// Finalizes a SegmentWriter and registers the finished segment.
//
// Order of work:
//   1. fold the writer's updated/deleted/new-added/filtered row counters into
//      this flusher's totals (done even when the writer holds no rows);
//   2. return OK right away if the writer wrote zero rows;
//   3. finalize the segment, then close its inverted index;
//   4. capture the key bounds and segment id, destroy the writer, and pass
//      the statistics to _context.segment_collector;
//   5. log sizes and a per-phase timing breakdown.
//
// @param writer      writer to flush; reset to null once its statistics have
//                    been captured (left untouched on the early-return paths).
// @param flush_size  if non-null, receives the segment file size reported by
//                    finalize().
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                             int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    const uint32_t rows_in_segment = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    if (rows_in_segment == 0) {
        return Status::OK();
    }

    // Phase: finalize. common_index_size is filled by finalize() but not used afterwards.
    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    uint64_t segment_file_size;
    uint64_t common_index_size;
    Status finalize_status = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();
    if (!finalize_status.ok()) {
        return Status::Error(finalize_status.code(), "failed to finalize segment: {}",
                             finalize_status.to_string());
    }

    // Phase: close the inverted index.
    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing rowset_dir: " << _context.tablet_path
               << " rowset_id:" << _context.rowset_id;

    // Everything needed from the writer is copied out before it is destroyed.
    Slice min_key = writer->min_encoded_key();
    Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    KeyBoundsPB key_bounds;
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    const uint32_t segment_id = writer->get_segment_id();

    SegmentStatistics segstat;
    segstat.row_num = rows_in_segment;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;

    writer.reset();

    // Phase: hand the statistics to the collector.
    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = segment_file_size;
    }
    return Status::OK();
}
329
330
// Creates a SegmentWriter for `segment_id` and wraps it in a
// SegmentFlusher::Writer bound to this flusher.
//
// @param writer      receives the wrapper; left unchanged if creation fails.
// @param segment_id  id of the segment the new writer will produce.
Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer,
                                     uint32_t segment_id) {
    std::unique_ptr<segment_v2::SegmentWriter> created;
    RETURN_IF_ERROR(_create_segment_writer(created, segment_id));
    DCHECK(created != nullptr);

    // The wrapper's constructor moves `created` into itself.
    auto* wrapper = new SegmentFlusher::Writer(this, created);
    writer.reset(wrapper);
    return Status::OK();
}
338
339
// Stores the owning flusher and takes over `segment_writer`: the unique_ptr
// passed in is moved from, so it is left empty in the caller.
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
                               std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
        : _flusher(flusher),
          _writer(std::move(segment_writer)) {}

SegmentFlusher::Writer::~Writer() = default;
344
345
1.18k
// Flushes the wrapped segment writer through the owning flusher. No
// flush-size pointer is supplied, so the declaration's default is used.
Status SegmentFlusher::Writer::flush() {
    Status flush_status = _flusher->_flush_segment_writer(_writer);
    return flush_status;
}
348
349
2.06k
int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) {
350
2.06k
    return _writer->max_row_to_add(row_avg_size_in_bytes);
351
2.06k
}
352
353
// Builds the embedded SegmentFlusher from the same context and file
// collections; the creator forwards all of them unchanged.
SegmentCreator::SegmentCreator(RowsetWriterContext& context,
                               SegmentFileCollection& seg_files,
                               InvertedIndexFileCollection& idx_files)
        : _segment_flusher(context, seg_files, idx_files) {}
356
357
1.35k
// Appends all rows of `block` to the current segment, rolling over to a new
// segment whenever the current writer reports it has no room left.
//
// The block's average row size (at least 1 byte) is used to ask the writer
// how many more rows it accepts; the block is then fed in slices of at most
// that many rows.
//
// @param block  rows to add; an empty block is a no-op that returns OK.
// @return the first non-OK status from creating a writer, flushing, or
//         adding rows; an internal error if a freshly created writer reports
//         that it cannot take even one row.
Status SegmentCreator::add_block(const Block* block) {
    const size_t block_row_num = block->rows();
    if (block_row_num == 0) {
        return Status::OK();
    }

    const size_t block_size_in_bytes = block->bytes();
    const size_t row_avg_size_in_bytes =
            std::max(static_cast<size_t>(1), block_size_in_bytes / block_row_num);

    if (_flush_writer == nullptr) {
        RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
    }

    size_t row_offset = 0;
    do {
        auto max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
        if (UNLIKELY(max_row_add < 1)) {
            // no space for another single row, need flush now
            RETURN_IF_ERROR(flush());
            RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
            max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
            DCHECK(max_row_add > 0);
            // With the DCHECK compiled out, a zero capacity here would add zero
            // rows per iteration and never advance row_offset, so the loop would
            // keep allocating segments forever. Fail the write instead.
            if (UNLIKELY(max_row_add < 1)) {
                return Status::InternalError(
                        "newly created segment writer cannot accept any row, "
                        "row_avg_size_in_bytes={}, max_row_to_add={}",
                        row_avg_size_in_bytes, max_row_add);
            }
        }
        const size_t input_row_num =
                std::min(block_row_num - row_offset, static_cast<size_t>(max_row_add));
        RETURN_IF_ERROR(_flush_writer->add_rows(block, row_offset, input_row_num));
        row_offset += input_row_num;
    } while (row_offset < block_row_num);

    return Status::OK();
}
387
388
2.11k
// Flushes and discards the current segment writer, if there is one.
// The writer is kept when its flush fails.
Status SegmentCreator::flush() {
    if (_flush_writer != nullptr) {
        RETURN_IF_ERROR(_flush_writer->flush());
        _flush_writer.reset();
    }
    return Status::OK();
}
396
397
// Writes `block` as a standalone segment with the given id by delegating to
// the segment flusher. Empty blocks are skipped and report OK.
//
// @param flush_size  passed through to the flusher unchanged.
Status SegmentCreator::flush_single_block(const Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    const bool has_rows = block->rows() != 0;
    if (!has_rows) {
        return Status::OK();
    }
    return _segment_flusher.flush_single_block(block, segment_id, flush_size);
}
405
406
631
// Flushes any pending segment writer, then closes the segment flusher.
// The flusher is not closed when the final flush fails.
Status SegmentCreator::close() {
    RETURN_IF_ERROR(flush());
    return _segment_flusher.close();
}
411
412
} // namespace doris