Coverage Report

Created: 2026-04-07 10:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/column_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/column_writer.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
#include <cstring>
24
#include <filesystem>
25
#include <memory>
26
27
#include "common/config.h"
28
#include "common/logging.h"
29
#include "core/data_type/data_type_agg_state.h"
30
#include "core/data_type/data_type_factory.hpp"
31
#include "core/types.h"
32
#include "io/fs/file_writer.h"
33
#include "runtime/collection_value.h"
34
#include "storage/field.h"
35
#include "storage/index/bloom_filter/bloom_filter_index_writer.h"
36
#include "storage/index/inverted/inverted_index_writer.h"
37
#include "storage/index/ordinal_page_index.h"
38
#include "storage/index/zone_map/zone_map_index.h"
39
#include "storage/olap_common.h"
40
#include "storage/segment/encoding_info.h"
41
#include "storage/segment/options.h"
42
#include "storage/segment/page_builder.h"
43
#include "storage/segment/page_io.h"
44
#include "storage/segment/page_pointer.h"
45
#include "storage/segment/variant/variant_column_writer_impl.h"
46
#include "storage/tablet/tablet_schema.h"
47
#include "storage/types.h"
48
#include "util/block_compression.h"
49
#include "util/debug_points.h"
50
#include "util/faststring.h"
51
#include "util/rle_encoding.h"
52
#include "util/simd/bits.h"
53
54
namespace doris::segment_v2 {
55
#include "common/compile_check_begin.h"
56
57
class NullBitmapBuilder {
58
public:
59
8.39k
    NullBitmapBuilder() : _has_null(false), _bitmap_buf(512), _rle_encoder(&_bitmap_buf, 1) {}
60
61
    explicit NullBitmapBuilder(size_t reserve_bits)
62
            : _has_null(false),
63
              _bitmap_buf(BitmapSize(reserve_bits)),
64
0
              _rle_encoder(&_bitmap_buf, 1) {}
65
66
7.25k
    void reserve_for_write(size_t num_rows, size_t non_null_count) {
67
7.25k
        if (num_rows == 0) {
68
0
            return;
69
0
        }
70
7.25k
        if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) {
71
5.31k
            if (_bitmap_buf.capacity() < kSmallReserveBytes) {
72
0
                _bitmap_buf.reserve(kSmallReserveBytes);
73
0
            }
74
5.31k
            return;
75
5.31k
        }
76
1.93k
        size_t raw_bytes = BitmapSize(num_rows);
77
1.93k
        size_t run_est = std::min(num_rows, non_null_count * 2 + 1);
78
1.93k
        size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes;
79
1.93k
        size_t raw_overhead = raw_bytes / 63 + 1;
80
1.93k
        size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
81
1.93k
        size_t reserve_bytes = std::min(raw_est, run_bytes_est);
82
1.93k
        if (_bitmap_buf.capacity() < reserve_bytes) {
83
0
            const size_t cap = _bitmap_buf.capacity();
84
0
            const size_t grow = cap + cap / 2;
85
0
            const size_t new_cap = std::max(reserve_bytes, grow);
86
0
            _bitmap_buf.reserve(new_cap);
87
0
        }
88
1.93k
    }
89
90
721k
    void add_run(bool value, size_t run) {
91
721k
        _has_null |= value;
92
721k
        _rle_encoder.Put(value, run);
93
721k
    }
94
95
    // Returns whether the building nullmap contains nullptr
96
11.8k
    bool has_null() const { return _has_null; }
97
98
1.43k
    Status finish(OwnedSlice* slice) {
99
1.43k
        _rle_encoder.Flush();
100
1.43k
        RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
101
1.43k
        return Status::OK();
102
1.43k
    }
103
104
11.8k
    void reset() {
105
11.8k
        _has_null = false;
106
11.8k
        _rle_encoder.Clear();
107
11.8k
    }
108
109
4.72k
    uint64_t size() { return _bitmap_buf.size(); }
110
111
private:
112
    static constexpr size_t kSmallReserveBytes = 64;
113
    static constexpr size_t kReserveSlackBytes = 16;
114
    static constexpr size_t kBytesPerRun = 6;
115
116
    bool _has_null;
117
    faststring _bitmap_buf;
118
    RleEncoder<bool> _rle_encoder;
119
};
120
121
// Builds the auxiliary writer that stores the null flags of a complex column.
// Returns nullptr when `opts.meta` is not nullable. Otherwise a new child
// ColumnMetaPB is appended to `opts.meta`, configured as a non-nullable TINYINT
// column with column id / unique id `id`, and a heap-allocated
// ScalarColumnWriter for it is returned (the caller receives the raw pointer).
inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts,
                                           io::FileWriter* file_writer, uint32_t id) {
    if (!opts.meta->is_nullable()) {
        return nullptr;
    }

    const FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;

    // Describe the null column in the parent's meta.
    auto* null_meta = opts.meta->add_children_columns();
    null_meta->set_column_id(id);
    null_meta->set_unique_id(id);
    null_meta->set_type(int(null_type));
    null_meta->set_is_nullable(false);
    null_meta->set_length(
            cast_set<int32_t>(get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size()));
    null_meta->set_encoding(DEFAULT_ENCODING);
    null_meta->set_compression(opts.meta->compression());

    // The null column carries no zone map and no bloom filter.
    ColumnWriterOptions null_options;
    null_options.meta = null_meta;
    null_options.need_zone_map = false;
    null_options.need_bloom_filter = false;
    null_options.encoding_preference = opts.encoding_preference;

    TabletColumn null_column(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type, false,
                             null_meta->unique_id(), null_meta->length());
    null_column.set_name("nullable");
    null_column.set_index_length(-1); // no short key index

    std::unique_ptr<StorageField> null_field(StorageFieldFactory::create(null_column));
    return new ScalarColumnWriter(null_options, std::move(null_field), file_writer);
}
151
152
// Takes ownership of `field`, remembers the nullability flag and the column
// meta pointer, then derives the data type from that meta.
ColumnWriter::ColumnWriter(std::unique_ptr<StorageField> field, bool is_nullable,
                           ColumnMetaPB* meta)
        : _field(std::move(field)),
          _is_nullable(is_nullable),
          _column_meta(meta) {
    // Resolved in the body (not the initializer list) from the stored meta.
    _data_type = DataTypeFactory::instance().create_data_type(*_column_meta);
}
157
// Creates a StructColumnWriter for `column`: one sub-writer per sub column
// (using the matching child of `opts.meta`), plus an optional null writer whose
// id is `subtype_count + 1`. Fails if a sub column is invalid or its writer
// cannot be created.
Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts,
                                          const TabletColumn* column, io::FileWriter* file_writer,
                                          std::unique_ptr<ColumnWriter>* writer) {
    // not support empty struct
    DCHECK(column->get_subtype_count() >= 1);

    std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
    sub_column_writers.reserve(column->get_subtype_count());
    for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
        const TabletColumn& sub_column = column->get_sub_column(i);
        RETURN_IF_ERROR(sub_column.check_valid());

        // Options for the i-th sub writer; zone maps are disabled for sub columns.
        ColumnWriterOptions sub_options;
        sub_options.meta = opts.meta->mutable_children_columns(i);
        sub_options.need_zone_map = false;
        sub_options.need_bloom_filter = sub_column.is_bf_column();
        sub_options.encoding_preference = opts.encoding_preference;

        std::unique_ptr<ColumnWriter> sub_writer;
        RETURN_IF_ERROR(ColumnWriter::create(sub_options, &sub_column, file_writer, &sub_writer));
        sub_column_writers.push_back(std::move(sub_writer));
    }

    ScalarColumnWriter* null_writer =
            get_null_writer(opts, file_writer, column->get_subtype_count() + 1);

    std::unique_ptr<StorageField> struct_field(StorageFieldFactory::create(*column));
    writer->reset(new StructColumnWriter(opts, std::move(struct_field), null_writer,
                                         sub_column_writers));
    return Status::OK();
}
188
189
// Creates an ArrayColumnWriter for `column`. It is composed of:
//  - the item writer, built from sub column 0 and child meta 0;
//  - an OffsetColumnWriter over a newly added UNSIGNED_BIGINT child meta
//    (column id / unique id 2);
//  - an optional null writer (id 3) when the array column is nullable.
Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts,
                                         const TabletColumn* column, io::FileWriter* file_writer,
                                         std::unique_ptr<ColumnWriter>* writer) {
    DCHECK(column->get_subtype_count() == 1);
    const TabletColumn& item_column = column->get_sub_column(0);
    RETURN_IF_ERROR(item_column.check_valid());

    // --- item writer ---
    ColumnWriterOptions item_options;
    item_options.meta = opts.meta->mutable_children_columns(0);
    item_options.need_zone_map = false;
    item_options.need_bloom_filter = item_column.is_bf_column();
    item_options.encoding_preference = opts.encoding_preference;
    std::unique_ptr<ColumnWriter> item_writer;
    RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));

    // --- length (offset) writer ---
    const FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;

    auto* length_meta = opts.meta->add_children_columns();
    length_meta->set_column_id(2);
    length_meta->set_unique_id(2);
    length_meta->set_type(int(length_type));
    length_meta->set_is_nullable(false);
    length_meta->set_length(cast_set<int32_t>(
            get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size()));
    length_meta->set_encoding(DEFAULT_ENCODING);
    length_meta->set_compression(opts.meta->compression());

    ColumnWriterOptions length_options;
    length_options.meta = length_meta;
    length_options.need_zone_map = false;
    length_options.need_bloom_filter = false;
    length_options.encoding_preference = opts.encoding_preference;

    TabletColumn length_column(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
                               length_meta->is_nullable(), length_meta->unique_id(),
                               length_meta->length());
    length_column.set_name("length");
    length_column.set_index_length(-1); // no short key index
    std::unique_ptr<StorageField> bigint_field(StorageFieldFactory::create(length_column));
    auto* length_writer =
            new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);

    // --- null writer (nullptr when the column is not nullable) ---
    ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3);

    writer->reset(new ArrayColumnWriter(
            opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)),
            length_writer, null_writer, std::move(item_writer)));
    return Status::OK();
}
240
241
// Creates a MapColumnWriter for `column`. It is composed of:
//  - two inner writers built from sub columns 0 and 1 (key and value);
//  - an OffsetColumnWriter over a newly added UNSIGNED_BIGINT child meta whose
//    id is `subtype_count + 1`;
//  - an optional null writer with id `subtype_count + 2`.
// Returns InternalError when the column has fewer than two sub columns.
Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
                                       io::FileWriter* file_writer,
                                       std::unique_ptr<ColumnWriter>* writer) {
    DCHECK(column->get_subtype_count() == 2);
    if (column->get_subtype_count() < 2) {
        return Status::InternalError(
                "If you upgraded from version 1.2.*, please DROP the MAP columns and then "
                "ADD the MAP columns back.");
    }

    // --- key & value writers ---
    std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
    for (int i = 0; i < 2; ++i) {
        const TabletColumn& kv_column = column->get_sub_column(i);
        RETURN_IF_ERROR(kv_column.check_valid());

        ColumnWriterOptions kv_options;
        kv_options.meta = opts.meta->mutable_children_columns(i);
        kv_options.need_zone_map = false;
        kv_options.need_bloom_filter = kv_column.is_bf_column();
        kv_options.encoding_preference = opts.encoding_preference;

        std::unique_ptr<ColumnWriter> kv_writer;
        RETURN_IF_ERROR(ColumnWriter::create(kv_options, &kv_column, file_writer, &kv_writer));
        inner_writer_list.push_back(std::move(kv_writer));
    }

    // --- offset writer ---
    const FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;

    // Be Cautious: column unique id is used for column reader creation
    auto* length_meta = opts.meta->add_children_columns();
    length_meta->set_column_id(column->get_subtype_count() + 1);
    length_meta->set_unique_id(column->get_subtype_count() + 1);
    length_meta->set_type(int(length_type));
    length_meta->set_is_nullable(false);
    length_meta->set_length(cast_set<int32_t>(
            get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size()));
    length_meta->set_encoding(DEFAULT_ENCODING);
    length_meta->set_compression(opts.meta->compression());

    ColumnWriterOptions length_options;
    length_options.meta = length_meta;
    length_options.need_zone_map = false;
    length_options.need_bloom_filter = false;
    length_options.encoding_preference = opts.encoding_preference;

    TabletColumn length_column(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
                               length_meta->is_nullable(), length_meta->unique_id(),
                               length_meta->length());
    length_column.set_name("length");
    length_column.set_index_length(-1); // no short key index
    std::unique_ptr<StorageField> bigint_field(StorageFieldFactory::create(length_column));
    auto* length_writer =
            new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);

    // --- null writer (nullptr when the column is not nullable) ---
    ScalarColumnWriter* null_writer =
            get_null_writer(opts, file_writer, column->get_subtype_count() + 2);

    writer->reset(new MapColumnWriter(
            opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), null_writer,
            length_writer, inner_writer_list));

    return Status::OK();
}
306
307
// Creates the writer for an AGG_STATE column according to the primitive type
// of its serialized form:
//  - STRING / INVALID_TYPE / FIXED_LENGTH_OBJECT / BITMAP -> ScalarColumnWriter
//  - ARRAY -> create_array_writer
//  - MAP   -> create_map_writer
// Any other serialized type throws an INTERNAL_ERROR Exception.
Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
                                             const TabletColumn* column,
                                             io::FileWriter* file_writer,
                                             std::unique_ptr<ColumnWriter>* writer) {
    auto data_type = DataTypeFactory::instance().create_data_type(*column);
    const auto* agg_state_type = assert_cast<const DataTypeAggState*>(data_type.get());
    const auto serialized = agg_state_type->get_serialized_type()->get_primitive_type();

    const bool stored_as_scalar = serialized == PrimitiveType::TYPE_STRING ||
                                  serialized == PrimitiveType::INVALID_TYPE ||
                                  serialized == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT ||
                                  serialized == PrimitiveType::TYPE_BITMAP;
    if (stored_as_scalar) {
        writer->reset(new ScalarColumnWriter(
                opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)),
                file_writer));
        return Status::OK();
    }
    if (serialized == PrimitiveType::TYPE_ARRAY) {
        RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
        return Status::OK();
    }
    if (serialized == PrimitiveType::TYPE_MAP) {
        RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
        return Status::OK();
    }
    throw Exception(ErrorCode::INTERNAL_ERROR,
                    "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
                    agg_state_type->get_name());
}
330
331
// Picks the physical writer for a VARIANT column:
//  - root VARIANT column (not extracted)           -> VariantColumnWriter
//  - extracted column whose name contains
//    DOC_VALUE_COLUMN_PATH (doc-value snapshot)    -> VariantDocCompactWriter
//  - any other extracted sub column                -> VariantSubcolumnWriter
// `file_writer` is not used by this function.
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
                                           const TabletColumn* column, io::FileWriter* file_writer,
                                           std::unique_ptr<ColumnWriter>* writer) {
    if (!column->is_extracted_column()) {
        // The root VARIANT column itself.
        *writer = std::make_unique<VariantColumnWriter>(
                opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)));
        return Status::OK();
    }

    const bool is_doc_value_column =
            column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos;
    if (is_doc_value_column) {
        // Doc snapshot stored by the compact doc writer.
        *writer = std::make_unique<VariantDocCompactWriter>(
                opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)));
        return Status::OK();
    }

    // Regular extracted sub column.
    VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
    *writer = std::make_unique<VariantSubcolumnWriter>(
            opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)));
    return Status::OK();
}
355
356
// TODO(Amory): simplify this function by dispatching on whether the column is nullable, has offsets, and has sub columns.
357
// Factory entry point: creates the ColumnWriter matching `column`'s type and
// stores it in `*writer`. Scalar types get a ScalarColumnWriter; AGG_STATE,
// STRUCT, ARRAY, MAP and VARIANT are delegated to their dedicated factory
// functions; any other type yields Status::NotSupported.
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
                            io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
    std::unique_ptr<StorageField> field(StorageFieldFactory::create(*column));
    DCHECK(field.get() != nullptr);

    if (is_scalar_type(column->type())) {
        writer->reset(new ScalarColumnWriter(opts, std::move(field), file_writer));
        return Status::OK();
    }

    switch (column->type()) {
    case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
        RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, writer));
        return Status::OK();
    case FieldType::OLAP_FIELD_TYPE_STRUCT:
        RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, writer));
        return Status::OK();
    case FieldType::OLAP_FIELD_TYPE_ARRAY:
        RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
        return Status::OK();
    case FieldType::OLAP_FIELD_TYPE_MAP:
        RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
        return Status::OK();
    case FieldType::OLAP_FIELD_TYPE_VARIANT:
        // Process columns with sparse column
        RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
        return Status::OK();
    default:
        return Status::NotSupported("unsupported type for ColumnWriter: {}",
                                    std::to_string(int(field->type())));
    }
}
394
395
// Appends `num_rows` rows whose null flags are given as a bitmap.
// The bitmap is walked run by run: null runs go to append_nulls(), non-null
// runs go to append_data(), which consumes values from `data` through a local
// cursor. Stops at the first failing Status.
Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* data,
                                     size_t num_rows) {
    const auto* cursor = static_cast<const uint8_t*>(data);
    BitmapIterator null_iter(is_null_bits, num_rows);
    bool run_is_null = false;
    for (size_t run_len = null_iter.Next(&run_is_null); run_len > 0;
         run_len = null_iter.Next(&run_is_null)) {
        if (run_is_null) {
            RETURN_IF_ERROR(append_nulls(run_len));
        } else {
            RETURN_IF_ERROR(append_data(&cursor, run_len));
        }
    }
    return Status::OK();
}
410
411
// Appends `num_rows` rows whose null flags are given as one byte per row
// (`null_map[i] != 0` means row i is null). `*ptr` is the value cursor:
// it is advanced past every row, null or not.
//
// An empty batch is a no-op. Without this guard the run loop below would read
// null_map[0] and append one row, and the fast path would call append_nulls(0).
Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                     size_t num_rows) {
    if (num_rows == 0) {
        return Status::OK();
    }

    // Fast path: use SIMD to detect all-NULL or all-non-NULL columns
    if (config::enable_rle_batch_put_optimization) {
        const size_t non_null_count =
                simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows);

        if (non_null_count == 0) {
            // All NULL: skip run-length iteration, directly append all nulls
            RETURN_IF_ERROR(append_nulls(num_rows));
            *ptr += get_field()->size() * num_rows;
            return Status::OK();
        }

        if (non_null_count == num_rows) {
            // All non-NULL: skip run-length iteration, directly append all data
            return append_data(ptr, num_rows);
        }
    }

    // Mixed case or fast path disabled: process maximal runs of equal flag bytes.
    size_t offset = 0;
    while (offset < num_rows) {
        const uint8_t flag = null_map[offset];
        size_t run_end = offset + 1;
        while (run_end < num_rows && null_map[run_end] == flag) {
            ++run_end;
        }
        const size_t step = run_end - offset;

        if (flag) {
            RETURN_IF_ERROR(append_nulls(step));
            // Null rows still occupy a slot in the value buffer; skip them here.
            *ptr += get_field()->size() * step;
        } else {
            // TODO:
            //  1. `*ptr += get_field()->size() * step;` should do in this function, not append_data;
            //  2. support array vectorized load and ptr offset add
            RETURN_IF_ERROR(append_data(ptr, step));
        }
        offset = run_end;
    }

    return Status::OK();
}
461
462
24.4k
// Appends `num_rows` values starting at `data`. When `nullmap` is non-null it
// is forwarded to append_nullable() as the per-row null flags; otherwise all
// rows are appended as plain data. `data` must be non-null and `num_rows`
// positive (checked by assert only).
Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) {
    assert(data && num_rows > 0);
    const auto* cursor = static_cast<const uint8_t*>(data);
    if (nullmap == nullptr) {
        return append_data(&cursor, num_rows);
    }
    return append_nullable(nullmap, &cursor, num_rows);
}
471
472
///////////////////////////////////////////////////////////////////////////////////
473
474
// Stores the options and output file writer; nullability is taken from
// `opts.meta`. The caller must have populated the meta fields checked below
// (debug-only checks). One inverted-index builder slot is reserved per entry
// in `_opts.inverted_indexes`; the builders themselves are created in init().
ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts,
                                       std::unique_ptr<StorageField> field,
                                       io::FileWriter* file_writer)
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
          _opts(opts),
          _file_writer(file_writer),
          _data_size(0) {
    // these opts.meta fields should be set by client
    DCHECK(opts.meta->has_column_id());
    DCHECK(opts.meta->has_unique_id());
    DCHECK(opts.meta->has_type());
    DCHECK(opts.meta->has_length());
    DCHECK(opts.meta->has_encoding());
    DCHECK(opts.meta->has_compression());
    DCHECK(opts.meta->has_is_nullable());
    DCHECK(file_writer != nullptr);

    const size_t index_count = _opts.inverted_indexes.size();
    _inverted_index_builders.resize(index_count);
}
492
493
22.2k
// Drops every page still held in `_pages` before the remaining members are
// destroyed.
ScalarColumnWriter::~ScalarColumnWriter() {
    _pages.clear();
}
497
498
22.2k
// Prepares the writer for appending data. In order:
//  1. resolves the block compression codec from the column meta;
//  2. resolves the concrete encoding and writes it back into the meta;
//  3. creates the page builder and the ordinal index builder;
//  4. creates the null bitmap builder when the column is nullable;
//  5. creates the zone map, inverted index and bloom filter builders when the
//     corresponding options are enabled.
// Returns the first error encountered.
Status ScalarColumnWriter::init() {
    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));

    RETURN_IF_ERROR(EncodingInfo::get(get_field()->type(), _opts.meta->encoding(),
                                      _opts.encoding_preference, &_encoding_info));
    _opts.meta->set_encoding(_encoding_info->encoding());

    // create page builder
    PageBuilderOptions builder_opts;
    builder_opts.data_page_size = _opts.data_page_size;
    builder_opts.dict_page_size = _opts.dict_page_size;
    builder_opts.encoding_preference = _opts.encoding_preference;

    PageBuilder* raw_builder = nullptr;
    RETURN_IF_ERROR(_encoding_info->create_page_builder(builder_opts, &raw_builder));
    if (raw_builder == nullptr) {
        return Status::NotSupported("Failed to create page builder for type {} and encoding {}",
                                    get_field()->type(), _opts.meta->encoding());
    }
    // should store more concrete encoding type instead of DEFAULT_ENCODING
    // because the default encoding of a data type can be changed in the future
    DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
    VLOG_DEBUG << fmt::format(
            "[verbose] scalar column writer init, column_id={}, type={}, encoding={}, "
            "is_nullable={}",
            _opts.meta->column_id(), get_field()->type(),
            EncodingTypePB_Name(_opts.meta->encoding()), _opts.meta->is_nullable());
    _page_builder.reset(raw_builder);

    // create ordinal builder
    _ordinal_index_builder = std::make_unique<OrdinalIndexWriter>();

    // create null bitmap builder
    if (is_nullable()) {
        _null_bitmap_builder = std::make_unique<NullBitmapBuilder>();
    }

    if (_opts.need_zone_map) {
        RETURN_IF_ERROR(
                ZoneMapIndexWriter::create(_data_type, get_field(), _zone_map_index_builder));
    }

    if (_opts.need_inverted_index) {
        // NOTE: the do/for nesting and the `break` inside the debug point are
        // kept exactly as-is; which loop the `break` leaves depends on how
        // DBUG_EXECUTE_IF expands.
        do {
            for (size_t i = 0; i < _opts.inverted_indexes.size(); i++) {
                // Debug point: install a no-op index writer instead of a real one.
                DBUG_EXECUTE_IF("column_writer.init", {
                    class InvertedIndexColumnWriterEmpty final : public IndexColumnWriter {
                    public:
                        Status init() override { return Status::OK(); }
                        Status add_values(const std::string name, const void* values,
                                          size_t count) override {
                            return Status::OK();
                        }
                        Status add_array_values(size_t field_size, const CollectionValue* values,
                                                size_t count) override {
                            return Status::OK();
                        }
                        Status add_array_values(size_t field_size, const void* value_ptr,
                                                const uint8_t* null_map, const uint8_t* offsets_ptr,
                                                size_t count) override {
                            return Status::OK();
                        }
                        Status add_nulls(uint32_t count) override { return Status::OK(); }
                        Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
                            return Status::OK();
                        }
                        Status finish() override { return Status::OK(); }
                        int64_t size() const override { return 0; }
                        void close_on_error() override {}
                    };

                    _inverted_index_builders[i] =
                            std::make_unique<InvertedIndexColumnWriterEmpty>();

                    break;
                });

                RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_builders[i],
                                                          _opts.index_file_writer,
                                                          _opts.inverted_indexes[i]));
            }
        } while (false);
    }

    if (_opts.need_bloom_filter) {
        if (_opts.is_ngram_bf_index) {
            // N-gram bloom filter: default BloomFilterOptions plus gram settings.
            RETURN_IF_ERROR(NGramBloomFilterIndexWriterImpl::create(
                    BloomFilterOptions(), get_field()->type_info(), _opts.gram_size,
                    _opts.gram_bf_size, &_bloom_filter_index_builder));
        } else {
            RETURN_IF_ERROR(BloomFilterIndexWriter::create(
                    _opts.bf_options, get_field()->type_info(), &_bloom_filter_index_builder));
        }
    }
    return Status::OK();
}
589
590
31.0k
// Records `num_rows` consecutive null rows: adds a null run to the null bitmap,
// advances `_next_rowid`, and notifies the zone map, inverted index and bloom
// filter builders that are enabled. Dereferences `_null_bitmap_builder`
// unconditionally; init() only creates it for nullable columns.
Status ScalarColumnWriter::append_nulls(size_t num_rows) {
    _null_bitmap_builder->add_run(true, num_rows);
    _next_rowid += num_rows;

    if (_opts.need_zone_map) {
        _zone_map_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
    }
    if (_opts.need_inverted_index) {
        for (const auto& index_builder : _inverted_index_builders) {
            RETURN_IF_ERROR(index_builder->add_nulls(cast_set<uint32_t>(num_rows)));
        }
    }
    if (_opts.need_bloom_filter) {
        _bloom_filter_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
    }
    return Status::OK();
}
606
607
// append data to page builder. this function will make sure that
608
// num_rows must be written before return. And ptr will be modified
609
// to next data should be written
610
703k
Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
611
703k
    size_t remaining = num_rows;
612
1.41M
    while (remaining > 0) {
613
707k
        size_t num_written = remaining;
614
707k
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
615
616
707k
        remaining -= num_written;
617
618
707k
        if (_page_builder->is_page_full()) {
619
4.09k
            RETURN_IF_ERROR(finish_current_page());
620
4.09k
        }
621
707k
    }
622
703k
    return Status::OK();
623
703k
}
624
625
// Writes values from `data` into the current page and mirrors them into the
// enabled index builders.
//
// `*num_written` is an in/out argument: on entry the number of rows offered, on
// return the number of rows the page builder actually accepted. Only the
// accepted rows are fed to the indexes, counted in `_next_rowid` and (for
// nullable columns) recorded as a non-null run in the null bitmap.
Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data,
                                                                 size_t* num_written) {
    RETURN_IF_ERROR(_page_builder->add(data, num_written));

    if (_opts.need_zone_map) {
        _zone_map_index_builder->add_values(data, *num_written);
    }

    if (_opts.need_inverted_index) {
        for (const auto& inverted_builder : _inverted_index_builders) {
            RETURN_IF_ERROR(
                    inverted_builder->add_values(get_field()->name(), data, *num_written));
        }
    }

    if (_opts.need_bloom_filter) {
        RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written));
    }

    _next_rowid += *num_written;

    // The null bits can only be written after the data: until the page builder
    // has run we do not know how many rows fit into the current page.
    if (is_nullable()) {
        _null_bitmap_builder->add_run(false, *num_written);
    }

    return Status::OK();
}
649
650
708k
// Pointer-advancing overload: appends values starting at `*data` to the current
// page and then moves `*data` forward by the number of rows written
// (`*num_written` on return) times the field's value size.
Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) {
    const uint8_t* cursor = *data;
    RETURN_IF_ERROR(append_data_in_current_page(cursor, num_written));
    *data = cursor + get_field()->size() * (*num_written);
    return Status::OK();
}
655
656
// Appends `num_rows` rows whose nullness is described by `null_map` (one byte
// per row, non-zero meaning NULL) and whose values start at `*ptr`.
//
// When `config::enable_rle_batch_put_optimization` is off this defers to
// ColumnWriter::append_nullable. Otherwise the null map is first collapsed into
// runs of equal nullness (stored in `_null_run_buffer`), and each run is then
// written with a single append_nulls()/append_data() call. `*ptr` is advanced
// past all `num_rows` value slots, including the slots of NULL rows.
Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                           size_t num_rows) {
    // Optimization disabled: use the base class implementation.
    if (!config::enable_rle_batch_put_optimization) {
        return ColumnWriter::append_nullable(null_map, ptr, num_rows);
    }

    if (UNLIKELY(num_rows == 0)) {
        return Status::OK();
    }

    // Step 1: split the null map into runs, using memchr to find run boundaries.
    _null_run_buffer.clear();
    if (_null_run_buffer.capacity() < num_rows) {
        _null_run_buffer.reserve(std::min(num_rows, size_t(256)));
    }

    size_t non_null_rows = 0;
    for (size_t pos = 0; pos < num_rows;) {
        const uint8_t* run_begin = null_map + pos;
        const size_t rows_left = num_rows - pos;
        const bool null_run = *run_begin != 0;
        // A NULL run ends at the first 0 byte, a non-NULL run at the first 1 byte.
        const auto* boundary =
                static_cast<const uint8_t*>(memchr(run_begin, null_run ? 0 : 1, rows_left));
        size_t run_rows = rows_left;
        if (boundary != nullptr) {
            run_rows = boundary - run_begin;
        }
        _null_run_buffer.push_back(NullRun {null_run, static_cast<uint32_t>(run_rows)});
        if (!null_run) {
            non_null_rows += run_rows;
        }
        pos += run_rows;
    }

    // Step 2: let the null bitmap builder pre-allocate. The non-null estimate
    // for the whole page is extrapolated from the ratio seen in this batch.
    if (_null_bitmap_builder != nullptr) {
        const size_t rows_in_page = _next_rowid - _first_rowid;
        const size_t expected_rows = rows_in_page + num_rows;
        size_t estimated_non_null = non_null_rows;
        if (num_rows > 0 && expected_rows > num_rows) {
            estimated_non_null = (non_null_rows * expected_rows) / num_rows;
        }
        _null_bitmap_builder->reserve_for_write(expected_rows, estimated_non_null);
    }

    // Step 3a: every row is NULL, so no data is written; only the null bitmap
    // and indexes are updated and the input pointer is moved past the slots.
    if (non_null_rows == 0) {
        RETURN_IF_ERROR(append_nulls(num_rows));
        *ptr += get_field()->size() * num_rows;
        return Status::OK();
    }

    // Step 3b: no row is NULL, so the plain append_data path handles both the
    // data and the null bitmap.
    if (non_null_rows == num_rows) {
        return append_data(ptr, num_rows);
    }

    // Step 3c: mixed batch, written run by run.
    for (const auto& null_run : _null_run_buffer) {
        const size_t run_rows = null_run.len;
        if (!null_run.is_null) {
            // TODO:
            //  1. `*ptr += get_field()->size() * step;` should do in this function, not append_data;
            //  2. support array vectorized load and ptr offset add
            RETURN_IF_ERROR(append_data(ptr, run_rows));
            continue;
        }
        RETURN_IF_ERROR(append_nulls(run_rows));
        *ptr += get_field()->size() * run_rows;
    }

    return Status::OK();
}
727
728
9.77k
uint64_t ScalarColumnWriter::estimate_buffer_size() {
729
9.77k
    uint64_t size = _data_size;
730
9.77k
    size += _page_builder->size();
731
9.77k
    if (is_nullable()) {
732
4.72k
        size += _null_bitmap_builder->size();
733
4.72k
    }
734
9.77k
    size += _ordinal_index_builder->size();
735
9.77k
    if (_opts.need_zone_map) {
736
9.12k
        size += _zone_map_index_builder->size();
737
9.12k
    }
738
9.77k
    if (_opts.need_bloom_filter) {
739
8
        size += _bloom_filter_index_builder->size();
740
8
    }
741
9.77k
    return size;
742
9.77k
}
743
744
22.2k
// Finishes the page that is currently being built and stores the total number
// of rows appended so far (`_next_rowid`) in the column meta.
Status ScalarColumnWriter::finish() {
    RETURN_IF_ERROR(finish_current_page());

    _opts.meta->set_num_rows(_next_rowid);
    return Status::OK();
}
749
750
22.2k
// Writes all buffered data pages to `_file_writer`, followed by the dictionary
// page when the column uses DICT_ENCODING.
//
// While doing so it accumulates the uncompressed size of the pages written and
// the number of bytes actually appended to the file. The buffered pages and the
// page builder are released afterwards.
Status ScalarColumnWriter::write_data() {
    const auto start_offset = _file_writer->bytes_appended();

    // Uncompressed footprint of one page: body + serialized footer + the two
    // fixed 4-byte trailers (footer size and checksum).
    auto uncompressed_bytes_of = [](const PageFooterPB& footer) {
        return footer.uncompressed_size() + footer.ByteSizeLong() +
               sizeof(uint32_t) /* footer size */ + sizeof(uint32_t) /* checksum */;
    };

    for (auto& buffered_page : _pages) {
        _total_uncompressed_data_pages_size += uncompressed_bytes_of(buffered_page->footer);
        RETURN_IF_ERROR(_write_data_page(buffered_page.get()));
    }
    _pages.clear();

    // Write the column dictionary, if any.
    if (_encoding_info->encoding() == DICT_ENCODING) {
        OwnedSlice dict_body;
        RETURN_IF_ERROR(_page_builder->get_dictionary_page(&dict_body));
        EncodingTypePB dict_encoding;
        RETURN_IF_ERROR(_page_builder->get_dictionary_page_encoding(&dict_encoding));

        PageFooterPB dict_footer;
        dict_footer.set_type(DICTIONARY_PAGE);
        dict_footer.set_uncompressed_size(cast_set<uint32_t>(dict_body.slice().get_size()));
        dict_footer.mutable_dict_page_footer()->set_encoding(dict_encoding);
        _total_uncompressed_data_pages_size += uncompressed_bytes_of(dict_footer);

        PagePointer dict_pointer;
        RETURN_IF_ERROR(PageIO::compress_and_write_page(
                _compress_codec, _opts.compression_min_space_saving, _file_writer,
                {dict_body.slice()}, dict_footer, &dict_pointer));
        dict_pointer.to_proto(_opts.meta->mutable_dict_page());
    }

    _total_compressed_data_pages_size += _file_writer->bytes_appended() - start_offset;
    _page_builder.reset();
    return Status::OK();
}
784
785
22.1k
// Finishes the ordinal index builder, writing through `_file_writer` and
// registering a new index entry in the column meta.
Status ScalarColumnWriter::write_ordinal_index() {
    auto* index_meta = _opts.meta->add_indexes();
    return _ordinal_index_builder->finish(_file_writer, index_meta);
}
788
789
20.2k
// Finishes the zone map index builder when zone maps are enabled for this
// column; otherwise does nothing and returns OK.
Status ScalarColumnWriter::write_zone_map() {
    if (!_opts.need_zone_map) {
        return Status::OK();
    }
    return _zone_map_index_builder->finish(_file_writer, _opts.meta->add_indexes());
}
795
796
19.6k
// Finishes every inverted index builder of this column when inverted indexes
// are enabled, stopping at the first builder that reports an error.
Status ScalarColumnWriter::write_inverted_index() {
    if (!_opts.need_inverted_index) {
        return Status::OK();
    }
    for (const auto& inverted_builder : _inverted_index_builders) {
        RETURN_IF_ERROR(inverted_builder->finish());
    }
    return Status::OK();
}
804
805
19.6k
// Finishes the bloom filter index builder when a bloom filter is enabled for
// this column; otherwise does nothing and returns OK.
Status ScalarColumnWriter::write_bloom_filter_index() {
    if (!_opts.need_bloom_filter) {
        return Status::OK();
    }
    return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes());
}
811
812
// write a data page into file and update ordinal index
813
26.2k
Status ScalarColumnWriter::_write_data_page(Page* page) {
814
26.2k
    PagePointer pp;
815
26.2k
    std::vector<Slice> compressed_body;
816
43.8k
    for (auto& data : page->data) {
817
43.8k
        compressed_body.push_back(data.slice());
818
43.8k
    }
819
26.2k
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, compressed_body, page->footer, &pp));
820
26.2k
    _ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp);
821
26.2k
    return Status::OK();
822
26.2k
}
823
824
26.3k
// Seals the page currently being built and queues it for writing.
//
// Does nothing when no row has been added since the last page. Otherwise it
// flushes the per-page zone map / bloom filter state, takes the encoded values
// (and the null map, if the page contains nulls) from the builders, fills in
// the data page footer, tries to compress the body, and pushes the resulting
// page onto the pending page list. `_first_rowid` is then moved to
// `_next_rowid` so the next page starts empty.
Status ScalarColumnWriter::finish_current_page() {
    const auto rows_in_page = _next_rowid - _first_rowid;
    if (rows_in_page == 0) {
        return Status::OK();
    }

    if (_opts.need_zone_map) {
        // Pages with fewer rows than the configured threshold get their zone
        // map invalidated (pass_all set to true) before it is flushed.
        if (rows_in_page < config::zone_map_row_num_threshold) {
            _zone_map_index_builder->invalid_page_zone_map();
        }
        RETURN_IF_ERROR(_zone_map_index_builder->flush());
    }

    if (_opts.need_bloom_filter) {
        RETURN_IF_ERROR(_bloom_filter_index_builder->flush());
    }

    _raw_data_bytes += _page_builder->get_raw_data_size();

    // Data page body layout: encoded values followed by the optional null map.
    OwnedSlice encoded_values;
    RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
    RETURN_IF_ERROR(_page_builder->reset());

    std::vector<Slice> body;
    body.push_back(encoded_values.slice());

    OwnedSlice nullmap;
    if (_null_bitmap_builder != nullptr) {
        // The null map is only materialized when the page really holds a null.
        if (is_nullable() && _null_bitmap_builder->has_null()) {
            RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
            body.push_back(nullmap.slice());
        }
        _null_bitmap_builder->reset();
    }

    // Fill in the data page footer.
    auto page = std::make_unique<Page>();
    page->footer.set_type(DATA_PAGE);
    page->footer.set_uncompressed_size(cast_set<uint32_t>(Slice::compute_total_size(body)));
    auto* data_page_footer = page->footer.mutable_data_page_footer();
    data_page_footer->set_first_ordinal(_first_rowid);
    data_page_footer->set_num_values(rows_in_page);
    data_page_footer->set_nullmap_size(cast_set<uint32_t>(nullmap.slice().size));
    if (_new_page_callback != nullptr) {
        _new_page_callback->put_extra_info_in_page(data_page_footer);
    }

    // Try to compress the body; an empty result means the body stays uncompressed.
    OwnedSlice compressed_body;
    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
                                               body, &compressed_body));
    if (!compressed_body.slice().empty()) {
        // Compressed: the page owns the single compressed buffer.
        page->data.emplace_back(std::move(compressed_body));
    } else {
        // Uncompressed: the page owns the original parts.
        page->data.emplace_back(std::move(encoded_values));
        page->data.emplace_back(std::move(nullmap));
    }

    _push_back_page(std::move(page));
    _first_rowid = _next_rowid;
    return Status::OK();
}
887
888
////////////////////////////////////////////////////////////////////////////////
889
890
////////////////////////////////////////////////////////////////////////////////
891
// offset column writer
892
////////////////////////////////////////////////////////////////////////////////
893
894
// Builds an offset writer on top of ScalarColumnWriter. Offsets are always
// interpreted as uint64 values, which is asserted in debug builds.
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
                                       std::unique_ptr<StorageField> field,
                                       io::FileWriter* file_writer)
        : ScalarColumnWriter(opts, std::move(field), file_writer) {
    // For now the data in an offset column is only ever read as uint64.
    DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
}
901
902
424
// Defaulted destructor; defined out of line in this translation unit.
OffsetColumnWriter::~OffsetColumnWriter() = default;
903
904
424
// Initializes the underlying scalar writer, registers this object as the
// flush-page callback (so put_extra_info_in_page() is consulted when a data
// page footer is built) and resets the tracked next offset to 0.
Status OffsetColumnWriter::init() {
    RETURN_IF_ERROR(ScalarColumnWriter::init());

    register_flush_page_callback(this);
    _next_offset = 0;

    return Status::OK();
}
910
911
425
// Appends `num_rows` uint64 offsets read from `*ptr`, finishing pages as they
// fill up.
//
// After every partial write the uint64 located at the advanced `*ptr` is
// remembered in `_next_offset`; put_extra_info_in_page() stores it in the page
// footer as next_array_item_ordinal. Note that this reads the value that
// follows the rows just written, so the caller's buffer must contain one more
// readable uint64 after the `num_rows` offsets.
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
    for (size_t rows_left = num_rows; rows_left > 0;) {
        size_t written = rows_left;
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &written));

        // `*ptr` now points at the offset of the next row.
        _next_offset = *reinterpret_cast<const uint64_t*>(*ptr);
        rows_left -= written;

        if (_page_builder->is_page_full()) {
            // The footer of the finished page picks up `_next_offset`.
            RETURN_IF_ERROR(finish_current_page());
        }
    }
    return Status::OK();
}
927
928
419
// Flush-page callback: stores the most recently recorded `_next_offset` in the
// data page footer as next_array_item_ordinal.
void OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
    const auto next_item_ordinal = _next_offset;
    footer->set_next_array_item_ordinal(next_item_ordinal);
}
931
932
// Takes ownership of every writer in `sub_column_writers` (the vector's
// elements are moved from) and, when the column is nullable, of `null_writer`.
// At least one sub column writer is expected (checked in debug builds).
StructColumnWriter::StructColumnWriter(
        const ColumnWriterOptions& opts, std::unique_ptr<StorageField> field,
        ScalarColumnWriter* null_writer,
        std::vector<std::unique_ptr<ColumnWriter>>& sub_column_writers)
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) {
    for (size_t idx = 0; idx < sub_column_writers.size(); ++idx) {
        _sub_column_writers.push_back(std::move(sub_column_writers[idx]));
    }
    _num_sub_column_writers = _sub_column_writers.size();
    DCHECK(_num_sub_column_writers >= 1);

    // The null writer is only adopted for nullable columns.
    if (is_nullable()) {
        _null_writer.reset(null_writer);
    }
}
946
947
0
// Initializes every sub column writer and then, for nullable columns, the null
// writer. Stops at the first failure.
Status StructColumnWriter::init() {
    for (auto& sub_writer : _sub_column_writers) {
        RETURN_IF_ERROR(sub_writer->init());
    }
    if (!is_nullable()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_null_writer->init());
    return Status::OK();
}
956
957
0
// When inverted indexes are enabled, asks every sub column writer to write its
// inverted index; otherwise does nothing.
Status StructColumnWriter::write_inverted_index() {
    if (!_opts.need_inverted_index) {
        return Status::OK();
    }
    for (auto& sub_writer : _sub_column_writers) {
        RETURN_IF_ERROR(sub_writer->write_inverted_index());
    }
    return Status::OK();
}
965
966
// Appends `num_rows` struct rows from `*ptr` and, for nullable columns, writes
// the row-level null flags from `null_map` through the null writer.
//
// `_null_writer` is only set by the constructor when the column is nullable,
// so it is only touched under is_nullable(), matching the Array and Map
// writers' append_nullable implementations.
Status StructColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                           size_t num_rows) {
    RETURN_IF_ERROR(append_data(ptr, num_rows));
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
    }
    return Status::OK();
}
972
973
0
// Appends `num_rows` rows to every sub column.
//
// `*ptr` is read as an array of uint64 values holding pointers: the first
// `_num_sub_column_writers` entries are the data pointers of the sub columns,
// the following `_num_sub_column_writers` entries are their null map pointers.
Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
    const auto* packed = reinterpret_cast<const uint64_t*>(*ptr);
    for (size_t col = 0; col < _num_sub_column_writers; ++col) {
        const auto sub_data = packed[col];
        const auto sub_null_map = packed[_num_sub_column_writers + col];
        RETURN_IF_ERROR(_sub_column_writers[col]->append(
                reinterpret_cast<const uint8_t*>(sub_null_map),
                reinterpret_cast<const void*>(sub_data), num_rows));
    }
    return Status::OK();
}
984
985
0
uint64_t StructColumnWriter::estimate_buffer_size() {
986
0
    uint64_t size = 0;
987
0
    for (auto& column_writer : _sub_column_writers) {
988
0
        size += column_writer->estimate_buffer_size();
989
0
    }
990
0
    size += is_nullable() ? _null_writer->estimate_buffer_size() : 0;
991
0
    return size;
992
0
}
993
994
0
// Finishes all sub column writers, then the null writer for nullable columns,
// and finally records the row count reported by get_next_rowid() in the meta.
Status StructColumnWriter::finish() {
    for (auto& sub_writer : _sub_column_writers) {
        RETURN_IF_ERROR(sub_writer->finish());
    }
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->finish());
    }

    _opts.meta->set_num_rows(get_next_rowid());
    return Status::OK();
}
1004
1005
0
// Writes the data of every sub column and then, for nullable columns, the data
// of the null writer. Stops at the first failure.
Status StructColumnWriter::write_data() {
    for (auto& sub_writer : _sub_column_writers) {
        RETURN_IF_ERROR(sub_writer->write_data());
    }
    if (!is_nullable()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_null_writer->write_data());
    return Status::OK();
}
1014
1015
0
// Writes the ordinal index of every sub column and then, for nullable columns,
// the ordinal index of the null writer. Stops at the first failure.
Status StructColumnWriter::write_ordinal_index() {
    for (auto& sub_writer : _sub_column_writers) {
        RETURN_IF_ERROR(sub_writer->write_ordinal_index());
    }
    if (!is_nullable()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_null_writer->write_ordinal_index());
    return Status::OK();
}
1024
1025
0
// Appends `num_rows` NULL struct rows: every sub column receives `num_rows`
// nulls, and for nullable columns the null writer receives `num_rows` flags
// with value 1.
Status StructColumnWriter::append_nulls(size_t num_rows) {
    for (auto& sub_writer : _sub_column_writers) {
        RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
    }
    if (!is_nullable()) {
        return Status::OK();
    }

    // One "is null" byte per row, written in a single call.
    std::vector<UInt8> null_flags(num_rows, 1);
    const uint8_t* flags_cursor = null_flags.data();
    RETURN_IF_ERROR(_null_writer->append_data(&flags_cursor, num_rows));
    return Status::OK();
}
1036
1037
0
// Not supported: the struct writer holds no page data of its own, so this
// always returns a NotSupported status.
Status StructColumnWriter::finish_current_page() {
    return Status::NotSupported("struct writer has no data, can not finish_current_page");
}
1040
1041
// Takes ownership of the offset writer and the item writer, and of the null
// writer when the column is nullable.
ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts,
                                     std::unique_ptr<StorageField> field,
                                     OffsetColumnWriter* offset_writer,
                                     ScalarColumnWriter* null_writer,
                                     std::unique_ptr<ColumnWriter> item_writer)
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
          _item_writer(std::move(item_writer)),
          _opts(opts) {
    _offset_writer.reset(offset_writer);

    // The null writer is only adopted for nullable columns.
    if (is_nullable()) {
        _null_writer.reset(null_writer);
    }
}
1054
1055
42
// Initializes the offset writer, the null writer (nullable columns only) and
// the item writer, then creates the array-level index writers.
//
// The inverted index writer and the ANN index writer are only created when the
// corresponding option is set and the item writer is a ScalarColumnWriter.
Status ArrayColumnWriter::init() {
    RETURN_IF_ERROR(_offset_writer->init());
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->init());
    }
    RETURN_IF_ERROR(_item_writer->init());

    if (_opts.need_inverted_index &&
        dynamic_cast<ScalarColumnWriter*>(_item_writer.get()) != nullptr) {
        RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_writer,
                                                  _opts.index_file_writer,
                                                  _opts.inverted_indexes[0]));
    }

    if (_opts.need_ann_index &&
        dynamic_cast<ScalarColumnWriter*>(_item_writer.get()) != nullptr) {
        _ann_index_writer =
                std::make_unique<AnnIndexColumnWriter>(_opts.index_file_writer, _opts.ann_index);
        RETURN_IF_ERROR(_ann_index_writer->init());
    }

    return Status::OK();
}
1079
1080
31
// Finishes the array-level inverted index writer, if there is one.
//
// init() only creates `_inverted_index_writer` when the item writer is a
// ScalarColumnWriter, so the option flag alone does not guarantee the writer
// exists; without the null check a non-scalar item type would dereference a
// null pointer here.
Status ArrayColumnWriter::write_inverted_index() {
    if (_opts.need_inverted_index && _inverted_index_writer != nullptr) {
        return _inverted_index_writer->finish();
    }
    return Status::OK();
}
1086
1087
31
// Finishes the ANN index writer, if there is one.
//
// init() only creates `_ann_index_writer` when the item writer is a
// ScalarColumnWriter, so the option flag alone does not guarantee the writer
// exists; without the null check a non-scalar item type would dereference a
// null pointer here.
Status ArrayColumnWriter::write_ann_index() {
    if (_opts.need_ann_index && _ann_index_writer != nullptr) {
        return _ann_index_writer->finish();
    }
    return Status::OK();
}
1093
1094
// batch append data for array
1095
43
Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1096
    // data_ptr contains
1097
    // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
1098
43
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1099
    // total number length
1100
43
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1101
43
    auto offset_data = *(data_ptr + 1);
1102
43
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1103
43
    auto data = *(data_ptr + 2);
1104
43
    auto nested_null_map = *(data_ptr + 3);
1105
43
    if (element_cnt > 0) {
1106
29
        RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1107
29
                                             reinterpret_cast<const void*>(data), element_cnt));
1108
29
    }
1109
43
    if (_opts.need_inverted_index) {
1110
0
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1111
        // now only support nested type is scala
1112
0
        if (writer != nullptr) {
1113
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1114
0
            RETURN_IF_ERROR(_inverted_index_writer->add_array_values(
1115
0
                    _item_writer->get_field()->size(), reinterpret_cast<const void*>(data),
1116
0
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1117
0
        }
1118
0
    }
1119
1120
43
    if (_opts.need_ann_index) {
1121
1
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1122
        // now only support nested type is scala
1123
1
        if (writer != nullptr) {
1124
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1125
1
            RETURN_IF_ERROR(_ann_index_writer->add_array_values(
1126
1
                    _item_writer->get_field()->size(), reinterpret_cast<const void*>(data),
1127
1
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1128
1
        } else {
1129
0
            return Status::NotSupported(
1130
0
                    "Ann index can only be build on array with scalar type. but got {} as "
1131
0
                    "nested",
1132
0
                    _item_writer->get_field()->type());
1133
0
        }
1134
1
    }
1135
1136
43
    RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
1137
43
    return Status::OK();
1138
43
}
1139
1140
31
uint64_t ArrayColumnWriter::estimate_buffer_size() {
1141
31
    return _offset_writer->estimate_buffer_size() +
1142
31
           (is_nullable() ? _null_writer->estimate_buffer_size() : 0) +
1143
31
           _item_writer->estimate_buffer_size();
1144
31
}
1145
1146
// Appends `num_rows` array rows from `*ptr` and, for nullable columns, records
// the row-level null flags from `null_map` in the inverted index (when present)
// and in the null writer.
//
// init() only creates `_inverted_index_writer` when the item writer is a
// ScalarColumnWriter, so the writer is checked for null in addition to the
// option flag to avoid dereferencing a null pointer for non-scalar item types.
Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                          size_t num_rows) {
    RETURN_IF_ERROR(append_data(ptr, num_rows));
    if (is_nullable()) {
        if (_opts.need_inverted_index && _inverted_index_writer != nullptr) {
            RETURN_IF_ERROR(_inverted_index_writer->add_array_nulls(null_map, num_rows));
        }
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
    }
    return Status::OK();
}
1157
1158
42
// Finishes the offset writer, the null writer (nullable columns only) and the
// item writer, then records the row count reported by get_next_rowid() in the
// column meta.
Status ArrayColumnWriter::finish() {
    RETURN_IF_ERROR(_offset_writer->finish());
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->finish());
    }
    RETURN_IF_ERROR(_item_writer->finish());

    _opts.meta->set_num_rows(get_next_rowid());
    return Status::OK();
}
1167
1168
42
// Writes the data of the offset writer, then of the null writer (nullable
// columns only), then of the item writer. Stops at the first failure.
Status ArrayColumnWriter::write_data() {
    RETURN_IF_ERROR(_offset_writer->write_data());
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->write_data());
    }
    return _item_writer->write_data();
}
1176
1177
42
// Writes the ordinal index of the offset writer and of the null writer
// (nullable columns only). The item writer's ordinal index is skipped when
// has_empty_items() reports that there are no items.
Status ArrayColumnWriter::write_ordinal_index() {
    RETURN_IF_ERROR(_offset_writer->write_ordinal_index());
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
    }
    if (has_empty_items()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_item_writer->write_ordinal_index());
    return Status::OK();
}
1187
1188
0
// Appends `num_rows` NULL array rows.
//
// A NULL array contributes no items, so every row gets the item writer's
// current next row id as its offset; the null flags are then written through
// write_null_column().
//
// OffsetColumnWriter::append_data() reads one more uint64 after the rows it has
// written (to record the next item ordinal), so the offset buffer holds
// `num_rows + 1` entries. Passing a pointer to a single offset variable would
// make that trailing read go out of bounds.
Status ArrayColumnWriter::append_nulls(size_t num_rows) {
    if (num_rows > 0) {
        const ordinal_t offset = _item_writer->get_next_rowid();
        // `num_rows` offsets plus one trailing "next offset" entry.
        const std::vector<ordinal_t> offsets(num_rows + 1, offset);
        const auto* offset_ptr = reinterpret_cast<const uint8_t*>(offsets.data());
        RETURN_IF_ERROR(_offset_writer->append_data(&offset_ptr, num_rows));
    }
    return write_null_column(num_rows, true);
}
1199
1200
0
// Writes `num_rows` copies of the null flag (1 when `is_null`, otherwise 0) to
// the null writer, one row per call. Does nothing for non-nullable columns.
Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) {
    if (!is_nullable()) {
        return Status::OK();
    }

    const uint8_t null_flag = is_null ? 1 : 0;
    for (size_t rows_left = num_rows; rows_left > 0; --rows_left) {
        // TODO llj bulk write
        // append_data advances the pointer it is given, so reset it every row.
        const uint8_t* flag_cursor = &null_flag;
        RETURN_IF_ERROR(_null_writer->append_data(&flag_cursor, 1));
    }
    return Status::OK();
}
1210
1211
0
// Not supported: the array writer holds no page data of its own, so this
// always returns a NotSupported status.
Status ArrayColumnWriter::finish_current_page() {
    return Status::NotSupported("array writer has no data, can not finish_current_page");
}
1214
1215
/// ============================= MapColumnWriter =====================////
1216
// Takes ownership of the offset writer, of the two writers in `kv_writers`
// (the vector's elements are moved from; exactly two are required) and, when
// the column is nullable, of `null_writer`.
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts,
                                 std::unique_ptr<StorageField> field,
                                 ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
                                 std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) {
    CHECK_EQ(kv_writers.size(), 2);

    _offsets_writer.reset(offset_writer);
    // The null writer is only adopted for nullable columns.
    if (is_nullable()) {
        _null_writer.reset(null_writer);
    }

    for (size_t idx = 0; idx < kv_writers.size(); ++idx) {
        _kv_writers.push_back(std::move(kv_writers[idx]));
    }
}
1230
1231
382
// Initializes the offsets writer, the null writer (nullable columns only) and
// then each of the key/value writers. Stops at the first failure.
Status MapColumnWriter::init() {
    RETURN_IF_ERROR(_offsets_writer->init());
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->init());
    }
    for (auto& kv_writer : _kv_writers) {
        RETURN_IF_ERROR(kv_writer->init());
    }
    return Status::OK();
}
1243
1244
57
uint64_t MapColumnWriter::estimate_buffer_size() {
1245
57
    size_t estimate = 0;
1246
114
    for (auto& sub_writer : _kv_writers) {
1247
114
        estimate += sub_writer->estimate_buffer_size();
1248
114
    }
1249
57
    estimate += _offsets_writer->estimate_buffer_size();
1250
57
    if (is_nullable()) {
1251
56
        estimate += _null_writer->estimate_buffer_size();
1252
56
    }
1253
57
    return estimate;
1254
57
}
1255
1256
377
// Finishes the offsets writer, the null writer (nullable columns only) and the
// key/value writers, then records the row count reported by get_next_rowid()
// in the column meta.
Status MapColumnWriter::finish() {
    RETURN_IF_ERROR(_offsets_writer->finish());
    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->finish());
    }
    for (auto& kv_writer : _kv_writers) {
        RETURN_IF_ERROR(kv_writer->finish());
    }

    _opts.meta->set_num_rows(get_next_rowid());
    return Status::OK();
}
1267
1268
// Appends `num_rows` map rows from `*ptr` and, for nullable columns, writes the
// row-level null flags from `null_map` through the null writer.
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                        size_t num_rows) {
    RETURN_IF_ERROR(append_data(ptr, num_rows));
    if (!is_nullable()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
    return Status::OK();
}
1276
1277
// write key value data with offsets
1278
382
Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1279
    // data_ptr contains
1280
    // [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_pr]
1281
    // which converted results from olap_map_convertor and later will use a structure to replace it
1282
382
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1283
    // total number length
1284
382
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1285
382
    auto offset_data = *(data_ptr + 1);
1286
382
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1287
1288
382
    if (element_cnt > 0) {
1289
1.10k
        for (size_t i = 0; i < 2; ++i) {
1290
738
            auto data = *(data_ptr + 2 + i);
1291
738
            auto nested_null_map = *(data_ptr + 2 + 2 + i);
1292
738
            RETURN_IF_ERROR(
1293
738
                    _kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1294
738
                                           reinterpret_cast<const void*>(data), element_cnt));
1295
738
        }
1296
369
    }
1297
    // make sure the order : offset writer flush next_array_item_ordinal after kv_writers append_data
1298
    // because we use _kv_writers[0]->get_next_rowid() to set next_array_item_ordinal in offset page footer
1299
382
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1300
382
    return Status::OK();
1301
382
}
1302
1303
378
// Asks each nested writer to write its data: the offsets writer, then the
// null writer (nullable columns only), then every key/value sub-writer.
// Returns the first non-OK status, or OK if all succeed.
Status MapColumnWriter::write_data() {
    RETURN_IF_ERROR(_offsets_writer->write_data());

    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->write_data());
    }

    for (size_t idx = 0; idx < _kv_writers.size(); ++idx) {
        RETURN_IF_ERROR(_kv_writers[idx]->write_data());
    }

    return Status::OK();
}
1313
1314
377
// Writes the ordinal index of each nested writer.
//
// The offsets writer and (for nullable columns) the null writer always write
// theirs. A key/value sub-writer is skipped when its get_next_rowid() is 0,
// i.e. when no element was ever appended to it.
Status MapColumnWriter::write_ordinal_index() {
    RETURN_IF_ERROR(_offsets_writer->write_ordinal_index());

    if (is_nullable()) {
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
    }

    for (size_t idx = 0; idx < _kv_writers.size(); ++idx) {
        auto& kv_writer = _kv_writers[idx];
        if (kv_writer->get_next_rowid() == 0) {
            continue;
        }
        RETURN_IF_ERROR(kv_writer->write_ordinal_index());
    }

    return Status::OK();
}
1326
1327
0
// Appends `num_rows` null map rows.
//
// Each key/value sub-writer receives `num_rows` nulls, then one offset per row
// is appended (all equal to the key writer's next row id sampled after that),
// and finally, for nullable columns, `num_rows` null flags set to 1.
//
// @param num_rows  number of null rows to append.
// @return the first non-OK status from a nested writer, otherwise OK.
//
// NOTE(review): the sub-writers are grown by `num_rows` nulls before the
// offset is sampled, so every new row shares the post-append ordinal. Kept
// as-is; confirm this is the layout the reader expects for null maps.
Status MapColumnWriter::append_nulls(size_t num_rows) {
    for (auto& sub_writer : _kv_writers) {
        RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
    }

    const ordinal_t offset = _kv_writers[0]->get_next_rowid();

    // Offsets are row ordinals, so each entry must be a full ordinal_t. The
    // previous one-byte-per-row buffer narrowed the ordinal to uint8_t (which
    // cannot represent ordinals above 255) and was only num_rows bytes long.
    // NOTE(review): assumes the offsets writer consumes ordinal_t-sized
    // entries, like the offsets array handed over in append_data() — confirm.
    const std::vector<ordinal_t> offsets_data(num_rows, offset);
    const auto* offsets_ptr = reinterpret_cast<const uint8_t*>(offsets_data.data());
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));

    if (is_nullable()) {
        // One flag per row; 1 marks the row as null.
        std::vector<UInt8> null_signs(num_rows, 1);
        const uint8_t* null_sign_ptr = null_signs.data();
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
    }

    return Status::OK();
}
1343
1344
0
// Not supported for map writers: always returns a NotSupported status.
Status MapColumnWriter::finish_current_page() {
    return Status::NotSupported("map writer has no data, can not finish_current_page");
}
1347
1348
56
// Finishes the inverted index builder when the writer options request an
// inverted index; otherwise does nothing and returns OK.
Status MapColumnWriter::write_inverted_index() {
    if (!_opts.need_inverted_index) {
        return Status::OK();
    }
    return _index_builder->finish();
}
1354
1355
// Builds a variant column writer.
//
// @param opts    writer options; opts.meta supplies nullability and the column
//                meta passed to the ColumnWriter base.
// @param column  tablet column forwarded to the implementation object.
// @param field   storage field whose ownership moves into the base class.
//
// All real work is delegated to a VariantColumnWriterImpl created here.
VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts,
                                         const TabletColumn* column,
                                         std::unique_ptr<StorageField> field)
        : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
          _impl(std::make_unique<VariantColumnWriterImpl>(opts, column)) {}
1361
1362
321
// Forwards initialization to the implementation object and returns its status.
Status VariantColumnWriter::init() {
    return _impl->init();
}
1365
1366
315
// Appends `num_rows` rows by delegating to the implementation object.
//
// The row counter is advanced before delegating, so it moves forward even if
// the implementation returns an error.
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
    _next_rowid += num_rows;
    return _impl->append_data(ptr, num_rows);
}
1370
1371
765
uint64_t VariantColumnWriter::estimate_buffer_size() {
1372
765
    return _impl->estimate_buffer_size();
1373
765
}
1374
1375
316
// Forwards finish() to the implementation object and returns its status.
Status VariantColumnWriter::finish() {
    return _impl->finish();
}
1378
317
// Forwards write_data() to the implementation object and returns its status.
Status VariantColumnWriter::write_data() {
    return _impl->write_data();
}
1381
316
// Forwards write_ordinal_index() to the implementation object and returns its
// status.
Status VariantColumnWriter::write_ordinal_index() {
    return _impl->write_ordinal_index();
}
1384
1385
312
// Forwards write_zone_map() to the implementation object and returns its
// status.
Status VariantColumnWriter::write_zone_map() {
    return _impl->write_zone_map();
}
1388
1389
299
// Forwards write_inverted_index() to the implementation object and returns its
// status.
Status VariantColumnWriter::write_inverted_index() {
    return _impl->write_inverted_index();
}
1392
300
// Forwards write_bloom_filter_index() to the implementation object and returns
// its status.
Status VariantColumnWriter::write_bloom_filter_index() {
    return _impl->write_bloom_filter_index();
}
1395
1396
// Appends `num_rows` rows with their null map by delegating to the
// implementation object. Unlike append_data(), this wrapper does not touch the
// row counter itself.
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
                                            size_t num_rows) {
    return _impl->append_nullable(null_map, ptr, num_rows);
}
1400
1401
#include "common/compile_check_end.h"
1402
1403
} // namespace doris::segment_v2