Coverage Report

Created: 2026-04-15 07:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/column_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/column_writer.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
#include <cstring>
24
#include <filesystem>
25
#include <memory>
26
27
#include "common/config.h"
28
#include "common/logging.h"
29
#include "core/data_type/data_type_agg_state.h"
30
#include "core/data_type/data_type_factory.hpp"
31
#include "core/types.h"
32
#include "io/fs/file_writer.h"
33
#include "runtime/collection_value.h"
34
#include "storage/field.h"
35
#include "storage/index/bloom_filter/bloom_filter_index_writer.h"
36
#include "storage/index/inverted/inverted_index_writer.h"
37
#include "storage/index/ordinal_page_index.h"
38
#include "storage/index/zone_map/zone_map_index.h"
39
#include "storage/olap_common.h"
40
#include "storage/segment/encoding_info.h"
41
#include "storage/segment/options.h"
42
#include "storage/segment/page_builder.h"
43
#include "storage/segment/page_io.h"
44
#include "storage/segment/page_pointer.h"
45
#include "storage/segment/variant/variant_column_writer_impl.h"
46
#include "storage/tablet/tablet_schema.h"
47
#include "storage/types.h"
48
#include "util/block_compression.h"
49
#include "util/debug_points.h"
50
#include "util/faststring.h"
51
#include "util/rle_encoding.h"
52
#include "util/simd/bits.h"
53
54
namespace doris::segment_v2 {
55
56
class NullBitmapBuilder {
57
public:
58
565k
    NullBitmapBuilder() : _has_null(false), _bitmap_buf(512), _rle_encoder(&_bitmap_buf, 1) {}
59
60
    explicit NullBitmapBuilder(size_t reserve_bits)
61
            : _has_null(false),
62
              _bitmap_buf(BitmapSize(reserve_bits)),
63
0
              _rle_encoder(&_bitmap_buf, 1) {}
64
65
2.45M
    void reserve_for_write(size_t num_rows, size_t non_null_count) {
66
2.45M
        if (num_rows == 0) {
67
0
            return;
68
0
        }
69
2.45M
        if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) {
70
445k
            if (_bitmap_buf.capacity() < kSmallReserveBytes) {
71
80
                _bitmap_buf.reserve(kSmallReserveBytes);
72
80
            }
73
445k
            return;
74
445k
        }
75
2.00M
        size_t raw_bytes = BitmapSize(num_rows);
76
2.00M
        size_t run_est = std::min(num_rows, non_null_count * 2 + 1);
77
2.00M
        size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes;
78
2.00M
        size_t raw_overhead = raw_bytes / 63 + 1;
79
2.00M
        size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
80
2.00M
        size_t reserve_bytes = std::min(raw_est, run_bytes_est);
81
2.00M
        if (_bitmap_buf.capacity() < reserve_bytes) {
82
9.77k
            const size_t cap = _bitmap_buf.capacity();
83
9.77k
            const size_t grow = cap + cap / 2;
84
9.77k
            const size_t new_cap = std::max(reserve_bytes, grow);
85
9.77k
            _bitmap_buf.reserve(new_cap);
86
9.77k
        }
87
2.00M
    }
88
89
7.08M
    void add_run(bool value, size_t run) {
90
7.08M
        _has_null |= value;
91
7.08M
        _rle_encoder.Put(value, run);
92
7.08M
    }
93
94
    // Returns whether the building nullmap contains nullptr
95
564k
    bool has_null() const { return _has_null; }
96
97
164k
    Status finish(OwnedSlice* slice) {
98
164k
        _rle_encoder.Flush();
99
164k
        RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
100
164k
        return Status::OK();
101
164k
    }
102
103
564k
    void reset() {
104
564k
        _has_null = false;
105
564k
        _rle_encoder.Clear();
106
564k
    }
107
108
38.6k
    uint64_t size() { return _bitmap_buf.size(); }
109
110
private:
111
    static constexpr size_t kSmallReserveBytes = 64;
112
    static constexpr size_t kReserveSlackBytes = 16;
113
    static constexpr size_t kBytesPerRun = 6;
114
115
    bool _has_null;
116
    faststring _bitmap_buf;
117
    RleEncoder<bool> _rle_encoder;
118
};
119
120
inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts,
121
73.0k
                                           io::FileWriter* file_writer, uint32_t id) {
122
73.0k
    if (!opts.meta->is_nullable()) {
123
28.6k
        return nullptr;
124
28.6k
    }
125
126
44.3k
    FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
127
44.3k
    ColumnWriterOptions null_options;
128
44.3k
    null_options.meta = opts.meta->add_children_columns();
129
44.3k
    null_options.meta->set_column_id(id);
130
44.3k
    null_options.meta->set_unique_id(id);
131
44.3k
    null_options.meta->set_type(int(null_type));
132
44.3k
    null_options.meta->set_is_nullable(false);
133
44.3k
    null_options.meta->set_length(
134
44.3k
            cast_set<int32_t>(get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size()));
135
44.3k
    null_options.meta->set_encoding(DEFAULT_ENCODING);
136
44.3k
    null_options.meta->set_compression(opts.meta->compression());
137
138
44.3k
    null_options.need_zone_map = false;
139
44.3k
    null_options.need_bloom_filter = false;
140
44.3k
    null_options.encoding_preference = opts.encoding_preference;
141
142
44.3k
    TabletColumn null_column =
143
44.3k
            TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type, false,
144
44.3k
                         null_options.meta->unique_id(), null_options.meta->length());
145
44.3k
    null_column.set_name("nullable");
146
44.3k
    null_column.set_index_length(-1); // no short key index
147
44.3k
    std::unique_ptr<StorageField> null_field(StorageFieldFactory::create(null_column));
148
44.3k
    return new ScalarColumnWriter(null_options, std::move(null_field), file_writer);
149
73.0k
}
150
151
ColumnWriter::ColumnWriter(std::unique_ptr<StorageField> field, bool is_nullable,
152
                           ColumnMetaPB* meta)
153
1.08M
        : _field(std::move(field)), _is_nullable(is_nullable), _column_meta(meta) {
154
1.08M
    _data_type = DataTypeFactory::instance().create_data_type(*_column_meta);
155
1.08M
}
156
Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts,
157
                                          const TabletColumn* column, io::FileWriter* file_writer,
158
2.84k
                                          std::unique_ptr<ColumnWriter>* writer) {
159
    // not support empty struct
160
2.84k
    DCHECK(column->get_subtype_count() >= 1);
161
2.84k
    std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
162
2.84k
    sub_column_writers.reserve(column->get_subtype_count());
163
22.1k
    for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
164
19.2k
        const TabletColumn& sub_column = column->get_sub_column(i);
165
19.2k
        RETURN_IF_ERROR(sub_column.check_valid());
166
167
        // create sub writer
168
19.2k
        ColumnWriterOptions column_options;
169
19.2k
        column_options.meta = opts.meta->mutable_children_columns(i);
170
19.2k
        column_options.need_zone_map = false;
171
19.2k
        column_options.need_bloom_filter = sub_column.is_bf_column();
172
19.2k
        column_options.encoding_preference = opts.encoding_preference;
173
19.2k
        std::unique_ptr<ColumnWriter> sub_column_writer;
174
19.2k
        RETURN_IF_ERROR(
175
19.2k
                ColumnWriter::create(column_options, &sub_column, file_writer, &sub_column_writer));
176
19.2k
        sub_column_writers.push_back(std::move(sub_column_writer));
177
19.2k
    }
178
179
2.84k
    ScalarColumnWriter* null_writer =
180
2.84k
            get_null_writer(opts, file_writer, column->get_subtype_count() + 1);
181
182
2.84k
    *writer = std::unique_ptr<ColumnWriter>(new StructColumnWriter(
183
2.84k
            opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), null_writer,
184
2.84k
            sub_column_writers));
185
2.84k
    return Status::OK();
186
2.84k
}
187
188
Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts,
189
                                         const TabletColumn* column, io::FileWriter* file_writer,
190
48.3k
                                         std::unique_ptr<ColumnWriter>* writer) {
191
48.3k
    DCHECK(column->get_subtype_count() == 1);
192
48.3k
    const TabletColumn& item_column = column->get_sub_column(0);
193
48.3k
    RETURN_IF_ERROR(item_column.check_valid());
194
195
    // create item writer
196
48.3k
    ColumnWriterOptions item_options;
197
48.3k
    item_options.meta = opts.meta->mutable_children_columns(0);
198
48.3k
    item_options.need_zone_map = false;
199
48.3k
    item_options.need_bloom_filter = item_column.is_bf_column();
200
48.3k
    item_options.encoding_preference = opts.encoding_preference;
201
48.3k
    std::unique_ptr<ColumnWriter> item_writer;
202
48.3k
    RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
203
204
    // create length writer
205
48.3k
    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
206
207
48.3k
    ColumnWriterOptions length_options;
208
48.3k
    length_options.meta = opts.meta->add_children_columns();
209
48.3k
    length_options.meta->set_column_id(2);
210
48.3k
    length_options.meta->set_unique_id(2);
211
48.3k
    length_options.meta->set_type(int(length_type));
212
48.3k
    length_options.meta->set_is_nullable(false);
213
48.3k
    length_options.meta->set_length(cast_set<int32_t>(
214
48.3k
            get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size()));
215
48.3k
    length_options.meta->set_encoding(DEFAULT_ENCODING);
216
48.3k
    length_options.meta->set_compression(opts.meta->compression());
217
218
48.3k
    length_options.need_zone_map = false;
219
48.3k
    length_options.need_bloom_filter = false;
220
48.3k
    length_options.encoding_preference = opts.encoding_preference;
221
222
48.3k
    TabletColumn length_column =
223
48.3k
            TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
224
48.3k
                         length_options.meta->is_nullable(), length_options.meta->unique_id(),
225
48.3k
                         length_options.meta->length());
226
48.3k
    length_column.set_name("length");
227
48.3k
    length_column.set_index_length(-1); // no short key index
228
48.3k
    std::unique_ptr<StorageField> bigint_field(StorageFieldFactory::create(length_column));
229
48.3k
    auto* length_writer =
230
48.3k
            new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);
231
232
48.3k
    ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3);
233
234
48.3k
    *writer = std::unique_ptr<ColumnWriter>(new ArrayColumnWriter(
235
48.3k
            opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)),
236
48.3k
            length_writer, null_writer, std::move(item_writer)));
237
48.3k
    return Status::OK();
238
48.3k
}
239
240
Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
241
                                       io::FileWriter* file_writer,
242
21.8k
                                       std::unique_ptr<ColumnWriter>* writer) {
243
21.8k
    DCHECK(column->get_subtype_count() == 2);
244
21.8k
    if (column->get_subtype_count() < 2) {
245
0
        return Status::InternalError(
246
0
                "If you upgraded from version 1.2.*, please DROP the MAP columns and then "
247
0
                "ADD the MAP columns back.");
248
0
    }
249
    // create key & value writer
250
21.8k
    std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
251
65.5k
    for (int i = 0; i < 2; ++i) {
252
43.6k
        const TabletColumn& item_column = column->get_sub_column(i);
253
43.6k
        RETURN_IF_ERROR(item_column.check_valid());
254
255
        // create item writer
256
43.6k
        ColumnWriterOptions item_options;
257
43.6k
        item_options.meta = opts.meta->mutable_children_columns(i);
258
43.6k
        item_options.need_zone_map = false;
259
43.6k
        item_options.need_bloom_filter = item_column.is_bf_column();
260
43.6k
        item_options.encoding_preference = opts.encoding_preference;
261
43.6k
        std::unique_ptr<ColumnWriter> item_writer;
262
43.6k
        RETURN_IF_ERROR(
263
43.6k
                ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
264
43.6k
        inner_writer_list.push_back(std::move(item_writer));
265
43.6k
    }
266
267
    // create offset writer
268
21.8k
    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
269
270
    // Be Cautious: column unique id is used for column reader creation
271
21.8k
    ColumnWriterOptions length_options;
272
21.8k
    length_options.meta = opts.meta->add_children_columns();
273
21.8k
    length_options.meta->set_column_id(column->get_subtype_count() + 1);
274
21.8k
    length_options.meta->set_unique_id(column->get_subtype_count() + 1);
275
21.8k
    length_options.meta->set_type(int(length_type));
276
21.8k
    length_options.meta->set_is_nullable(false);
277
21.8k
    length_options.meta->set_length(cast_set<int32_t>(
278
21.8k
            get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size()));
279
21.8k
    length_options.meta->set_encoding(DEFAULT_ENCODING);
280
21.8k
    length_options.meta->set_compression(opts.meta->compression());
281
282
21.8k
    length_options.need_zone_map = false;
283
21.8k
    length_options.need_bloom_filter = false;
284
21.8k
    length_options.encoding_preference = opts.encoding_preference;
285
286
21.8k
    TabletColumn length_column =
287
21.8k
            TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
288
21.8k
                         length_options.meta->is_nullable(), length_options.meta->unique_id(),
289
21.8k
                         length_options.meta->length());
290
21.8k
    length_column.set_name("length");
291
21.8k
    length_column.set_index_length(-1); // no short key index
292
21.8k
    std::unique_ptr<StorageField> bigint_field(StorageFieldFactory::create(length_column));
293
21.8k
    auto* length_writer =
294
21.8k
            new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);
295
296
21.8k
    ScalarColumnWriter* null_writer =
297
21.8k
            get_null_writer(opts, file_writer, column->get_subtype_count() + 2);
298
299
21.8k
    *writer = std::unique_ptr<ColumnWriter>(new MapColumnWriter(
300
21.8k
            opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), null_writer,
301
21.8k
            length_writer, inner_writer_list));
302
303
21.8k
    return Status::OK();
304
21.8k
}
305
306
Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
307
                                             const TabletColumn* column,
308
                                             io::FileWriter* file_writer,
309
1.29k
                                             std::unique_ptr<ColumnWriter>* writer) {
310
1.29k
    auto data_type = DataTypeFactory::instance().create_data_type(*column);
311
1.29k
    const auto* agg_state_type = assert_cast<const DataTypeAggState*>(data_type.get());
312
1.29k
    auto type = agg_state_type->get_serialized_type()->get_primitive_type();
313
1.29k
    if (type == PrimitiveType::TYPE_STRING || type == PrimitiveType::INVALID_TYPE ||
314
1.29k
        type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT || type == PrimitiveType::TYPE_BITMAP) {
315
1.21k
        *writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(
316
1.21k
                opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)),
317
1.21k
                file_writer));
318
1.21k
    } else if (type == PrimitiveType::TYPE_ARRAY) {
319
22
        RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
320
62
    } else if (type == PrimitiveType::TYPE_MAP) {
321
51
        RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
322
51
    } else {
323
11
        throw Exception(ErrorCode::INTERNAL_ERROR,
324
11
                        "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
325
11
                        agg_state_type->get_name());
326
11
    }
327
1.28k
    return Status::OK();
328
1.29k
}
329
330
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
331
                                           const TabletColumn* column, io::FileWriter* file_writer,
332
6.38k
                                           std::unique_ptr<ColumnWriter>* writer) {
333
    // Variant extracted columns have two kinds of physical writers:
334
    // - Doc-value snapshot column (`...__DORIS_VARIANT_DOC_VALUE__...`): use `VariantDocCompactWriter`
335
    //   to store the doc snapshot in a compact binary form.
336
    // - Regular extracted subcolumns: use `VariantSubcolumnWriter`.
337
    // The root VARIANT column itself uses `VariantColumnWriter`.
338
6.38k
    if (column->is_extracted_column()) {
339
380
        if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
340
275
            *writer = std::make_unique<VariantDocCompactWriter>(
341
275
                    opts, column,
342
275
                    std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)));
343
275
            return Status::OK();
344
275
        }
345
105
        VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
346
105
        *writer = std::make_unique<VariantSubcolumnWriter>(
347
105
                opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)));
348
105
        return Status::OK();
349
380
    }
350
6.00k
    *writer = std::make_unique<VariantColumnWriter>(
351
6.00k
            opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)));
352
6.00k
    return Status::OK();
353
6.38k
}
354
355
//Todo(Amory): here should according nullable and offset and need sub to simply this function
356
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
357
958k
                            io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
358
958k
    std::unique_ptr<StorageField> field(StorageFieldFactory::create(*column));
359
958k
    DCHECK(field.get() != nullptr);
360
958k
    if (is_scalar_type(column->type())) {
361
888k
        *writer = std::unique_ptr<ColumnWriter>(
362
888k
                new ScalarColumnWriter(opts, std::move(field), file_writer));
363
888k
        return Status::OK();
364
888k
    } else {
365
70.2k
        switch (column->type()) {
366
1.29k
        case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
367
1.29k
            RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, writer));
368
1.29k
            return Status::OK();
369
1.29k
        }
370
2.84k
        case FieldType::OLAP_FIELD_TYPE_STRUCT: {
371
2.84k
            RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, writer));
372
2.84k
            return Status::OK();
373
2.84k
        }
374
48.3k
        case FieldType::OLAP_FIELD_TYPE_ARRAY: {
375
48.3k
            RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
376
48.3k
            return Status::OK();
377
48.3k
        }
378
11.5k
        case FieldType::OLAP_FIELD_TYPE_MAP: {
379
11.5k
            RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
380
11.5k
            return Status::OK();
381
11.5k
        }
382
6.38k
        case FieldType::OLAP_FIELD_TYPE_VARIANT: {
383
            // Process columns with sparse column
384
6.38k
            RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
385
6.38k
            return Status::OK();
386
6.38k
        }
387
0
        default:
388
0
            return Status::NotSupported("unsupported type for ColumnWriter: {}",
389
0
                                        std::to_string(int(field->type())));
390
70.2k
        }
391
70.2k
    }
392
958k
}
393
394
Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* data,
395
651k
                                     size_t num_rows) {
396
651k
    const auto* ptr = (const uint8_t*)data;
397
651k
    BitmapIterator null_iter(is_null_bits, num_rows);
398
651k
    bool is_null = false;
399
651k
    size_t this_run = 0;
400
1.30M
    while ((this_run = null_iter.Next(&is_null)) > 0) {
401
651k
        if (is_null) {
402
0
            RETURN_IF_ERROR(append_nulls(this_run));
403
651k
        } else {
404
651k
            RETURN_IF_ERROR(append_data(&ptr, this_run));
405
651k
        }
406
651k
    }
407
651k
    return Status::OK();
408
651k
}
409
410
Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
411
0
                                     size_t num_rows) {
412
    // Fast path: use SIMD to detect all-NULL or all-non-NULL columns
413
0
    if (config::enable_rle_batch_put_optimization) {
414
0
        size_t non_null_count =
415
0
                simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows);
416
417
0
        if (non_null_count == 0) {
418
            // All NULL: skip run-length iteration, directly append all nulls
419
0
            RETURN_IF_ERROR(append_nulls(num_rows));
420
0
            *ptr += get_field()->size() * num_rows;
421
0
            return Status::OK();
422
0
        }
423
424
0
        if (non_null_count == num_rows) {
425
            // All non-NULL: skip run-length iteration, directly append all data
426
0
            return append_data(ptr, num_rows);
427
0
        }
428
0
    }
429
430
    // Mixed case or sparse optimization disabled: use run-length processing
431
0
    size_t offset = 0;
432
0
    auto next_run_step = [&]() {
433
0
        size_t step = 1;
434
0
        for (auto i = offset + 1; i < num_rows; ++i) {
435
0
            if (null_map[offset] == null_map[i]) {
436
0
                step++;
437
0
            } else {
438
0
                break;
439
0
            }
440
0
        }
441
0
        return step;
442
0
    };
443
444
0
    do {
445
0
        auto step = next_run_step();
446
0
        if (null_map[offset]) {
447
0
            RETURN_IF_ERROR(append_nulls(step));
448
0
            *ptr += get_field()->size() * step;
449
0
        } else {
450
            // TODO:
451
            //  1. `*ptr += get_field()->size() * step;` should do in this function, not append_data;
452
            //  2. support array vectorized load and ptr offset add
453
0
            RETURN_IF_ERROR(append_data(ptr, step));
454
0
        }
455
0
        offset += step;
456
0
    } while (offset < num_rows);
457
458
0
    return Status::OK();
459
0
}
460
461
2.90M
Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) {
462
2.90M
    assert(data && num_rows > 0);
463
2.90M
    const auto* ptr = (const uint8_t*)data;
464
2.90M
    if (nullmap) {
465
2.49M
        return append_nullable(nullmap, &ptr, num_rows);
466
2.49M
    } else {
467
408k
        return append_data(&ptr, num_rows);
468
408k
    }
469
2.90M
}
470
471
///////////////////////////////////////////////////////////////////////////////////
472
473
ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts,
474
                                       std::unique_ptr<StorageField> field,
475
                                       io::FileWriter* file_writer)
476
1.01M
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
477
1.01M
          _opts(opts),
478
1.01M
          _file_writer(file_writer),
479
1.01M
          _data_size(0) {
480
    // these opts.meta fields should be set by client
481
1.01M
    DCHECK(opts.meta->has_column_id());
482
1.01M
    DCHECK(opts.meta->has_unique_id());
483
1.01M
    DCHECK(opts.meta->has_type());
484
1.01M
    DCHECK(opts.meta->has_length());
485
1.01M
    DCHECK(opts.meta->has_encoding());
486
1.01M
    DCHECK(opts.meta->has_compression());
487
1.01M
    DCHECK(opts.meta->has_is_nullable());
488
1.01M
    DCHECK(file_writer != nullptr);
489
1.01M
    _inverted_index_builders.resize(_opts.inverted_indexes.size());
490
1.01M
}
491
492
1.01M
ScalarColumnWriter::~ScalarColumnWriter() {
493
    // delete all pages
494
1.01M
    _pages.clear();
495
1.01M
}
496
497
1.00M
Status ScalarColumnWriter::init() {
498
1.00M
    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
499
500
1.00M
    PageBuilder* page_builder = nullptr;
501
502
1.00M
    RETURN_IF_ERROR(EncodingInfo::get(get_field()->type(), _opts.meta->encoding(),
503
1.00M
                                      _opts.encoding_preference, &_encoding_info));
504
1.00M
    _opts.meta->set_encoding(_encoding_info->encoding());
505
    // create page builder
506
1.00M
    PageBuilderOptions opts;
507
1.00M
    opts.data_page_size = _opts.data_page_size;
508
1.00M
    opts.dict_page_size = _opts.dict_page_size;
509
1.00M
    opts.encoding_preference = _opts.encoding_preference;
510
1.00M
    RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder));
511
1.00M
    if (page_builder == nullptr) {
512
0
        return Status::NotSupported("Failed to create page builder for type {} and encoding {}",
513
0
                                    get_field()->type(), _opts.meta->encoding());
514
0
    }
515
    // should store more concrete encoding type instead of DEFAULT_ENCODING
516
    // because the default encoding of a data type can be changed in the future
517
1.00M
    DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
518
18.4E
    VLOG_DEBUG << fmt::format(
519
18.4E
            "[verbose] scalar column writer init, column_id={}, type={}, encoding={}, "
520
18.4E
            "is_nullable={}",
521
18.4E
            _opts.meta->column_id(), get_field()->type(),
522
18.4E
            EncodingTypePB_Name(_opts.meta->encoding()), _opts.meta->is_nullable());
523
1.00M
    _page_builder.reset(page_builder);
524
    // create ordinal builder
525
1.00M
    _ordinal_index_builder = std::make_unique<OrdinalIndexWriter>();
526
    // create null bitmap builder
527
1.00M
    if (is_nullable()) {
528
565k
        _null_bitmap_builder = std::make_unique<NullBitmapBuilder>();
529
565k
    }
530
1.00M
    if (_opts.need_zone_map) {
531
728k
        RETURN_IF_ERROR(
532
728k
                ZoneMapIndexWriter::create(_data_type, get_field(), _zone_map_index_builder));
533
728k
    }
534
535
1.00M
    if (_opts.need_inverted_index) {
536
40.7k
        do {
537
81.7k
            for (size_t i = 0; i < _opts.inverted_indexes.size(); i++) {
538
40.9k
                DBUG_EXECUTE_IF("column_writer.init", {
539
40.9k
                    class InvertedIndexColumnWriterEmpty final : public IndexColumnWriter {
540
40.9k
                    public:
541
40.9k
                        Status init() override { return Status::OK(); }
542
40.9k
                        Status add_values(const std::string name, const void* values,
543
40.9k
                                          size_t count) override {
544
40.9k
                            return Status::OK();
545
40.9k
                        }
546
40.9k
                        Status add_array_values(size_t field_size, const CollectionValue* values,
547
40.9k
                                                size_t count) override {
548
40.9k
                            return Status::OK();
549
40.9k
                        }
550
40.9k
                        Status add_array_values(size_t field_size, const void* value_ptr,
551
40.9k
                                                const uint8_t* null_map, const uint8_t* offsets_ptr,
552
40.9k
                                                size_t count) override {
553
40.9k
                            return Status::OK();
554
40.9k
                        }
555
40.9k
                        Status add_nulls(uint32_t count) override { return Status::OK(); }
556
40.9k
                        Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
557
40.9k
                            return Status::OK();
558
40.9k
                        }
559
40.9k
                        Status finish() override { return Status::OK(); }
560
40.9k
                        int64_t size() const override { return 0; }
561
40.9k
                        void close_on_error() override {}
562
40.9k
                    };
563
564
40.9k
                    _inverted_index_builders[i] =
565
40.9k
                            std::make_unique<InvertedIndexColumnWriterEmpty>();
566
567
40.9k
                    break;
568
40.9k
                });
569
570
40.9k
                RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_builders[i],
571
40.9k
                                                          _opts.index_file_writer,
572
40.9k
                                                          _opts.inverted_indexes[i]));
573
40.9k
            }
574
40.7k
        } while (false);
575
40.7k
    }
576
1.00M
    if (_opts.need_bloom_filter) {
577
8.30k
        if (_opts.is_ngram_bf_index) {
578
3.39k
            RETURN_IF_ERROR(NGramBloomFilterIndexWriterImpl::create(
579
3.39k
                    BloomFilterOptions(), get_field()->type_info(), _opts.gram_size,
580
3.39k
                    _opts.gram_bf_size, &_bloom_filter_index_builder));
581
4.90k
        } else {
582
4.90k
            RETURN_IF_ERROR(BloomFilterIndexWriter::create(
583
4.90k
                    _opts.bf_options, get_field()->type_info(), &_bloom_filter_index_builder));
584
4.90k
        }
585
8.30k
    }
586
1.00M
    return Status::OK();
587
1.00M
}
588
589
3.07M
Status ScalarColumnWriter::append_nulls(size_t num_rows) {
590
3.07M
    _null_bitmap_builder->add_run(true, num_rows);
591
3.07M
    _next_rowid += num_rows;
592
3.07M
    if (_opts.need_zone_map) {
593
2.98M
        _zone_map_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
594
2.98M
    }
595
3.07M
    if (_opts.need_inverted_index) {
596
1.55M
        for (const auto& builder : _inverted_index_builders) {
597
1.55M
            RETURN_IF_ERROR(builder->add_nulls(cast_set<uint32_t>(num_rows)));
598
1.55M
        }
599
1.55M
    }
600
3.07M
    if (_opts.need_bloom_filter) {
601
585
        _bloom_filter_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
602
585
    }
603
3.07M
    return Status::OK();
604
3.07M
}
605
606
// append data to page builder. this function will make sure that
607
// num_rows must be written before return. And ptr will be modified
608
// to next data should be written
609
4.43M
Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
610
4.43M
    size_t remaining = num_rows;
611
8.94M
    while (remaining > 0) {
612
4.50M
        size_t num_written = remaining;
613
4.50M
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
614
615
4.50M
        remaining -= num_written;
616
617
4.50M
        if (_page_builder->is_page_full()) {
618
73.3k
            RETURN_IF_ERROR(finish_current_page());
619
73.3k
        }
620
4.50M
    }
621
4.43M
    return Status::OK();
622
4.43M
}
623
624
Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data,
625
4.57M
                                                                 size_t* num_written) {
626
4.57M
    RETURN_IF_ERROR(_page_builder->add(data, num_written));
627
4.57M
    if (_opts.need_zone_map) {
628
4.24M
        _zone_map_index_builder->add_values(data, *num_written);
629
4.24M
    }
630
4.57M
    if (_opts.need_inverted_index) {
631
1.58M
        for (const auto& builder : _inverted_index_builders) {
632
1.58M
            RETURN_IF_ERROR(builder->add_values(get_field()->name(), data, *num_written));
633
1.58M
        }
634
1.58M
    }
635
4.57M
    if (_opts.need_bloom_filter) {
636
8.60k
        RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written));
637
8.60k
    }
638
639
4.57M
    _next_rowid += *num_written;
640
641
    // we must write null bits after write data, because we don't
642
    // know how many rows can be written into current page
643
4.57M
    if (is_nullable()) {
644
4.03M
        _null_bitmap_builder->add_run(false, *num_written);
645
4.03M
    }
646
4.57M
    return Status::OK();
647
4.57M
}
648
649
4.57M
Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) {
650
4.57M
    RETURN_IF_ERROR(append_data_in_current_page(*data, num_written));
651
4.57M
    *data += get_field()->size() * (*num_written);
652
4.57M
    return Status::OK();
653
4.57M
}
654
655
Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
656
2.44M
                                           size_t num_rows) {
657
    // When optimization is disabled, use base class implementation
658
2.44M
    if (!config::enable_rle_batch_put_optimization) {
659
0
        return ColumnWriter::append_nullable(null_map, ptr, num_rows);
660
0
    }
661
662
2.44M
    if (UNLIKELY(num_rows == 0)) {
663
0
        return Status::OK();
664
0
    }
665
666
    // Build run-length encoded null runs using memchr for fast boundary detection
667
2.44M
    _null_run_buffer.clear();
668
2.44M
    if (_null_run_buffer.capacity() < num_rows) {
669
569k
        _null_run_buffer.reserve(std::min(num_rows, size_t(256)));
670
569k
    }
671
672
2.44M
    size_t non_null_count = 0;
673
2.44M
    size_t offset = 0;
674
6.90M
    while (offset < num_rows) {
675
4.45M
        bool is_null = null_map[offset] != 0;
676
4.45M
        size_t remaining = num_rows - offset;
677
4.45M
        const uint8_t* run_end =
678
4.45M
                static_cast<const uint8_t*>(memchr(null_map + offset, is_null ? 0 : 1, remaining));
679
4.45M
        size_t run_length = run_end != nullptr ? (run_end - (null_map + offset)) : remaining;
680
4.45M
        _null_run_buffer.push_back(NullRun {is_null, static_cast<uint32_t>(run_length)});
681
4.45M
        if (!is_null) {
682
3.37M
            non_null_count += run_length;
683
3.37M
        }
684
4.45M
        offset += run_length;
685
4.45M
    }
686
687
    // Pre-allocate buffer based on estimated size
688
2.45M
    if (_null_bitmap_builder != nullptr) {
689
2.45M
        size_t current_rows = _next_rowid - _first_rowid;
690
2.45M
        size_t expected_rows = current_rows + num_rows;
691
2.45M
        size_t est_non_null = non_null_count;
692
2.45M
        if (num_rows > 0 && expected_rows > num_rows) {
693
1.95M
            est_non_null = (non_null_count * expected_rows) / num_rows;
694
1.95M
        }
695
2.45M
        _null_bitmap_builder->reserve_for_write(expected_rows, est_non_null);
696
2.45M
    }
697
698
2.44M
    if (non_null_count == 0) {
699
        // All NULL: skip data writing, only update null bitmap and indexes
700
52.3k
        RETURN_IF_ERROR(append_nulls(num_rows));
701
52.3k
        *ptr += get_field()->size() * num_rows;
702
52.3k
        return Status::OK();
703
52.3k
    }
704
705
2.39M
    if (non_null_count == num_rows) {
706
        // All non-NULL: use normal append_data which handles both data and null bitmap
707
2.33M
        return append_data(ptr, num_rows);
708
2.33M
    }
709
710
    // Process by runs
711
2.04M
    for (const auto& run : _null_run_buffer) {
712
2.04M
        size_t run_length = run.len;
713
2.04M
        if (run.is_null) {
714
1.03M
            RETURN_IF_ERROR(append_nulls(run_length));
715
1.03M
            *ptr += get_field()->size() * run_length;
716
1.03M
        } else {
717
            // TODO:
718
            //  1. `*ptr += get_field()->size() * step;` should do in this function, not append_data;
719
            //  2. support array vectorized load and ptr offset add
720
1.00M
            RETURN_IF_ERROR(append_data(ptr, run_length));
721
1.00M
        }
722
2.04M
    }
723
724
54.0k
    return Status::OK();
725
54.0k
}
726
727
55.9k
uint64_t ScalarColumnWriter::estimate_buffer_size() {
728
55.9k
    uint64_t size = _data_size;
729
55.9k
    size += _page_builder->size();
730
55.9k
    if (is_nullable()) {
731
38.6k
        size += _null_bitmap_builder->size();
732
38.6k
    }
733
55.9k
    size += _ordinal_index_builder->size();
734
55.9k
    if (_opts.need_zone_map) {
735
41.2k
        size += _zone_map_index_builder->size();
736
41.2k
    }
737
55.9k
    if (_opts.need_bloom_filter) {
738
267
        size += _bloom_filter_index_builder->size();
739
267
    }
740
55.9k
    return size;
741
55.9k
}
742
743
1.01M
Status ScalarColumnWriter::finish() {
744
1.01M
    RETURN_IF_ERROR(finish_current_page());
745
1.01M
    _opts.meta->set_num_rows(_next_rowid);
746
1.01M
    return Status::OK();
747
1.01M
}
748
749
1.01M
Status ScalarColumnWriter::write_data() {
750
1.01M
    auto offset = _file_writer->bytes_appended();
751
1.37M
    auto collect_uncompressed_bytes = [](const PageFooterPB& footer) {
752
1.37M
        return footer.uncompressed_size() + footer.ByteSizeLong() +
753
1.37M
               sizeof(uint32_t) /* footer size */ + sizeof(uint32_t) /* checksum */;
754
1.37M
    };
755
1.04M
    for (auto& page : _pages) {
756
1.04M
        _total_uncompressed_data_pages_size += collect_uncompressed_bytes(page->footer);
757
1.04M
        RETURN_IF_ERROR(_write_data_page(page.get()));
758
1.04M
    }
759
1.01M
    _pages.clear();
760
    // write column dict
761
1.01M
    if (_encoding_info->encoding() == DICT_ENCODING) {
762
322k
        OwnedSlice dict_body;
763
322k
        RETURN_IF_ERROR(_page_builder->get_dictionary_page(&dict_body));
764
322k
        EncodingTypePB dict_word_page_encoding;
765
322k
        RETURN_IF_ERROR(_page_builder->get_dictionary_page_encoding(&dict_word_page_encoding));
766
767
322k
        PageFooterPB footer;
768
322k
        footer.set_type(DICTIONARY_PAGE);
769
322k
        footer.set_uncompressed_size(cast_set<uint32_t>(dict_body.slice().get_size()));
770
322k
        footer.mutable_dict_page_footer()->set_encoding(dict_word_page_encoding);
771
322k
        _total_uncompressed_data_pages_size += collect_uncompressed_bytes(footer);
772
773
322k
        PagePointer dict_pp;
774
322k
        RETURN_IF_ERROR(PageIO::compress_and_write_page(
775
322k
                _compress_codec, _opts.compression_min_space_saving, _file_writer,
776
322k
                {dict_body.slice()}, footer, &dict_pp));
777
322k
        dict_pp.to_proto(_opts.meta->mutable_dict_page());
778
322k
    }
779
1.01M
    _total_compressed_data_pages_size += _file_writer->bytes_appended() - offset;
780
1.01M
    _page_builder.reset();
781
1.01M
    return Status::OK();
782
1.01M
}
783
784
975k
Status ScalarColumnWriter::write_ordinal_index() {
785
975k
    return _ordinal_index_builder->finish(_file_writer, _opts.meta->add_indexes());
786
975k
}
787
788
781k
Status ScalarColumnWriter::write_zone_map() {
789
781k
    if (_opts.need_zone_map) {
790
728k
        return _zone_map_index_builder->finish(_file_writer, _opts.meta->add_indexes());
791
728k
    }
792
53.1k
    return Status::OK();
793
781k
}
794
795
677k
Status ScalarColumnWriter::write_inverted_index() {
796
677k
    if (_opts.need_inverted_index) {
797
40.9k
        for (const auto& builder : _inverted_index_builders) {
798
40.9k
            RETURN_IF_ERROR(builder->finish());
799
40.9k
        }
800
40.7k
    }
801
677k
    return Status::OK();
802
677k
}
803
804
670k
Status ScalarColumnWriter::write_bloom_filter_index() {
805
670k
    if (_opts.need_bloom_filter) {
806
8.29k
        return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes());
807
8.29k
    }
808
661k
    return Status::OK();
809
670k
}
810
811
// write a data page into file and update ordinal index
812
1.04M
Status ScalarColumnWriter::_write_data_page(Page* page) {
813
1.04M
    PagePointer pp;
814
1.04M
    std::vector<Slice> compressed_body;
815
1.99M
    for (auto& data : page->data) {
816
1.99M
        compressed_body.push_back(data.slice());
817
1.99M
    }
818
1.04M
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, compressed_body, page->footer, &pp));
819
1.04M
    _ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp);
820
1.04M
    return Status::OK();
821
1.04M
}
822
823
1.08M
Status ScalarColumnWriter::finish_current_page() {
824
1.08M
    if (_next_rowid == _first_rowid) {
825
35.9k
        return Status::OK();
826
35.9k
    }
827
1.04M
    if (_opts.need_zone_map) {
828
        // If the number of rows in the current page is less than the threshold,
829
        // we will invalidate zone map index for this page by set pass_all to true.
830
793k
        if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) {
831
548k
            _zone_map_index_builder->invalid_page_zone_map();
832
548k
        }
833
793k
        RETURN_IF_ERROR(_zone_map_index_builder->flush());
834
793k
    }
835
836
1.04M
    if (_opts.need_bloom_filter) {
837
8.54k
        RETURN_IF_ERROR(_bloom_filter_index_builder->flush());
838
8.54k
    }
839
840
1.04M
    _raw_data_bytes += _page_builder->get_raw_data_size();
841
842
    // build data page body : encoded values + [nullmap]
843
1.04M
    std::vector<Slice> body;
844
1.04M
    OwnedSlice encoded_values;
845
1.04M
    RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
846
1.04M
    RETURN_IF_ERROR(_page_builder->reset());
847
1.04M
    body.push_back(encoded_values.slice());
848
849
1.04M
    OwnedSlice nullmap;
850
1.04M
    if (_null_bitmap_builder != nullptr) {
851
564k
        if (is_nullable() && _null_bitmap_builder->has_null()) {
852
164k
            RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
853
164k
            body.push_back(nullmap.slice());
854
164k
        }
855
564k
        _null_bitmap_builder->reset();
856
564k
    }
857
858
    // prepare data page footer
859
1.04M
    std::unique_ptr<Page> page(new Page());
860
1.04M
    page->footer.set_type(DATA_PAGE);
861
1.04M
    page->footer.set_uncompressed_size(cast_set<uint32_t>(Slice::compute_total_size(body)));
862
1.04M
    auto* data_page_footer = page->footer.mutable_data_page_footer();
863
1.04M
    data_page_footer->set_first_ordinal(_first_rowid);
864
1.04M
    data_page_footer->set_num_values(_next_rowid - _first_rowid);
865
1.04M
    data_page_footer->set_nullmap_size(cast_set<uint32_t>(nullmap.slice().size));
866
1.04M
    if (_new_page_callback != nullptr) {
867
70.0k
        _new_page_callback->put_extra_info_in_page(data_page_footer);
868
70.0k
    }
869
    // trying to compress page body
870
1.04M
    OwnedSlice compressed_body;
871
1.04M
    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
872
1.04M
                                               body, &compressed_body));
873
1.04M
    if (compressed_body.slice().empty()) {
874
        // page body is uncompressed
875
941k
        page->data.emplace_back(std::move(encoded_values));
876
941k
        page->data.emplace_back(std::move(nullmap));
877
941k
    } else {
878
        // page body is compressed
879
107k
        page->data.emplace_back(std::move(compressed_body));
880
107k
    }
881
882
1.04M
    _push_back_page(std::move(page));
883
1.04M
    _first_rowid = _next_rowid;
884
1.04M
    return Status::OK();
885
1.04M
}
886
887
////////////////////////////////////////////////////////////////////////////////
888
889
////////////////////////////////////////////////////////////////////////////////
890
// offset column writer
891
////////////////////////////////////////////////////////////////////////////////
892
893
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
894
                                       std::unique_ptr<StorageField> field,
895
                                       io::FileWriter* file_writer)
896
70.2k
        : ScalarColumnWriter(opts, std::move(field), file_writer) {
897
    // now we only explain data in offset column as uint64
898
70.2k
    DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
899
70.2k
}
900
901
70.2k
OffsetColumnWriter::~OffsetColumnWriter() = default;
902
903
70.1k
Status OffsetColumnWriter::init() {
904
70.1k
    RETURN_IF_ERROR(ScalarColumnWriter::init());
905
70.1k
    register_flush_page_callback(this);
906
70.1k
    _next_offset = 0;
907
70.1k
    return Status::OK();
908
70.1k
}
909
910
70.1k
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
911
70.1k
    size_t remaining = num_rows;
912
140k
    while (remaining > 0) {
913
70.5k
        size_t num_written = remaining;
914
70.5k
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
915
        // _next_offset after append_data_in_current_page is the offset of next data, which will used in finish_current_page() to set next_array_item_ordinal
916
70.5k
        _next_offset = *(const uint64_t*)(*ptr);
917
70.5k
        remaining -= num_written;
918
919
70.5k
        if (_page_builder->is_page_full()) {
920
            // get next data for next array_item_rowid
921
520
            RETURN_IF_ERROR(finish_current_page());
922
520
        }
923
70.5k
    }
924
70.1k
    return Status::OK();
925
70.1k
}
926
927
70.0k
void OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
928
70.0k
    footer->set_next_array_item_ordinal(_next_offset);
929
70.0k
}
930
931
StructColumnWriter::StructColumnWriter(
932
        const ColumnWriterOptions& opts, std::unique_ptr<StorageField> field,
933
        ScalarColumnWriter* null_writer,
934
        std::vector<std::unique_ptr<ColumnWriter>>& sub_column_writers)
935
2.84k
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) {
936
19.2k
    for (auto& sub_column_writer : sub_column_writers) {
937
19.2k
        _sub_column_writers.push_back(std::move(sub_column_writer));
938
19.2k
    }
939
2.84k
    _num_sub_column_writers = _sub_column_writers.size();
940
2.84k
    DCHECK(_num_sub_column_writers >= 1);
941
2.84k
    if (is_nullable()) {
942
2.57k
        _null_writer.reset(null_writer);
943
2.57k
    }
944
2.84k
}
945
946
2.84k
Status StructColumnWriter::init() {
947
19.2k
    for (auto& column_writer : _sub_column_writers) {
948
19.2k
        RETURN_IF_ERROR(column_writer->init());
949
19.2k
    }
950
2.84k
    if (is_nullable()) {
951
2.57k
        RETURN_IF_ERROR(_null_writer->init());
952
2.57k
    }
953
2.84k
    return Status::OK();
954
2.84k
}
955
956
2.23k
Status StructColumnWriter::write_inverted_index() {
957
2.23k
    if (_opts.need_inverted_index) {
958
0
        for (auto& column_writer : _sub_column_writers) {
959
0
            RETURN_IF_ERROR(column_writer->write_inverted_index());
960
0
        }
961
0
    }
962
2.23k
    return Status::OK();
963
2.23k
}
964
965
Status StructColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
966
2.53k
                                           size_t num_rows) {
967
2.53k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
968
2.53k
    RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
969
2.53k
    return Status::OK();
970
2.53k
}
971
972
2.80k
Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
973
2.80k
    const auto* results = reinterpret_cast<const uint64_t*>(*ptr);
974
21.7k
    for (size_t i = 0; i < _num_sub_column_writers; ++i) {
975
18.9k
        auto nullmap = *(results + _num_sub_column_writers + i);
976
18.9k
        auto data = *(results + i);
977
18.9k
        RETURN_IF_ERROR(_sub_column_writers[i]->append(reinterpret_cast<const uint8_t*>(nullmap),
978
18.9k
                                                       reinterpret_cast<const void*>(data),
979
18.9k
                                                       num_rows));
980
18.9k
    }
981
2.80k
    return Status::OK();
982
2.80k
}
983
984
186
uint64_t StructColumnWriter::estimate_buffer_size() {
985
186
    uint64_t size = 0;
986
780
    for (auto& column_writer : _sub_column_writers) {
987
780
        size += column_writer->estimate_buffer_size();
988
780
    }
989
186
    size += is_nullable() ? _null_writer->estimate_buffer_size() : 0;
990
186
    return size;
991
186
}
992
993
2.84k
Status StructColumnWriter::finish() {
994
19.2k
    for (auto& column_writer : _sub_column_writers) {
995
19.2k
        RETURN_IF_ERROR(column_writer->finish());
996
19.2k
    }
997
2.84k
    if (is_nullable()) {
998
2.57k
        RETURN_IF_ERROR(_null_writer->finish());
999
2.57k
    }
1000
2.84k
    _opts.meta->set_num_rows(get_next_rowid());
1001
2.84k
    return Status::OK();
1002
2.84k
}
1003
1004
2.84k
Status StructColumnWriter::write_data() {
1005
19.2k
    for (auto& column_writer : _sub_column_writers) {
1006
19.2k
        RETURN_IF_ERROR(column_writer->write_data());
1007
19.2k
    }
1008
2.84k
    if (is_nullable()) {
1009
2.57k
        RETURN_IF_ERROR(_null_writer->write_data());
1010
2.57k
    }
1011
2.84k
    return Status::OK();
1012
2.84k
}
1013
1014
2.80k
Status StructColumnWriter::write_ordinal_index() {
1015
18.9k
    for (auto& column_writer : _sub_column_writers) {
1016
18.9k
        RETURN_IF_ERROR(column_writer->write_ordinal_index());
1017
18.9k
    }
1018
2.80k
    if (is_nullable()) {
1019
2.53k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1020
2.53k
    }
1021
2.80k
    return Status::OK();
1022
2.80k
}
1023
1024
0
Status StructColumnWriter::append_nulls(size_t num_rows) {
1025
0
    for (auto& column_writer : _sub_column_writers) {
1026
0
        RETURN_IF_ERROR(column_writer->append_nulls(num_rows));
1027
0
    }
1028
0
    if (is_nullable()) {
1029
0
        std::vector<UInt8> null_signs(num_rows, 1);
1030
0
        const uint8_t* null_sign_ptr = null_signs.data();
1031
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
1032
0
    }
1033
0
    return Status::OK();
1034
0
}
1035
1036
0
Status StructColumnWriter::finish_current_page() {
1037
0
    return Status::NotSupported("struct writer has no data, can not finish_current_page");
1038
0
}
1039
1040
ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts,
1041
                                     std::unique_ptr<StorageField> field,
1042
                                     OffsetColumnWriter* offset_writer,
1043
                                     ScalarColumnWriter* null_writer,
1044
                                     std::unique_ptr<ColumnWriter> item_writer)
1045
48.3k
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
1046
48.3k
          _item_writer(std::move(item_writer)),
1047
48.3k
          _opts(opts) {
1048
48.3k
    _offset_writer.reset(offset_writer);
1049
48.3k
    if (is_nullable()) {
1050
32.5k
        _null_writer.reset(null_writer);
1051
32.5k
    }
1052
48.3k
}
1053
1054
48.2k
Status ArrayColumnWriter::init() {
1055
48.2k
    RETURN_IF_ERROR(_offset_writer->init());
1056
48.2k
    if (is_nullable()) {
1057
32.5k
        RETURN_IF_ERROR(_null_writer->init());
1058
32.5k
    }
1059
48.2k
    RETURN_IF_ERROR(_item_writer->init());
1060
48.2k
    if (_opts.need_inverted_index) {
1061
1.80k
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1062
1.80k
        if (writer != nullptr) {
1063
1.80k
            RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_writer,
1064
1.80k
                                                      _opts.index_file_writer,
1065
1.80k
                                                      _opts.inverted_indexes[0]));
1066
1.80k
        }
1067
1.80k
    }
1068
48.2k
    if (_opts.need_ann_index) {
1069
60
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1070
60
        if (writer != nullptr) {
1071
60
            _ann_index_writer = std::make_unique<AnnIndexColumnWriter>(_opts.index_file_writer,
1072
60
                                                                       _opts.ann_index);
1073
60
            RETURN_IF_ERROR(_ann_index_writer->init());
1074
60
        }
1075
60
    }
1076
48.2k
    return Status::OK();
1077
48.2k
}
1078
1079
42.1k
Status ArrayColumnWriter::write_inverted_index() {
1080
42.1k
    if (_opts.need_inverted_index) {
1081
1.79k
        return _inverted_index_writer->finish();
1082
1.79k
    }
1083
40.3k
    return Status::OK();
1084
42.1k
}
1085
1086
42.0k
Status ArrayColumnWriter::write_ann_index() {
1087
42.0k
    if (_opts.need_ann_index) {
1088
57
        return _ann_index_writer->finish();
1089
57
    }
1090
42.0k
    return Status::OK();
1091
42.0k
}
1092
1093
// batch append data for array
1094
48.1k
Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1095
    // data_ptr contains
1096
    // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
1097
48.1k
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1098
    // total number length
1099
48.1k
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1100
48.1k
    auto offset_data = *(data_ptr + 1);
1101
48.1k
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1102
48.1k
    auto data = *(data_ptr + 2);
1103
48.1k
    auto nested_null_map = *(data_ptr + 3);
1104
48.1k
    if (element_cnt > 0) {
1105
31.7k
        RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1106
31.7k
                                             reinterpret_cast<const void*>(data), element_cnt));
1107
31.7k
    }
1108
48.1k
    if (_opts.need_inverted_index) {
1109
1.89k
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1110
        // now only support nested type is scala
1111
1.89k
        if (writer != nullptr) {
1112
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1113
1.89k
            RETURN_IF_ERROR(_inverted_index_writer->add_array_values(
1114
1.89k
                    _item_writer->get_field()->size(), reinterpret_cast<const void*>(data),
1115
1.89k
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1116
1.89k
        }
1117
1.89k
    }
1118
1119
48.1k
    if (_opts.need_ann_index) {
1120
60
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1121
        // now only support nested type is scala
1122
60
        if (writer != nullptr) {
1123
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1124
60
            RETURN_IF_ERROR(_ann_index_writer->add_array_values(
1125
60
                    _item_writer->get_field()->size(), reinterpret_cast<const void*>(data),
1126
60
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1127
60
        } else {
1128
0
            return Status::NotSupported(
1129
0
                    "Ann index can only be build on array with scalar type. but got {} as "
1130
0
                    "nested",
1131
0
                    _item_writer->get_field()->type());
1132
0
        }
1133
60
    }
1134
1135
48.1k
    RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
1136
48.1k
    return Status::OK();
1137
48.1k
}
1138
1139
827
uint64_t ArrayColumnWriter::estimate_buffer_size() {
1140
827
    return _offset_writer->estimate_buffer_size() +
1141
827
           (is_nullable() ? _null_writer->estimate_buffer_size() : 0) +
1142
827
           _item_writer->estimate_buffer_size();
1143
827
}
1144
1145
Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1146
32.2k
                                          size_t num_rows) {
1147
32.2k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
1148
32.2k
    if (is_nullable()) {
1149
32.2k
        if (_opts.need_inverted_index) {
1150
1.49k
            RETURN_IF_ERROR(_inverted_index_writer->add_array_nulls(null_map, num_rows));
1151
1.49k
        }
1152
32.2k
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
1153
32.2k
    }
1154
32.2k
    return Status::OK();
1155
32.2k
}
1156
1157
48.4k
Status ArrayColumnWriter::finish() {
1158
48.4k
    RETURN_IF_ERROR(_offset_writer->finish());
1159
48.4k
    if (is_nullable()) {
1160
32.5k
        RETURN_IF_ERROR(_null_writer->finish());
1161
32.5k
    }
1162
48.4k
    RETURN_IF_ERROR(_item_writer->finish());
1163
48.4k
    _opts.meta->set_num_rows(get_next_rowid());
1164
48.4k
    return Status::OK();
1165
48.4k
}
1166
1167
48.4k
Status ArrayColumnWriter::write_data() {
1168
48.4k
    RETURN_IF_ERROR(_offset_writer->write_data());
1169
48.4k
    if (is_nullable()) {
1170
32.5k
        RETURN_IF_ERROR(_null_writer->write_data());
1171
32.5k
    }
1172
48.4k
    RETURN_IF_ERROR(_item_writer->write_data());
1173
48.4k
    return Status::OK();
1174
48.4k
}
1175
1176
47.9k
Status ArrayColumnWriter::write_ordinal_index() {
1177
47.9k
    RETURN_IF_ERROR(_offset_writer->write_ordinal_index());
1178
47.9k
    if (is_nullable()) {
1179
32.0k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1180
32.0k
    }
1181
47.9k
    if (!has_empty_items()) {
1182
31.5k
        RETURN_IF_ERROR(_item_writer->write_ordinal_index());
1183
31.5k
    }
1184
47.9k
    return Status::OK();
1185
47.9k
}
1186
1187
0
Status ArrayColumnWriter::append_nulls(size_t num_rows) {
1188
0
    size_t num_lengths = num_rows;
1189
0
    const ordinal_t offset = _item_writer->get_next_rowid();
1190
0
    while (num_lengths > 0) {
1191
        // TODO llj bulk write
1192
0
        const auto* offset_ptr = reinterpret_cast<const uint8_t*>(&offset);
1193
0
        RETURN_IF_ERROR(_offset_writer->append_data(&offset_ptr, 1));
1194
0
        --num_lengths;
1195
0
    }
1196
0
    return write_null_column(num_rows, true);
1197
0
}
1198
1199
0
Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) {
1200
0
    uint8_t null_sign = is_null ? 1 : 0;
1201
0
    while (is_nullable() && num_rows > 0) {
1202
        // TODO llj bulk write
1203
0
        const uint8_t* null_sign_ptr = &null_sign;
1204
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, 1));
1205
0
        --num_rows;
1206
0
    }
1207
0
    return Status::OK();
1208
0
}
1209
1210
0
Status ArrayColumnWriter::finish_current_page() {
1211
0
    return Status::NotSupported("array writer has no data, can not finish_current_page");
1212
0
}
1213
1214
/// ============================= MapColumnWriter =====================////
1215
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts,
1216
                                 std::unique_ptr<StorageField> field,
1217
                                 ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
1218
                                 std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
1219
21.8k
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) {
1220
21.8k
    CHECK_EQ(kv_writers.size(), 2);
1221
21.8k
    _offsets_writer.reset(offset_writer);
1222
21.8k
    if (is_nullable()) {
1223
9.31k
        _null_writer.reset(null_writer);
1224
9.31k
    }
1225
43.6k
    for (auto& sub_writers : kv_writers) {
1226
43.6k
        _kv_writers.push_back(std::move(sub_writers));
1227
43.6k
    }
1228
21.8k
}
1229
1230
21.8k
Status MapColumnWriter::init() {
1231
21.8k
    RETURN_IF_ERROR(_offsets_writer->init());
1232
21.8k
    if (is_nullable()) {
1233
9.32k
        RETURN_IF_ERROR(_null_writer->init());
1234
9.32k
    }
1235
    // here register_flush_page_callback to call this.put_extra_info_in_page()
1236
    // when finish cur data page
1237
43.6k
    for (auto& sub_writer : _kv_writers) {
1238
43.6k
        RETURN_IF_ERROR(sub_writer->init());
1239
43.6k
    }
1240
21.8k
    return Status::OK();
1241
21.8k
}
1242
1243
1.16k
uint64_t MapColumnWriter::estimate_buffer_size() {
1244
1.16k
    size_t estimate = 0;
1245
2.32k
    for (auto& sub_writer : _kv_writers) {
1246
2.32k
        estimate += sub_writer->estimate_buffer_size();
1247
2.32k
    }
1248
1.16k
    estimate += _offsets_writer->estimate_buffer_size();
1249
1.16k
    if (is_nullable()) {
1250
1.13k
        estimate += _null_writer->estimate_buffer_size();
1251
1.13k
    }
1252
1.16k
    return estimate;
1253
1.16k
}
1254
1255
21.8k
Status MapColumnWriter::finish() {
1256
21.8k
    RETURN_IF_ERROR(_offsets_writer->finish());
1257
21.8k
    if (is_nullable()) {
1258
9.32k
        RETURN_IF_ERROR(_null_writer->finish());
1259
9.32k
    }
1260
43.6k
    for (auto& sub_writer : _kv_writers) {
1261
43.6k
        RETURN_IF_ERROR(sub_writer->finish());
1262
43.6k
    }
1263
21.8k
    _opts.meta->set_num_rows(get_next_rowid());
1264
21.8k
    return Status::OK();
1265
21.8k
}
1266
1267
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1268
9.34k
                                        size_t num_rows) {
1269
9.34k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
1270
9.34k
    if (is_nullable()) {
1271
9.34k
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
1272
9.34k
    }
1273
9.34k
    return Status::OK();
1274
9.34k
}
1275
1276
// write key value data with offsets
1277
21.9k
Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1278
    // data_ptr contains
1279
    // [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_pr]
1280
    // which converted results from olap_map_convertor and later will use a structure to replace it
1281
21.9k
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1282
    // total number length
1283
21.9k
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1284
21.9k
    auto offset_data = *(data_ptr + 1);
1285
21.9k
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1286
1287
21.9k
    if (element_cnt > 0) {
1288
39.9k
        for (size_t i = 0; i < 2; ++i) {
1289
26.6k
            auto data = *(data_ptr + 2 + i);
1290
26.6k
            auto nested_null_map = *(data_ptr + 2 + 2 + i);
1291
26.6k
            RETURN_IF_ERROR(
1292
26.6k
                    _kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1293
26.6k
                                           reinterpret_cast<const void*>(data), element_cnt));
1294
26.6k
        }
1295
13.3k
    }
1296
    // make sure the order : offset writer flush next_array_item_ordinal after kv_writers append_data
1297
    // because we use _kv_writers[0]->get_next_rowid() to set next_array_item_ordinal in offset page footer
1298
21.9k
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1299
21.9k
    return Status::OK();
1300
21.9k
}
1301
1302
21.8k
Status MapColumnWriter::write_data() {
1303
21.8k
    RETURN_IF_ERROR(_offsets_writer->write_data());
1304
21.8k
    if (is_nullable()) {
1305
9.32k
        RETURN_IF_ERROR(_null_writer->write_data());
1306
9.32k
    }
1307
43.6k
    for (auto& sub_writer : _kv_writers) {
1308
43.6k
        RETURN_IF_ERROR(sub_writer->write_data());
1309
43.6k
    }
1310
21.8k
    return Status::OK();
1311
21.8k
}
1312
1313
21.5k
Status MapColumnWriter::write_ordinal_index() {
1314
21.5k
    RETURN_IF_ERROR(_offsets_writer->write_ordinal_index());
1315
21.5k
    if (is_nullable()) {
1316
9.06k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1317
9.06k
    }
1318
43.1k
    for (auto& sub_writer : _kv_writers) {
1319
43.1k
        if (sub_writer->get_next_rowid() != 0) {
1320
25.9k
            RETURN_IF_ERROR(sub_writer->write_ordinal_index());
1321
25.9k
        }
1322
43.1k
    }
1323
21.5k
    return Status::OK();
1324
21.5k
}
1325
1326
0
Status MapColumnWriter::append_nulls(size_t num_rows) {
1327
0
    for (auto& sub_writer : _kv_writers) {
1328
0
        RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
1329
0
    }
1330
0
    const ordinal_t offset = _kv_writers[0]->get_next_rowid();
1331
0
    std::vector<UInt8> offsets_data(num_rows, cast_set<uint8_t>(offset));
1332
0
    const uint8_t* offsets_ptr = offsets_data.data();
1333
0
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1334
1335
0
    if (is_nullable()) {
1336
0
        std::vector<UInt8> null_signs(num_rows, 1);
1337
0
        const uint8_t* null_sign_ptr = null_signs.data();
1338
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
1339
0
    }
1340
0
    return Status::OK();
1341
0
}
1342
1343
0
Status MapColumnWriter::finish_current_page() {
1344
0
    return Status::NotSupported("map writer has no data, can not finish_current_page");
1345
0
}
1346
1347
10.6k
Status MapColumnWriter::write_inverted_index() {
1348
10.6k
    if (_opts.need_inverted_index) {
1349
0
        return _index_builder->finish();
1350
0
    }
1351
10.6k
    return Status::OK();
1352
10.6k
}
1353
1354
VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts,
1355
                                         const TabletColumn* column,
1356
                                         std::unique_ptr<StorageField> field)
1357
6.00k
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta) {
1358
6.00k
    _impl = std::make_unique<VariantColumnWriterImpl>(opts, column);
1359
6.00k
}
1360
1361
5.99k
Status VariantColumnWriter::init() {
1362
5.99k
    return _impl->init();
1363
5.99k
}
1364
1365
664
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1366
664
    _next_rowid += num_rows;
1367
664
    return _impl->append_data(ptr, num_rows);
1368
664
}
1369
1370
859
uint64_t VariantColumnWriter::estimate_buffer_size() {
1371
859
    return _impl->estimate_buffer_size();
1372
859
}
1373
1374
6.02k
Status VariantColumnWriter::finish() {
1375
6.02k
    return _impl->finish();
1376
6.02k
}
1377
6.03k
Status VariantColumnWriter::write_data() {
1378
6.03k
    return _impl->write_data();
1379
6.03k
}
1380
6.02k
Status VariantColumnWriter::write_ordinal_index() {
1381
6.02k
    return _impl->write_ordinal_index();
1382
6.02k
}
1383
1384
6.02k
Status VariantColumnWriter::write_zone_map() {
1385
6.02k
    return _impl->write_zone_map();
1386
6.02k
}
1387
1388
6.01k
Status VariantColumnWriter::write_inverted_index() {
1389
6.01k
    return _impl->write_inverted_index();
1390
6.01k
}
1391
6.00k
Status VariantColumnWriter::write_bloom_filter_index() {
1392
6.00k
    return _impl->write_bloom_filter_index();
1393
6.00k
}
1394
1395
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1396
5.56k
                                            size_t num_rows) {
1397
5.56k
    return _impl->append_nullable(null_map, ptr, num_rows);
1398
5.56k
}
1399
1400
} // namespace doris::segment_v2