Coverage Report

Created: 2026-06-22 19:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_writer.h"
19
20
#include <assert.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <parallel_hashmap/phmap.h>
23
24
#include <algorithm>
25
26
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
27
#include <crc32c/crc32c.h>
28
29
#include "cloud/config.h"
30
#include "common/cast_set.h"
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/config.h"
33
#include "common/logging.h" // LOG
34
#include "common/status.h"
35
#include "core/block/block.h"
36
#include "core/block/column_with_type_and_name.h"
37
#include "core/column/column_nullable.h"
38
#include "core/data_type/primitive_type.h"
39
#include "core/field.h"
40
#include "core/types.h"
41
#include "core/value/vdatetime_value.h"
42
#include "io/cache/block_file_cache.h"
43
#include "io/cache/block_file_cache_factory.h"
44
#include "io/fs/file_system.h"
45
#include "io/fs/file_writer.h"
46
#include "io/fs/local_file_system.h"
47
#include "runtime/exec_env.h"
48
#include "runtime/memory/mem_tracker.h"
49
#include "storage/data_dir.h"
50
#include "storage/index/index_file_writer.h"
51
#include "storage/index/index_writer.h"
52
#include "storage/index/inverted/inverted_index_fs_directory.h"
53
#include "storage/index/primary_key_index.h"
54
#include "storage/index/short_key_index.h"
55
#include "storage/iterator/olap_data_convertor.h"
56
#include "storage/key_coder.h"
57
#include "storage/mow/key_probe.h"
58
#include "storage/olap_common.h"
59
#include "storage/olap_define.h"
60
#include "storage/rowset/rowset_writer_context.h" // RowsetWriterContext
61
#include "storage/rowset/segment_creator.h"
62
#include "storage/segment/column_writer.h" // ColumnWriter
63
#include "storage/segment/encoding_info.h"
64
#include "storage/segment/external_col_meta_util.h"
65
#include "storage/segment/page_io.h"
66
#include "storage/segment/page_pointer.h"
67
#include "storage/segment/variant/variant_ext_meta_writer.h"
68
#include "storage/segment/variant_stats_calculator.h"
69
#include "storage/storage_engine.h"
70
#include "storage/tablet/tablet_schema.h"
71
#include "storage/utils.h"
72
#include "util/coding.h"
73
#include "util/faststring.h"
74
#include "util/simd/bits.h"
75
namespace doris {
76
namespace segment_v2 {
77
78
using namespace ErrorCode;
79
80
// Four-character marker string "D0R1" associated with segment files.
// NOTE(review): presumably written as the trailing magic of each segment (see the
// "segment_magic(4)" term in estimate_segment_size) — confirm against _write_footer.
const char* k_segment_magic = "D0R1";
81
// Number of characters in k_segment_magic ("D0R1" has 4).
const uint32_t k_segment_magic_length = 4;
82
83
2.46k
// Builds the MemTracker label for a segment: the fixed prefix
// "SegmentWriter:Segment-" followed by the decimal segment id.
inline std::string segment_mem_tracker_name(uint32_t segment_id) {
    std::string tracker_name = "SegmentWriter:Segment-";
    tracker_name += std::to_string(segment_id);
    return tracker_name;
}
86
87
// Captures everything the writer needs to produce one segment.
//
// The schema and tablet handles are moved into the writer; the raw pointers
// (file_writer, data_dir, index_file_writer) are stored as given.
// A dedicated MemTracker named after the segment id is created, and the key
// encoder is built from the stored schema and the result of _is_mow().
// file_writer must not be null (enforced with CHECK_NOTNULL in the body).
SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
                             TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet,
                             DataDir* data_dir, const SegmentWriterOptions& opts,
                             IndexFileWriter* index_file_writer)
        : _segment_id(segment_id),
          _tablet_schema(std::move(tablet_schema)),
          _tablet(std::move(tablet)),
          _data_dir(data_dir),
          _opts(opts),
          _file_writer(file_writer),
          _index_file_writer(index_file_writer),
          _mem_tracker(std::make_unique<MemTracker>(segment_mem_tracker_name(segment_id))),
          _key_encoder(*_tablet_schema, _is_mow()) {
    CHECK_NOTNULL(file_writer);
    // Cache the short-key column count from the (already stored) schema.
    _num_short_key_columns = _tablet_schema->num_short_key_columns();
}
103
104
2.46k
// Gives back to the tracker whatever amount it currently reports as consumed.
SegmentWriter::~SegmentWriter() {
    const auto tracked_bytes = _mem_tracker->consumption();
    _mem_tracker->release(tracked_bytes);
}
107
108
// Fills `meta` from the schema description of `column`.
//
// meta      - protobuf column meta to populate (must be non-null; it is dereferenced).
// column_id - id stored in the meta; the same id is reused for every sub-column.
// column    - schema column whose attributes are copied.
// opts      - only opts.storage_format is read, to resolve the default encoding.
//
// Sub-columns are handled recursively, each one appended as a child of `meta`.
void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
                                     const TabletColumn& column, const ColumnWriterOptions& opts) {
    // Identity and type information.
    meta->set_column_id(column_id);
    meta->set_unique_id(column.unique_id());
    meta->set_type(static_cast<int>(column.type()));
    meta->set_length(column.length());
    meta->set_precision(column.precision());
    meta->set_frac(column.frac());
    meta->set_is_nullable(column.is_nullable());
    meta->set_default_value(column.default_value());

    // Physical representation: encoding comes from the storage format, compression
    // from the segment writer options.
    meta->set_encoding(EncodingInfo::resolve_default_encoding(opts.storage_format, column));
    meta->set_compression(_opts.compression_type);

    if (column.has_path_info()) {
        auto* path_info_pb = meta->mutable_column_path_info();
        column.path_info_ptr()->to_protobuf(path_info_pb, column.parent_unique_id());
    }

    // Recurse into nested sub-columns.
    const auto child_count = column.get_subtype_count();
    for (uint32_t child = 0; child < child_count; ++child) {
        ColumnMetaPB* child_meta = meta->add_children_columns();
        init_column_meta(child_meta, column_id, column.get_sub_column(child), opts);
    }

    // Aggregation-related attributes.
    meta->set_result_is_nullable(column.get_result_is_nullable());
    meta->set_function_name(column.get_aggregation_name());
    meta->set_be_exec_version(column.get_be_exec_version());

    // Variant-only attributes.
    if (column.is_variant_type()) {
        meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
        meta->set_variant_enable_doc_mode(column.variant_enable_doc_mode());
    }
}
135
136
1.26k
// Initializes the writer for every column of the tablet schema (ids 0..num_columns-1)
// and marks the column set as containing the key columns.
//
// Returns whatever init(col_ids, has_key) returns.
Status SegmentWriter::init() {
    // Keep the count in the same unsigned type as the loop index; the previous
    // narrowing to `int` produced a signed/unsigned comparison in the loop condition.
    const auto column_cnt = cast_set<uint32_t>(_tablet_schema->num_columns());

    std::vector<uint32_t> column_ids;
    column_ids.reserve(column_cnt);
    for (uint32_t cid = 0; cid < column_cnt; ++cid) {
        column_ids.push_back(cid);
    }
    return init(column_ids, true);
}
144
145
// Creates, initializes and registers the ColumnWriter for one column.
//
// cid    - column id recorded in the footer meta of the new column.
// column - schema description of the column.
// schema - tablet schema used to decide which indexes the column gets.
//
// A new column entry is appended to the segment footer, the writer options
// (zone map, bloom filter / ngram bloom filter, inverted index, ANN index, page sizes,
// compression) are derived from the schema and the segment writer options, and the
// resulting writer is appended to _column_writers. A matching data convertor is added
// to _olap_data_convertor.
//
// Returns NotSupported for out-of-range ngram bloom filter parameters, or the error of
// ColumnWriter::create / ColumnWriter::init. Note that the footer entry has already been
// added when such an error is returned.
Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
                                            const TabletSchemaSPtr& schema) {
    ColumnWriterOptions opts;
    opts.meta = _footer.add_columns();
    opts.storage_format = schema->storage_format();

    init_column_meta(opts.meta, cid, column, opts);

    // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
    // except for columns whose type don't support zone map.
    opts.need_zone_map = column.is_key() || schema->keys_type() != KeysType::AGG_KEYS;
    opts.need_bloom_filter = column.is_bf_column();
    if (opts.need_bloom_filter) {
        opts.bf_options.fpp = schema->has_bf_fpp() ? schema->bloom_filter_fpp() : 0.05;
    }
    auto* tablet_index = schema->get_ngram_bf_index(column.unique_id());
    if (tablet_index) {
        opts.need_bloom_filter = true;
        opts.is_ngram_bf_index = true;
        // The values below are narrowed to uint8_t / uint16_t, so both must be validated
        // against the range of the destination type before the cast.
        auto gram_size = tablet_index->get_gram_size();
        auto gram_bf_size = tablet_index->get_gram_bf_size();
        // uint8_t holds at most 255; the previous bound (> 256) let 256 reach the
        // narrowing cast below.
        if (gram_size > 255 || gram_size < 1) {
            return Status::NotSupported("Do not support ngram bloom filter for ngram_size: ",
                                        gram_size);
        }
        if (gram_bf_size > 65535 || gram_bf_size < 64) {
            return Status::NotSupported("Do not support ngram bloom filter for bf_size: ",
                                        gram_bf_size);
        }
        opts.gram_size = cast_set<uint8_t>(gram_size);
        opts.gram_bf_size = cast_set<uint16_t>(gram_bf_size);
    }

    bool skip_inverted_index = false;
    if (_opts.rowset_ctx != nullptr) {
        // skip write inverted index for index compaction column
        skip_inverted_index =
                _opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
    }
    // skip write inverted index on load if skip_write_index_on_load is true
    if (_opts.write_type == DataWriteType::TYPE_DIRECT && schema->skip_write_index_on_load()) {
        skip_inverted_index = true;
    }
    // inverted indexes for this column
    if (!skip_inverted_index) {
        auto inverted_indexs = schema->inverted_indexs(column);
        if (!inverted_indexs.empty()) {
            opts.inverted_indexes = inverted_indexs;
            opts.need_inverted_index = true;
            DCHECK(_index_file_writer != nullptr);
        }
    }
    // ANN index for this column
    if (const auto& index = schema->ann_index(column); index != nullptr) {
        opts.ann_index = index;
        opts.need_ann_index = true;
        DCHECK(_index_file_writer != nullptr);
    }

    opts.index_file_writer = _index_file_writer;

    // Column types listed below get neither a zone map nor a bloom filter.
#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE)                     \
    if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \
        opts.need_zone_map = false;                           \
        opts.need_bloom_filter = false;                       \
    }

    DISABLE_INDEX_IF_FIELD_TYPE(STRUCT)
    DISABLE_INDEX_IF_FIELD_TYPE(ARRAY)
    DISABLE_INDEX_IF_FIELD_TYPE(JSONB)
    DISABLE_INDEX_IF_FIELD_TYPE(AGG_STATE)
    DISABLE_INDEX_IF_FIELD_TYPE(MAP)
    DISABLE_INDEX_IF_FIELD_TYPE(BITMAP)
    DISABLE_INDEX_IF_FIELD_TYPE(HLL)
    DISABLE_INDEX_IF_FIELD_TYPE(QUANTILE_STATE)
    DISABLE_INDEX_IF_FIELD_TYPE(VARIANT)

#undef DISABLE_INDEX_IF_FIELD_TYPE

    int64_t storage_page_size = _tablet_schema->storage_page_size();
    // storage_page_size must be between 4KB and 10MB; values outside that range are
    // ignored and opts.data_page_size keeps its default.
    if (storage_page_size >= 4096 && storage_page_size <= 10485760) {
        opts.data_page_size = storage_page_size;
    }
    opts.dict_page_size = _tablet_schema->storage_dict_page_size();
    DBUG_EXECUTE_IF("VerticalSegmentWriter._create_column_writer.storage_page_size", {
        auto table_id = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
                "VerticalSegmentWriter._create_column_writer.storage_page_size", "table_id",
                INT_MIN);
        auto target_data_page_size = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
                "VerticalSegmentWriter._create_column_writer.storage_page_size",
                "storage_page_size", INT_MIN);
        if (table_id == INT_MIN || target_data_page_size == INT_MIN) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Debug point parameters missing: either 'table_id' or 'storage_page_size' not "
                    "set.");
        }
        if (table_id == _tablet_schema->table_id() &&
            opts.data_page_size != target_data_page_size) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Mismatch in 'storage_page_size': expected size does not match the current "
                    "data page size. "
                    "Expected: " +
                    std::to_string(target_data_page_size) +
                    ", Actual: " + std::to_string(opts.data_page_size) + ".");
        }
    })
    if (column.is_row_store_column()) {
        // smaller page size for row store column; encoding is already set to PLAIN /
        // PLAIN_V2 by init_column_meta via resolve_default_encoding().
        auto page_size = _tablet_schema->row_store_page_size();
        opts.data_page_size =
                (page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
    }

    opts.rowset_ctx = _opts.rowset_ctx;
    opts.file_writer = _file_writer;
    opts.compression_type = _opts.compression_type;
    opts.footer = &_footer;
    if (_opts.rowset_ctx != nullptr) {
        opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
    }

    std::unique_ptr<ColumnWriter> writer;
    RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
    RETURN_IF_ERROR(writer->init());
    _column_writers.push_back(std::move(writer));

    _olap_data_convertor->add_column_data_convertor(column);
    return Status::OK();
}
277
278
3.38k
// Prepares the writer for the given group of columns.
//
// col_ids - schema column ids to write in this round; one ColumnWriter is created per id.
// has_key - whether this group contains the key columns; only then are the key index
//           builders (short key and/or primary key) created.
//
// Expects _column_writers and _column_ids to be empty on entry (DCHECK).
// Returns the first error from writer creation or primary key index builder init.
Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
    DCHECK(_column_writers.empty());
    DCHECK(_column_ids.empty());

    _has_key = has_key;
    _column_writers.reserve(_tablet_schema->columns().size());
    _column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end());
    _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();

    // Fall back to the schema's compression type when none was requested explicitly.
    if (_opts.compression_type == UNKNOWN_COMPRESSION) {
        _opts.compression_type = _tablet_schema->compression_type();
    }

    // Vertical compaction calls init() multiple times against the same writer; the footer
    // accumulates entries across calls, so the footer columns belonging to this call start
    // at the current footer size.
    const int variant_stats_footer_offset = _footer.columns_size();
    RETURN_IF_ERROR(_create_writers(_tablet_schema, col_ids));

    // Initialize variant statistics calculator
    _variant_stats_calculator = std::make_unique<VariantStatsCaculator>(
            &_footer, _tablet_schema, col_ids, variant_stats_footer_offset);

    if (!_has_key) {
        return Status::OK();
    }

    if (!_is_mow()) {
        _short_key_index_builder.reset(
                new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
        return Status::OK();
    }

    // Merge-on-write tables use a primary key index; a short key index is only built
    // in addition when cluster keys are present.
    size_t seq_col_length = 0;
    if (_tablet_schema->has_sequence_col()) {
        seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
    }
    size_t rowid_length = 0;
    if (_is_mow_with_cluster_key()) {
        rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
        _short_key_index_builder.reset(
                new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
    }
    _primary_key_index_builder.reset(
            new PrimaryKeyIndexBuilder(_file_writer, seq_col_length, rowid_length));
    RETURN_IF_ERROR(_primary_key_index_builder->init());
    return Status::OK();
}
322
323
// Creates one column writer per id in `col_ids`, in order, stopping at the first error.
// The data convertor is sized for col_ids.size() columns up front.
Status SegmentWriter::_create_writers(const TabletSchemaSPtr& tablet_schema,
                                      const std::vector<uint32_t>& col_ids) {
    _olap_data_convertor->reserve(col_ids.size());
    for (auto id_it = col_ids.begin(); id_it != col_ids.end(); ++id_it) {
        const uint32_t column_id = *id_it;
        RETURN_IF_ERROR(_create_column_writer(column_id, tablet_schema->column(column_id),
                                              tablet_schema));
    }
    return Status::OK();
}
331
332
3.95k
// Appends rows [row_pos, row_pos + num_rows) of `block` to the segment.
//
// Every column writer receives the converted data of the column at the same position
// in the convertor. Key columns (and the sequence column, when the schema has one) are
// collected on the way and handed to build_key_index(). For compaction writes the
// variant statistics are updated as well.
//
// Returns InternalError when the block has fewer columns than there are column
// writers, or the first error reported by conversion, a column writer, the variant
// statistics calculator or key index generation. On success _num_rows_written is
// advanced by num_rows and the convertor's source content is cleared.
Status SegmentWriter::append_block(const Block* block, size_t row_pos, size_t num_rows) {
    // This early return is the only guard needed; the CHECK on the opposite condition
    // that used to follow it could never fire and has been dropped.
    if (block->columns() < _column_writers.size()) {
        return Status::InternalError(
                "block->columns() < _column_writers.size(), block->columns()=" +
                std::to_string(block->columns()) +
                ", _column_writers.size()=" + std::to_string(_column_writers.size()) +
                ", _tablet_schema->dump_structure()=" + _tablet_schema->dump_structure());
    }
    _olap_data_convertor->set_source_content(block, row_pos, num_rows);

    // convert column data from engine format to storage layer format
    std::vector<IOlapColumnDataAccessor*> key_columns;
    IOlapColumnDataAccessor* seq_column = nullptr;
    for (size_t id = 0; id < _column_writers.size(); ++id) {
        // olap data convertor always starts from id = 0
        auto converted_result = _olap_data_convertor->convert_column_data(id);
        if (!converted_result.first.ok()) {
            return converted_result.first;
        }
        IOlapColumnDataAccessor* accessor = converted_result.second;

        auto cid = _column_ids[id];
        if (_has_key && cid < _tablet_schema->num_key_columns()) {
            key_columns.push_back(accessor);
        } else if (_has_key && _tablet_schema->has_sequence_col() &&
                   cid == _tablet_schema->sequence_col_idx()) {
            seq_column = accessor;
        }
        RETURN_IF_ERROR(
                _column_writers[id]->append(accessor->get_nullmap(), accessor->get_data(),
                                            num_rows));
    }
    if (_opts.write_type == DataWriteType::TYPE_COMPACTION) {
        RETURN_IF_ERROR(
                _variant_stats_calculator->calculate_variant_stats(block, row_pos, num_rows));
    }

    RETURN_IF_ERROR(build_key_index(key_columns, seq_column, num_rows));

    _num_rows_written += num_rows;
    _olap_data_convertor->clear_source_content();
    return Status::OK();
}
376
377
// Feeds the key data of the rows just appended into the key index builders.
//
// key_columns - converted key columns of the current batch. For tables with cluster
//               keys this vector is cleared and refilled with the cluster key columns,
//               so the caller's vector is modified.
// seq_column  - converted sequence column, or nullptr.
// num_rows    - number of rows in the current batch.
//
// Does nothing when the writer has no key columns. Otherwise:
//  - merge-on-write with cluster keys: primary key index, then short key index on the
//    cluster key columns;
//  - merge-on-write: primary key index only;
//  - everything else: short key index only.
Status SegmentWriter::build_key_index(std::vector<IOlapColumnDataAccessor*>& key_columns,
                                      IOlapColumnDataAccessor* seq_column, size_t num_rows) {
    if (!_has_key) {
        return Status::OK();
    }

    // find all row pos for short key indexes; positions are relative to the current batch
    std::vector<size_t> short_key_pos;
    if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) {
        short_key_pos.push_back(0);
    }
    while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + num_rows) {
        _short_key_row_pos += _opts.num_rows_per_block;
        short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
    }

    if (_is_mow_with_cluster_key()) {
        // For CLUSTER BY tables:
        // 1) generate primary key index (unique keys)
        RETURN_IF_ERROR(_generate_primary_key_index(key_columns, seq_column, num_rows, true));
        // 2) generate short key index (cluster keys)
        key_columns.clear();
        for (const auto& cid : _tablet_schema->cluster_key_uids()) {
            auto cluster_key_index = _tablet_schema->field_index(cid);
            if (cluster_key_index == -1) {
                return Status::InternalError("could not find cluster key column with unique_id=" +
                                             std::to_string(cid) + " in tablet schema");
            }
            bool found = false;
            // size_t index: the previous `auto i = 0` deduced int and was compared against
            // the unsigned container size.
            for (size_t i = 0; i < _column_ids.size(); ++i) {
                if (_column_ids[i] == cluster_key_index) {
                    auto converted_result = _olap_data_convertor->convert_column_data(i);
                    if (!converted_result.first.ok()) {
                        return converted_result.first;
                    }
                    key_columns.push_back(converted_result.second);
                    found = true;
                    break;
                }
            }
            if (!found) {
                return Status::InternalError(
                        "could not found cluster key column with unique_id=" + std::to_string(cid) +
                        ", tablet schema index=" + std::to_string(cluster_key_index));
            }
        }
        return _generate_short_key_index(key_columns, num_rows, short_key_pos);
    }
    if (_is_mow()) {
        return _generate_primary_key_index(key_columns, seq_column, num_rows, false);
    }
    return _generate_short_key_index(key_columns, num_rows, short_key_pos);
}
430
431
2.08k
int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
432
2.08k
    auto segment_size = estimate_segment_size();
433
2.08k
    if (segment_size >= MAX_SEGMENT_SIZE || _num_rows_written >= _opts.max_rows_per_segment)
434
354
            [[unlikely]] {
435
354
        return 0;
436
354
    }
437
1.73k
    int64_t size_rows = ((int64_t)MAX_SEGMENT_SIZE - (int64_t)segment_size) / row_avg_size_in_bytes;
438
1.73k
    int64_t count_rows = (int64_t)_opts.max_rows_per_segment - _num_rows_written;
439
440
1.73k
    return std::min(size_rows, count_rows);
441
2.08k
}
442
443
// TODO(lingbin): Currently this function does not include the size of various indexes,
444
// We should make this more precise.
445
// NOTE: This function will be called when any row of data is added, so we need to
446
// make this function efficient.
447
2.41k
uint64_t SegmentWriter::estimate_segment_size() {
448
    // footer_size(4) + checksum(4) + segment_magic(4)
449
2.41k
    uint64_t size = 12;
450
9.67k
    for (auto& column_writer : _column_writers) {
451
9.67k
        size += column_writer->estimate_buffer_size();
452
9.67k
    }
453
2.41k
    if (_is_mow_with_cluster_key()) {
454
0
        size += _primary_key_index_builder->size() + _short_key_index_builder->size();
455
2.41k
    } else if (_is_mow()) {
456
12
        size += _primary_key_index_builder->size();
457
2.39k
    } else {
458
2.39k
        size += _short_key_index_builder->size();
459
2.39k
    }
460
461
    // update the mem_tracker of segment size
462
2.41k
    _mem_tracker->consume(size - _mem_tracker->consumption());
463
2.41k
    return size;
464
2.41k
}
465
466
3.38k
// Finishes all column writers and writes their data pages to the file.
//
// For a column group with keys, the number of rows written becomes the segment's row
// count. For a group without keys, the number of rows written must equal the row count
// already recorded; a mismatch is logged and returned as InternalError.
// _num_rows_written is reset to 0 before the writers are finished.
Status SegmentWriter::finalize_columns_data() {
    if (!_has_key) {
        DCHECK(_row_count == _num_rows_written)
                << "_row_count != _num_rows_written:" << _row_count << " vs. " << _num_rows_written;
        if (_row_count != _num_rows_written) {
            std::stringstream msg;
            msg << "_row_count != _num_rows_written:" << _row_count << " vs. "
                << _num_rows_written;
            LOG(WARNING) << msg.str();
            return Status::InternalError(msg.str());
        }
    } else {
        _row_count = _num_rows_written;
    }
    _num_rows_written = 0;

    for (size_t idx = 0; idx < _column_writers.size(); ++idx) {
        RETURN_IF_ERROR(_column_writers[idx]->finish());
    }
    RETURN_IF_ERROR(_write_data());

    return Status::OK();
}
488
489
3.38k
// Writes all per-column indexes and, for a column group with keys, the key index.
//
// index_size - out: number of index bytes attributed to this call. It starts as the
//              bytes appended to the file by the per-column indexes, and is then
//              adjusted by the key index branch taken (see below).
//
// On success the column writers and the data convertor are released via clear().
// On error the function returns early and clear() is not called.
Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {
    const uint64_t index_begin = _file_writer->bytes_appended();
    // Bytes appended to the file since this function started.
    auto appended_since_begin = [&]() { return _file_writer->bytes_appended() - index_begin; };

    RETURN_IF_ERROR(_write_ordinal_index());
    RETURN_IF_ERROR(_write_zone_map());
    RETURN_IF_ERROR(_write_inverted_index());
    RETURN_IF_ERROR(_write_ann_index());
    RETURN_IF_ERROR(_write_bloom_filter_index());

    *index_size = appended_since_begin();
    if (_has_key) {
        if (_is_mow_with_cluster_key()) {
            // 1. sort primary keys
            std::sort(_primary_keys.begin(), _primary_keys.end());
            // 2. write primary keys index; keys must be strictly increasing
            std::string previous_key;
            for (const auto& current_key : _primary_keys) {
                DCHECK(current_key.compare(previous_key) > 0)
                        << "found duplicate key or key is not sorted! current key: " << current_key
                        << ", last key: " << previous_key;
                RETURN_IF_ERROR(_primary_key_index_builder->add_item(current_key));
                previous_key = current_key;
            }

            RETURN_IF_ERROR(_write_short_key_index());
            *index_size = appended_since_begin();
            RETURN_IF_ERROR(_write_primary_key_index());
            *index_size += _primary_key_index_builder->disk_size();
        } else if (_is_mow()) {
            RETURN_IF_ERROR(_write_primary_key_index());
            // IndexedColumnWriter write data pages mixed with segment data, we should use
            // the stat from primary key index builder.
            *index_size += _primary_key_index_builder->disk_size();
        } else {
            RETURN_IF_ERROR(_write_short_key_index());
            *index_size = appended_since_begin();
        }
    }
    // reset all column writers and the data convertor
    clear();

    return Status::OK();
}
531
532
2.45k
// Writes the segment footer, closes the file writer and reports the file size.
//
// segment_file_size - out: bytes appended to the file writer, read after close().
//
// Returns Corruption when the resulting size is 0, or the error of the footer write /
// file close.
Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) {
    RETURN_IF_ERROR(_write_footer());
    // finish
    RETURN_IF_ERROR(_file_writer->close(true));

    const auto total_bytes = _file_writer->bytes_appended();
    *segment_file_size = total_bytes;
    if (*segment_file_size != 0) {
        return Status::OK();
    }
    return Status::Corruption("Bad segment, file size = 0");
}
542
543
1.26k
// Writes column data, indexes and footer in one go and closes the segment file.
//
// segment_file_size - out: final size of the segment file.
// index_size        - out: index size as computed by finalize_columns_index().
//
// Returns DISK_REACH_CAPACITY_LIMIT when a data dir is set and reports that the
// estimated segment size would exceed its capacity limit; otherwise the first error of
// the three finalize steps. A flush taking longer than 5s (5e9 ns) is logged.
Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
    MonotonicStopWatch timer;
    timer.start();

    // check disk capacity
    if (_data_dir != nullptr) {
        const auto estimated_bytes = static_cast<int64_t>(estimate_segment_size());
        if (_data_dir->reach_capacity_limit(estimated_bytes)) {
            return Status::Error<DISK_REACH_CAPACITY_LIMIT>(
                    "disk {} exceed capacity limit, path: {}", _data_dir->path_hash(),
                    _data_dir->path());
        }
    }

    // write data
    RETURN_IF_ERROR(finalize_columns_data());
    // Remember where the index section starts; the following steps append more data.
    const uint64_t index_start = _file_writer->bytes_appended();
    // write index
    RETURN_IF_ERROR(finalize_columns_index(index_size));
    // write footer
    RETURN_IF_ERROR(finalize_footer(segment_file_size));

    if (timer.elapsed_time() > 5000000000L) {
        LOG(INFO) << "segment flush consumes a lot time_ns " << timer.elapsed_time()
                  << ", segmemt_size " << *segment_file_size;
    }

    // When the cache type is not ttl(expiration time == 0), the data should be split into
    // normal cache queue and index cache queue
    auto* cache_builder = _file_writer->cache_builder();
    const bool split_index_cache = cache_builder != nullptr &&
                                   cache_builder->_expiration_time == 0 &&
                                   config::is_cloud_mode();
    if (split_index_cache) {
        auto size = *index_size + *segment_file_size;
        auto holder = cache_builder->allocate_cache_holder(index_start, size, _tablet->tablet_id());
        for (auto& file_block : holder->file_blocks) {
            // The result of the cache type change is deliberately ignored.
            static_cast<void>(file_block->change_cache_type(io::FileCacheType::INDEX));
        }
    }
    return Status::OK();
}
577
578
3.40k
void SegmentWriter::clear() {
579
10.3k
    for (auto& column_writer : _column_writers) {
580
10.3k
        column_writer.reset();
581
10.3k
    }
582
3.40k
    _column_writers.clear();
583
3.40k
    _column_ids.clear();
584
3.40k
    _olap_data_convertor.reset();
585
3.40k
}
586
587
// write column data to file one by one
588
3.38k
Status SegmentWriter::_write_data() {
589
10.3k
    for (auto& column_writer : _column_writers) {
590
10.3k
        RETURN_IF_ERROR(column_writer->write_data());
591
592
10.3k
        auto* column_meta = column_writer->get_column_meta();
593
10.3k
        DCHECK(column_meta != nullptr);
594
10.3k
        column_meta->set_compressed_data_bytes(
595
10.3k
                (column_meta->has_compressed_data_bytes() ? column_meta->compressed_data_bytes()
596
10.3k
                                                          : 0) +
597
10.3k
                column_writer->get_total_compressed_data_pages_bytes());
598
10.3k
        column_meta->set_uncompressed_data_bytes(
599
10.3k
                (column_meta->has_uncompressed_data_bytes() ? column_meta->uncompressed_data_bytes()
600
10.3k
                                                            : 0) +
601
10.3k
                column_writer->get_total_uncompressed_data_pages_bytes());
602
10.3k
        column_meta->set_raw_data_bytes(
603
10.3k
                (column_meta->has_raw_data_bytes() ? column_meta->raw_data_bytes() : 0) +
604
10.3k
                column_writer->get_raw_data_bytes());
605
10.3k
    }
606
3.38k
    return Status::OK();
607
3.38k
}
608
609
// write ordinal index after data has been written
610
3.38k
Status SegmentWriter::_write_ordinal_index() {
611
10.3k
    for (auto& column_writer : _column_writers) {
612
10.3k
        RETURN_IF_ERROR(column_writer->write_ordinal_index());
613
10.3k
    }
614
3.38k
    return Status::OK();
615
3.38k
}
616
617
3.38k
// Calls write_zone_map() on each column writer in order. Stops at, and
// returns, the first non-OK status.
Status SegmentWriter::_write_zone_map() {
    for (auto it = _column_writers.begin(); it != _column_writers.end(); ++it) {
        RETURN_IF_ERROR((*it)->write_zone_map());
    }
    return Status::OK();
}
623
624
3.38k
// Calls write_inverted_index() on each column writer in order. Stops at, and
// returns, the first non-OK status.
Status SegmentWriter::_write_inverted_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_inverted_index());
    }
    return Status::OK();
}
630
631
3.38k
// Calls write_ann_index() on each column writer in order. Stops at, and
// returns, the first non-OK status.
Status SegmentWriter::_write_ann_index() {
    for (auto it = _column_writers.begin(); it != _column_writers.end(); ++it) {
        RETURN_IF_ERROR((*it)->write_ann_index());
    }
    return Status::OK();
}
637
638
3.38k
// Calls write_bloom_filter_index() on each column writer in order. Stops at,
// and returns, the first non-OK status.
Status SegmentWriter::_write_bloom_filter_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_bloom_filter_index());
    }
    return Status::OK();
}
644
645
2.37k
// Finalizes the short key index builder for _row_count rows, writes the
// resulting page through _file_writer and records the page's location in the
// segment footer.
Status SegmentWriter::_write_short_key_index() {
    std::vector<Slice> page_body;
    PageFooterPB page_footer;
    RETURN_IF_ERROR(_short_key_index_builder->finalize(_row_count, &page_body, &page_footer));

    // short key index page is not compressed right now
    PagePointer page_ptr;
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, page_body, page_footer, &page_ptr));

    auto* footer_slot = _footer.mutable_short_key_index_page();
    page_ptr.to_proto(footer_slot);
    return Status::OK();
}
655
656
81
// Finalizes the primary key index builder into the footer's primary key index
// meta. The builder must have seen exactly _row_count rows (enforced by CHECK).
Status SegmentWriter::_write_primary_key_index() {
    CHECK_EQ(_primary_key_index_builder->num_rows(), _row_count);
    auto* pk_index_meta = _footer.mutable_primary_key_index_meta();
    return _primary_key_index_builder->finalize(pk_index_meta);
}
660
661
2.45k
// Serializes the segment footer and appends it, followed by a fixed-size
// trailer, to the segment file.
//
// Layout written at the end of the file:
//   Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
//
// Returns InternalError when the footer cannot be serialized, otherwise the
// status of the external column meta write (V3 only) or of the raw append.
Status SegmentWriter::_write_footer() {
    _footer.set_num_rows(_row_count);

    // Decide whether to externalize ColumnMetaPB by tablet default, and stamp footer version
    const bool use_external_col_meta =
            _tablet_schema->storage_format() == TabletStorageFormatPB::TABLET_STORAGE_FORMAT_V3;
    if (use_external_col_meta) {
        _footer.set_version(SEGMENT_FOOTER_VERSION_V3_EXT_COL_META);
        VLOG_DEBUG << "use external column meta";
        // External ColumnMetaPB writing (optional)
        RETURN_IF_ERROR(ExternalColMetaUtil::write_external_column_meta(
                _file_writer, &_footer, _opts.compression_type,
                [this](const std::vector<Slice>& chunks) { return _write_raw_data(chunks); }));
    }

    std::string serialized_footer;
    VLOG_DEBUG << "footer " << _footer.DebugString();
    if (!_footer.SerializeToString(&serialized_footer)) {
        return Status::InternalError("failed to serialize segment footer");
    }

    faststring trailer;
    // footer's size
    put_fixed32_le(&trailer, cast_set<uint32_t>(serialized_footer.size()));
    // footer's checksum
    const uint32_t footer_crc =
            crc32c::Crc32c(serialized_footer.data(), serialized_footer.size());
    put_fixed32_le(&trailer, footer_crc);
    // Append magic number. we don't write magic number in the header because
    // that will need an extra seek when reading
    trailer.append(k_segment_magic, k_segment_magic_length);

    std::vector<Slice> parts {serialized_footer, trailer};
    return _write_raw_data(parts);
}
693
694
4.18k
// Appends the given slices, in order, to the segment file via _file_writer.
//
// @param slices  buffers to append; may be empty.
// @return the status reported by the file writer's appendv().
Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
    // Use data() rather than &slices[0]: indexing element 0 of an empty vector
    // is undefined behaviour, while data() is always valid to call.
    RETURN_IF_ERROR(_file_writer->appendv(slices.data(), slices.size()));
    return Status::OK();
}
698
699
2.45k
// Returns the minimum encoded key: the primary key index builder's min key
// when such a builder exists, otherwise a slice over _min_key.
Slice SegmentWriter::min_encoded_key() {
    if (_primary_key_index_builder != nullptr) {
        return _primary_key_index_builder->min_key();
    }
    return Slice(_min_key.data(), _min_key.size());
}
703
2.45k
// Returns the maximum encoded key: the primary key index builder's max key
// when such a builder exists, otherwise a slice over _max_key.
Slice SegmentWriter::max_encoded_key() {
    if (_primary_key_index_builder != nullptr) {
        return _primary_key_index_builder->max_key();
    }
    return Slice(_max_key.data(), _max_key.size());
}
707
708
211
void SegmentWriter::set_min_max_key(const Slice& key) {
709
211
    if (UNLIKELY(_is_first_row)) {
710
6
        _min_key.append(key.get_data(), key.get_size());
711
6
        _is_first_row = false;
712
6
    }
713
211
    if (key.compare(_max_key) > 0) {
714
211
        _max_key.clear();
715
211
        _max_key.append(key.get_data(), key.get_size());
716
211
    }
717
211
}
718
719
2.94k
void SegmentWriter::set_min_key(const Slice& key) {
720
2.94k
    if (UNLIKELY(_is_first_row)) {
721
2.37k
        _min_key.append(key.get_data(), key.get_size());
722
2.37k
        _is_first_row = false;
723
2.37k
    }
724
2.94k
}
725
726
2.94k
void SegmentWriter::set_max_key(const Slice& key) {
727
2.94k
    _max_key.clear();
728
2.94k
    _max_key.append(key.get_data(), key.get_size());
729
2.94k
}
730
731
// Encodes the primary key of each of the `num_rows` rows and either feeds it
// to the primary key index builder or buffers it in memory.
//
// @param primary_key_columns  accessors for the primary key columns.
// @param seq_column           accessor for the sequence column; only used when
//                             the tablet schema has a sequence column.
// @param num_rows             number of rows to process.
// @param need_sort            true for a mow table with cluster key: keys get a
//                             rowid suffix and are collected in _primary_keys.
//                             false for a mow table without cluster key: keys
//                             are added to the index builder right away.
// @return first non-OK status from the index builder, otherwise OK.
Status SegmentWriter::_generate_primary_key_index(
        const std::vector<IOlapColumnDataAccessor*>& primary_key_columns,
        IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) {
    if (need_sort) {
        // mow table with cluster key
        // generate primary keys in memory
        for (uint32_t row = 0; row < num_rows; row++) {
            std::string encoded = _key_encoder.full_encode_primary_keys(primary_key_columns, row);
            MowKeyProbe::maybe_invalidate_row_cache(_opts.rowset_ctx->tablet_id, *_tablet_schema,
                                                    _opts.write_type, encoded);
            if (_tablet_schema->has_sequence_col()) {
                _key_encoder.append_seq_suffix(&encoded, seq_column, row);
            }
            _key_encoder.append_rowid_suffix(&encoded, row + _num_rows_written);
            // Account for the size before the string is moved away.
            _primary_keys_size += encoded.size();
            _primary_keys.emplace_back(std::move(encoded));
        }
        return Status::OK();
    }

    // mow table without cluster key
    std::string previous_key;
    for (size_t row = 0; row < num_rows; row++) {
        std::string encoded = _key_encoder.full_encode(primary_key_columns, row);
        MowKeyProbe::maybe_invalidate_row_cache(_opts.rowset_ctx->tablet_id, *_tablet_schema,
                                                _opts.write_type, encoded);
        if (_tablet_schema->has_sequence_col()) {
            _key_encoder.append_seq_suffix(&encoded, seq_column, row);
        }
        // Keys must arrive strictly increasing (debug-only check).
        DCHECK(encoded.compare(previous_key) > 0)
                << "found duplicate key or key is not sorted! current key: " << encoded
                << ", last key: " << previous_key;
        RETURN_IF_ERROR(_primary_key_index_builder->add_item(encoded));
        previous_key = std::move(encoded);
    }
    return Status::OK();
}
765
766
// Records the min/max key from the first and last row, then adds one short
// key index entry for every row position listed in `short_key_pos`.
//
// @param key_columns    accessors for the key columns. NOTE: resized in place
//                       to _num_short_key_columns, so the caller's vector is
//                       modified.
// @param num_rows       number of rows; row `num_rows - 1` is encoded as the
//                       max key, so callers presumably pass num_rows > 0 —
//                       TODO confirm.
// @param short_key_pos  row positions to add to the short key index.
// @return first non-OK status from the index builder, otherwise OK.
Status SegmentWriter::_generate_short_key_index(std::vector<IOlapColumnDataAccessor*>& key_columns,
                                                size_t num_rows,
                                                const std::vector<size_t>& short_key_pos) {
    const size_t last_row = num_rows - 1;
    set_min_key(_key_encoder.full_encode(key_columns, 0));
    set_max_key(_key_encoder.full_encode(key_columns, last_row));
    DCHECK(Slice(_max_key.data(), _max_key.size())
                   .compare(Slice(_min_key.data(), _min_key.size())) >= 0)
            << "key is not sorted! min key: " << _min_key << ", max key: " << _max_key;

    // From here on only the short key columns are encoded.
    key_columns.resize(_num_short_key_columns);

    std::string previous_key;
    for (size_t i = 0; i < short_key_pos.size(); ++i) {
        const auto row = short_key_pos[i];
        std::string short_key = _key_encoder.encode_short_keys(key_columns, row);
        DCHECK(short_key.compare(previous_key) >= 0)
                << "key is not sorted! current key: " << short_key << ", last key: " << previous_key;
        RETURN_IF_ERROR(_short_key_index_builder->add_item(short_key));
        previous_key = std::move(short_key);
    }
    return Status::OK();
}
786
787
} // namespace segment_v2
788
} // namespace doris