Coverage Report

Created: 2026-05-13 01:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/rowset/segment_creator.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <cerrno> // IWYU pragma: keep
22
#include <chrono>
23
#include <filesystem>
24
#include <memory>
25
#include <sstream>
26
#include <thread>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/config.h"
31
#include "common/exception.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/block.h"
36
#include "core/block/columns_with_type_and_name.h"
37
#include "core/column/column.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_variant.h"
41
#include "core/data_type/data_type.h"
42
#include "core/types.h"
43
#include "io/fs/file_writer.h"
44
#include "storage/olap_define.h"
45
#include "storage/rowset/beta_rowset_writer.h" // SegmentStatistics
46
#include "storage/segment/segment_writer.h"
47
#include "storage/segment/vertical_segment_writer.h"
48
#include "storage/tablet/tablet_schema.h"
49
#include "storage/utils.h"
50
#include "util/debug_points.h"
51
#include "util/json/json_parser.h"
52
#include "util/pretty_printer.h"
53
#include "util/stopwatch.hpp"
54
55
namespace doris {
56
using namespace ErrorCode;
57
58
// Builds a flusher on top of caller-provided state: the rowset writer context plus the
// collections that receive the segment data file writers and the index file writers.
// The three members are initialized directly from the corresponding arguments.
SegmentFlusher::SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files,
                               InvertedIndexFileCollection& idx_files)
        : _context(context),
          _seg_files(seg_files),
          _idx_files(idx_files) {
}
61
62
1.00k
// Defaulted destructor, defined out of line in this translation unit.
SegmentFlusher::~SegmentFlusher() = default;
63
64
// NOLINTNEXTLINE(readability-function-cognitive-complexity)
65
Status SegmentFlusher::flush_single_block(const Block* block, int32_t segment_id,
66
12
                                          int64_t* flush_size) {
67
12
    if (block->rows() == 0) {
68
0
        return Status::OK();
69
0
    }
70
12
    Block flush_block(*block);
71
12
    bool no_compression = flush_block.bytes() <= config::segment_compression_threshold_kb * 1024;
72
12
    if (config::enable_vertical_segment_writer) {
73
12
        std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
74
12
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
75
12
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
76
12
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
77
12
    } else {
78
0
        std::unique_ptr<segment_v2::SegmentWriter> writer;
79
0
        RETURN_IF_ERROR(_create_segment_writer(writer, segment_id, no_compression));
80
0
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0, flush_block.rows()));
81
0
        RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
82
0
    }
83
12
    return Status::OK();
84
12
}
85
86
641
// Closes the segment file collection, then finishes closing the index file collection.
// Stops at and returns the first failure; the index files are not touched if closing the
// segment files fails.
Status SegmentFlusher::close() {
    if (Status seg_status = _seg_files.close(); !seg_status.ok()) {
        return seg_status;
    }
    return _idx_files.finish_close();
}
91
92
// Appends `num_rows` rows of `block`, starting at `row_pos`, to `segment_writer`.
// `_num_rows_written` is advanced only when the append succeeds; on failure the append
// error is returned and the counter is left unchanged.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
                                 const Block* block, size_t row_pos, size_t num_rows) {
    if (Status append_status = segment_writer->append_block(block, row_pos, num_rows);
        !append_status.ok()) {
        return append_status;
    }
    _num_rows_written += num_rows;
    return Status::OK();
}
98
99
// Vertical-writer overload: hands `num_rows` rows of `block` (from `row_pos`) to
// batch_block() and immediately writes the batch with write_batch().
// `_num_rows_written` is advanced only after both steps succeed; the first failing step's
// status is returned as is.
Status SegmentFlusher::_add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
                                 const Block* block, size_t row_pos, size_t num_rows) {
    if (Status batch_status = segment_writer->batch_block(block, row_pos, num_rows);
        !batch_status.ok()) {
        return batch_status;
    }
    if (Status write_status = segment_writer->write_batch(); !write_status.ok()) {
        return write_status;
    }
    _num_rows_written += num_rows;
    return Status::OK();
}
106
107
// Creates and initializes a SegmentWriter for `segment_id` and stores it in `writer`.
//
// Steps, in order:
//   1. create the segment data file writer through `_context.file_writer_creator`;
//   2. if the tablet schema has an inverted or ANN index, create the index file writer too;
//   3. fill the writer options from `_context` (compression is switched to NO_COMPRESSION
//      when `no_compression` is true);
//   4. construct the SegmentWriter on the raw file-writer pointers, then move ownership of the
//      file writers into `_seg_files` / `_idx_files`;
//   5. call init() on the new writer.
//
// On init() failure a warning is logged, `writer` is reset, and the init status is returned.
// Any earlier failure is returned immediately.
Status SegmentFlusher::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                              int32_t segment_id, bool no_compression) {
    // Evaluated once before and once after the writer is constructed, as two separate checks.
    const auto schema_has_index = [this]() -> bool {
        return _context.tablet_schema->has_inverted_index() ||
               _context.tablet_schema->has_ann_index();
    };

    io::FileWriterPtr data_file_writer;
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, data_file_writer));

    IndexFileWriterPtr idx_file_writer;
    if (schema_has_index()) {
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &idx_file_writer));
    }

    segment_v2::SegmentWriterOptions opts;
    opts.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    opts.rowset_ctx = &_context;
    opts.write_type = _context.write_type;
    opts.max_rows_per_segment = _context.max_rows_per_segment;
    opts.mow_ctx = _context.mow_context;
    if (no_compression) {
        opts.compression_type = NO_COMPRESSION;
    }

    // The segment writer only receives raw pointers; the file writers themselves are moved
    // into the collections right below.
    writer = std::make_unique<segment_v2::SegmentWriter>(
            data_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
            _context.data_dir, opts, idx_file_writer.get());

    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(data_file_writer)));
    if (schema_has_index()) {
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(idx_file_writer)));
    }

    auto init_status = writer->init();
    if (init_status.ok()) {
        return Status::OK();
    }
    LOG(WARNING) << "failed to init segment writer: " << init_status.to_string();
    writer.reset();
    return init_status;
}
142
143
// Creates and initializes a VerticalSegmentWriter for `segment_id` and stores it in `writer`.
//
// Steps, in order:
//   1. create the segment data file writer through `_context.file_writer_creator`;
//   2. if the tablet schema has an inverted or ANN index, create the index file writer too;
//   3. fill the writer options from `_context` (compression is switched to NO_COMPRESSION
//      when `no_compression` is true);
//   4. construct the VerticalSegmentWriter on the raw file-writer pointers, then move ownership
//      of the file writers into `_seg_files` / `_idx_files`;
//   5. call init() on the new writer.
//
// On init() failure a warning is logged, `writer` is reset, and the init status is returned.
// Any earlier failure is returned immediately.
Status SegmentFlusher::_create_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int32_t segment_id,
        bool no_compression) {
    // Evaluated once before and once after the writer is constructed, as two separate checks.
    const auto schema_has_index = [this]() -> bool {
        return _context.tablet_schema->has_inverted_index() ||
               _context.tablet_schema->has_ann_index();
    };

    io::FileWriterPtr data_file_writer;
    RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, data_file_writer));

    IndexFileWriterPtr idx_file_writer;
    if (schema_has_index()) {
        RETURN_IF_ERROR(_context.file_writer_creator->create(segment_id, &idx_file_writer));
    }

    segment_v2::VerticalSegmentWriterOptions opts;
    opts.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write;
    opts.rowset_ctx = &_context;
    opts.write_type = _context.write_type;
    opts.mow_ctx = _context.mow_context;
    if (no_compression) {
        opts.compression_type = NO_COMPRESSION;
    }

    // The segment writer only receives raw pointers; the file writers themselves are moved
    // into the collections right below.
    writer = std::make_unique<segment_v2::VerticalSegmentWriter>(
            data_file_writer.get(), segment_id, _context.tablet_schema, _context.tablet,
            _context.data_dir, opts, idx_file_writer.get());

    RETURN_IF_ERROR(_seg_files.add(segment_id, std::move(data_file_writer)));
    if (schema_has_index()) {
        RETURN_IF_ERROR(_idx_files.add(segment_id, std::move(idx_file_writer)));
    }

    auto init_status = writer->init();
    if (!init_status.ok()) {
        LOG(WARNING) << "failed to init segment writer: " << init_status.to_string();
        writer.reset();
        return init_status;
    }

    VLOG_DEBUG << "create new segment writer, tablet_id:" << _context.tablet_id
               << " segment id: " << segment_id << " filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;
    return Status::OK();
}
182
183
// Finalizes a VerticalSegmentWriter, closes its inverted index and registers the finished
// segment with `_context.segment_collector`.
//
// The writer's updated / deleted / new-added / filtered row counters are folded into this
// flusher's totals first. If the writer holds no rows the function then returns OK right
// away: nothing is finalized, `writer` is not reset and `*flush_size` is left untouched.
//
// @param writer      writer to flush; it is reset (destroyed) once its statistics are captured
//                    and before the segment is handed to the collector.
// @param flush_size  optional; when non-null it receives the segment file size reported by
//                    finalize().
// @return OK on success; a "failed to finalize segment" error carrying the original code if
//         finalize() fails; otherwise the first error from close_inverted_index() or from the
//         segment collector.
Status SegmentFlusher::_flush_segment_writer(
        std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    const uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    // Zero-initialized so the values are well defined even if finalize() leaves them unset.
    uint64_t segment_file_size = 0;
    // Required by finalize()'s signature; not used afterwards in this function.
    uint64_t common_index_size = 0;
    const Status finalize_status = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();

    if (!finalize_status.ok()) {
        return Status::Error(finalize_status.code(), "failed to finalize segment: {}",
                             finalize_status.to_string());
    }

    DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing filename: " << writer->data_dir_path()
               << " rowset_id:" << _context.rowset_id;

    // Copy everything still needed out of the writer before it is destroyed below; the key
    // slices are turned into owned strings here.
    KeyBoundsPB key_bounds;
    const Slice min_key = writer->min_encoded_key();
    const Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    const uint32_t segment_id = writer->segment_id();
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = std::move(key_bounds);

    writer.reset();

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = static_cast<int64_t>(segment_file_size);
    }
    return Status::OK();
}
260
261
// Finalizes a SegmentWriter, closes its inverted index and registers the finished segment
// with `_context.segment_collector`.
//
// The writer's updated / deleted / new-added / filtered row counters are folded into this
// flusher's totals first. If the writer holds no rows the function then returns OK right
// away: nothing is finalized, `writer` is not reset and `*flush_size` is left untouched.
//
// @param writer      writer to flush; it is reset (destroyed) once its statistics are captured
//                    and before the segment is handed to the collector.
// @param flush_size  optional; when non-null it receives the segment file size reported by
//                    finalize().
// @return OK on success; a "failed to finalize segment" error carrying the original code if
//         finalize() fails; otherwise the first error from close_inverted_index() or from the
//         segment collector.
Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
                                             int64_t* flush_size) {
    MonotonicStopWatch total_timer;
    total_timer.start();

    const uint32_t row_num = writer->num_rows_written();
    _num_rows_updated += writer->num_rows_updated();
    _num_rows_deleted += writer->num_rows_deleted();
    _num_rows_new_added += writer->num_rows_new_added();
    _num_rows_filtered += writer->num_rows_filtered();

    if (row_num == 0) {
        return Status::OK();
    }

    MonotonicStopWatch finalize_timer;
    finalize_timer.start();
    // Zero-initialized so the values are well defined even if finalize() leaves them unset.
    uint64_t segment_file_size = 0;
    // Required by finalize()'s signature; not used afterwards in this function.
    uint64_t common_index_size = 0;
    const Status finalize_status = writer->finalize(&segment_file_size, &common_index_size);
    finalize_timer.stop();

    if (!finalize_status.ok()) {
        return Status::Error(finalize_status.code(), "failed to finalize segment: {}",
                             finalize_status.to_string());
    }

    DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
                    { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });

    MonotonicStopWatch inverted_index_timer;
    inverted_index_timer.start();
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR(writer->close_inverted_index(&inverted_index_file_size));
    inverted_index_timer.stop();

    VLOG_DEBUG << "tablet_id:" << _context.tablet_id
               << " flushing rowset_dir: " << _context.tablet_path
               << " rowset_id:" << _context.rowset_id;

    // Copy everything still needed out of the writer before it is destroyed below; the key
    // slices are turned into owned strings here.
    KeyBoundsPB key_bounds;
    const Slice min_key = writer->min_encoded_key();
    const Slice max_key = writer->max_encoded_key();
    DCHECK_LE(min_key.compare(max_key), 0);
    key_bounds.set_min_key(min_key.to_string());
    key_bounds.set_max_key(max_key.to_string());

    const uint32_t segment_id = writer->get_segment_id();
    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_file_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = std::move(key_bounds);

    writer.reset();

    MonotonicStopWatch collector_timer;
    collector_timer.start();
    RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
    collector_timer.stop();

    total_timer.stop();

    LOG(INFO) << "tablet_id:" << _context.tablet_id
              << ", flushing rowset_dir: " << _context.tablet_path
              << ", rowset_id:" << _context.rowset_id
              << ", data size:" << PrettyPrinter::print_bytes(segstat.data_size)
              << ", index size:" << PrettyPrinter::print_bytes(segstat.index_size)
              << ", timing breakdown: total=" << total_timer.elapsed_time_milliseconds() << "ms"
              << ", finalize=" << finalize_timer.elapsed_time_milliseconds() << "ms"
              << ", inverted_index=" << inverted_index_timer.elapsed_time_milliseconds() << "ms"
              << ", collector=" << collector_timer.elapsed_time_milliseconds() << "ms";

    if (flush_size != nullptr) {
        *flush_size = static_cast<int64_t>(segment_file_size);
    }
    return Status::OK();
}
338
339
// Creates a SegmentWriter for `segment_id` and wraps it in a SegmentFlusher::Writer that is
// stored in `writer` (replacing whatever it held before). The compression argument of
// _create_segment_writer() is left at its default.
// Returns the creation error unchanged if the underlying segment writer cannot be built.
Status SegmentFlusher::create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer,
                                     uint32_t segment_id) {
    std::unique_ptr<segment_v2::SegmentWriter> inner_writer;
    if (Status create_status = _create_segment_writer(inner_writer, segment_id);
        !create_status.ok()) {
        return create_status;
    }
    DCHECK(inner_writer != nullptr);
    // Plain `new` is kept on purpose — presumably Writer's constructor is not reachable from
    // std::make_unique; TODO confirm against the header.
    writer = std::unique_ptr<SegmentFlusher::Writer>(
            new SegmentFlusher::Writer(this, inner_writer));
    return Status::OK();
}
347
348
// Binds this Writer to its owning `flusher` and takes over the segment writer: ownership is
// moved out of `segment_writer`, so the caller's unique_ptr is empty afterwards.
SegmentFlusher::Writer::Writer(SegmentFlusher* flusher,
                               std::unique_ptr<segment_v2::SegmentWriter>& segment_writer)
        : _flusher(flusher), _writer(std::move(segment_writer)) {}
351
352
1.19k
// Defaulted destructor, defined out of line in this translation unit.
SegmentFlusher::Writer::~Writer() = default;
353
354
1.19k
// Flushes the wrapped segment writer through the owning flusher and returns its status.
// No flush-size argument is passed, so that parameter takes its default.
Status SegmentFlusher::Writer::flush() {
    Status flush_status = _flusher->_flush_segment_writer(_writer);
    return flush_status;
}
357
358
2.07k
int64_t SegmentFlusher::Writer::max_row_to_add(size_t row_avg_size_in_bytes) {
359
2.07k
    return _writer->max_row_to_add(row_avg_size_in_bytes);
360
2.07k
}
361
362
// Constructs the creator by forwarding all three arguments to the embedded SegmentFlusher.
SegmentCreator::SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files,
                               InvertedIndexFileCollection& idx_files)
        : _segment_flusher(context, seg_files, idx_files) {
}
365
366
1.36k
// Appends all rows of `block` to the current segment, rolling over to new segments as needed.
//
// An empty block is a no-op. Otherwise the average row size (at least 1 byte) is derived from
// the block, a writer is created lazily if none is open, and rows are added in chunks no
// larger than what the current writer reports via max_row_to_add(). When the current writer
// cannot take even one more row, it is flushed and a writer for a newly allocated segment id
// replaces it.
//
// @return the first error from flushing, creating a writer or adding rows; an internal error
//         if a freshly created writer still reports that it cannot accept a single row;
//         OK otherwise.
Status SegmentCreator::add_block(const Block* block) {
    const size_t block_row_num = block->rows();
    if (block_row_num == 0) {
        return Status::OK();
    }

    const size_t block_size_in_bytes = block->bytes();
    const size_t row_avg_size_in_bytes =
            std::max(static_cast<size_t>(1), block_size_in_bytes / block_row_num);

    if (_flush_writer == nullptr) {
        RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
    }

    size_t row_offset = 0;
    do {
        auto max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
        if (UNLIKELY(max_row_add < 1)) {
            // no space for another single row, need flush now
            RETURN_IF_ERROR(flush());
            RETURN_IF_ERROR(_segment_flusher.create_writer(_flush_writer, allocate_segment_id()));
            max_row_add = _flush_writer->max_row_to_add(row_avg_size_in_bytes);
            DCHECK(max_row_add > 0);
            if (UNLIKELY(max_row_add < 1)) {
                // With DCHECK compiled out, a zero capacity would append zero rows and repeat
                // this branch forever (creating one empty segment per iteration), and a
                // negative capacity would wrap to a huge size_t below. Fail instead.
                return Status::InternalError(
                        "new segment writer cannot accept any row, max_row_to_add={}, "
                        "row_avg_size_in_bytes={}",
                        max_row_add, row_avg_size_in_bytes);
            }
        }
        const size_t input_row_num =
                std::min(block_row_num - row_offset, static_cast<size_t>(max_row_add));
        RETURN_IF_ERROR(_flush_writer->add_rows(block, row_offset, input_row_num));
        row_offset += input_row_num;
    } while (row_offset < block_row_num);

    return Status::OK();
}
396
397
2.14k
// Flushes the currently open writer, if there is one, and then releases it.
// With no open writer this is a no-op returning OK. If the flush fails its error is returned
// and the writer is kept.
Status SegmentCreator::flush() {
    if (_flush_writer != nullptr) {
        RETURN_IF_ERROR(_flush_writer->flush());
        _flush_writer.reset();
    }
    return Status::OK();
}
405
406
// Writes `block` as one segment with id `segment_id` by delegating to the embedded
// SegmentFlusher. An empty block returns OK without calling the flusher; otherwise the
// flusher's status is returned as is. `flush_size` is passed through untouched.
Status SegmentCreator::flush_single_block(const Block* block, int32_t segment_id,
                                          int64_t* flush_size) {
    const bool has_rows = block->rows() != 0;
    if (!has_rows) {
        return Status::OK();
    }
    return _segment_flusher.flush_single_block(block, segment_id, flush_size);
}
414
415
641
// Flushes any open writer and then closes the embedded SegmentFlusher.
// The flusher is not closed if the flush fails; the first error is returned.
Status SegmentCreator::close() {
    if (Status flush_status = flush(); !flush_status.ok()) {
        return flush_status;
    }
    return _segment_flusher.close();
}
420
421
} // namespace doris