Coverage Report

Created: 2026-05-30 07:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/column_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/column_writer.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
#include <cstring>
24
#include <filesystem>
25
#include <memory>
26
27
#include "common/config.h"
28
#include "common/logging.h"
29
#include "core/data_type/data_type_agg_state.h"
30
#include "core/data_type/data_type_factory.hpp"
31
#include "core/types.h"
32
#include "io/fs/file_writer.h"
33
#include "storage/index/bloom_filter/bloom_filter_index_writer.h"
34
#include "storage/index/inverted/inverted_index_writer.h"
35
#include "storage/index/ordinal_page_index.h"
36
#include "storage/index/zone_map/zone_map_index.h"
37
#include "storage/olap_common.h"
38
#include "storage/segment/encoding_info.h"
39
#include "storage/segment/options.h"
40
#include "storage/segment/page_builder.h"
41
#include "storage/segment/page_io.h"
42
#include "storage/segment/page_pointer.h"
43
#include "storage/segment/variant/variant_column_writer_impl.h"
44
#include "storage/tablet/tablet_schema.h"
45
#include "storage/types.h"
46
#include "util/block_compression.h"
47
#include "util/debug_points.h"
48
#include "util/faststring.h"
49
#include "util/rle_encoding.h"
50
#include "util/simd/bits.h"
51
52
namespace doris::segment_v2 {
53
54
class NullBitmapBuilder {
55
public:
56
546k
    NullBitmapBuilder() : _has_null(false), _bitmap_buf(512), _rle_encoder(&_bitmap_buf, 1) {}
57
58
    explicit NullBitmapBuilder(size_t reserve_bits)
59
            : _has_null(false),
60
              _bitmap_buf(BitmapSize(reserve_bits)),
61
0
              _rle_encoder(&_bitmap_buf, 1) {}
62
63
1.71M
    void reserve_for_write(size_t num_rows, size_t non_null_count) {
64
1.71M
        if (num_rows == 0) {
65
0
            return;
66
0
        }
67
1.71M
        if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) {
68
444k
            if (_bitmap_buf.capacity() < kSmallReserveBytes) {
69
80
                _bitmap_buf.reserve(kSmallReserveBytes);
70
80
            }
71
444k
            return;
72
444k
        }
73
1.27M
        size_t raw_bytes = BitmapSize(num_rows);
74
1.27M
        size_t run_est = std::min(num_rows, non_null_count * 2 + 1);
75
1.27M
        size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes;
76
1.27M
        size_t raw_overhead = raw_bytes / 63 + 1;
77
1.27M
        size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
78
1.27M
        size_t reserve_bytes = std::min(raw_est, run_bytes_est);
79
1.27M
        if (_bitmap_buf.capacity() < reserve_bytes) {
80
8.92k
            const size_t cap = _bitmap_buf.capacity();
81
8.92k
            const size_t grow = cap + cap / 2;
82
8.92k
            const size_t new_cap = std::max(reserve_bytes, grow);
83
8.92k
            _bitmap_buf.reserve(new_cap);
84
8.92k
        }
85
1.27M
    }
86
87
8.62M
    void add_run(bool value, size_t run) {
88
8.62M
        _has_null |= value;
89
8.62M
        _rle_encoder.Put(value, run);
90
8.62M
    }
91
92
    // Returns whether the building nullmap contains nullptr
93
545k
    bool has_null() const { return _has_null; }
94
95
149k
    Status finish(OwnedSlice* slice) {
96
149k
        _rle_encoder.Flush();
97
149k
        RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
98
149k
        return Status::OK();
99
149k
    }
100
101
545k
    void reset() {
102
545k
        _has_null = false;
103
545k
        _rle_encoder.Clear();
104
545k
    }
105
106
57.8k
    uint64_t size() { return _bitmap_buf.size(); }
107
108
private:
109
    static constexpr size_t kSmallReserveBytes = 64;
110
    static constexpr size_t kReserveSlackBytes = 16;
111
    static constexpr size_t kBytesPerRun = 6;
112
113
    bool _has_null;
114
    faststring _bitmap_buf;
115
    RleEncoder<bool> _rle_encoder;
116
};
117
118
inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts,
119
75.7k
                                           io::FileWriter* file_writer, uint32_t id) {
120
75.7k
    if (!opts.meta->is_nullable()) {
121
36.0k
        return nullptr;
122
36.0k
    }
123
124
39.6k
    FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
125
39.6k
    ColumnWriterOptions null_options;
126
39.6k
    null_options.meta = opts.meta->add_children_columns();
127
39.6k
    null_options.meta->set_column_id(id);
128
39.6k
    null_options.meta->set_unique_id(id);
129
39.6k
    null_options.meta->set_type(int(null_type));
130
39.6k
    null_options.meta->set_is_nullable(false);
131
39.6k
    null_options.meta->set_length(
132
39.6k
            cast_set<int32_t>(field_type_size(FieldType::OLAP_FIELD_TYPE_TINYINT)));
133
39.6k
    null_options.meta->set_compression(opts.meta->compression());
134
135
39.6k
    null_options.need_zone_map = false;
136
39.6k
    null_options.need_bloom_filter = false;
137
39.6k
    null_options.storage_format = opts.storage_format;
138
139
39.6k
    auto null_column_ptr = std::make_shared<TabletColumn>(
140
39.6k
            FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type, false,
141
39.6k
            null_options.meta->unique_id(), null_options.meta->length());
142
39.6k
    null_column_ptr->set_name("nullable");
143
39.6k
    null_column_ptr->set_index_length(-1); // no short key index
144
39.6k
    null_options.meta->set_encoding(
145
39.6k
            EncodingInfo::resolve_default_encoding(opts.storage_format, *null_column_ptr));
146
39.6k
    return new ScalarColumnWriter(null_options, std::move(null_column_ptr), file_writer);
147
75.7k
}
148
149
ColumnWriter::ColumnWriter(TabletColumnPtr column, bool is_nullable, ColumnMetaPB* meta)
150
1.11M
        : _column(std::move(column)), _is_nullable(is_nullable), _column_meta(meta) {
151
1.11M
    _data_type = DataTypeFactory::instance().create_data_type(*_column_meta);
152
1.11M
}
153
Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts,
154
                                          const TabletColumn* column, io::FileWriter* file_writer,
155
2.34k
                                          std::unique_ptr<ColumnWriter>* writer) {
156
    // not support empty struct
157
2.34k
    DCHECK(column->get_subtype_count() >= 1);
158
2.34k
    std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
159
2.34k
    sub_column_writers.reserve(column->get_subtype_count());
160
12.9k
    for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
161
10.6k
        const TabletColumn& sub_column = column->get_sub_column(i);
162
10.6k
        RETURN_IF_ERROR(sub_column.check_valid());
163
164
        // create sub writer
165
10.6k
        ColumnWriterOptions column_options;
166
10.6k
        column_options.meta = opts.meta->mutable_children_columns(i);
167
10.6k
        column_options.need_zone_map = false;
168
10.6k
        column_options.need_bloom_filter = sub_column.is_bf_column();
169
10.6k
        column_options.storage_format = opts.storage_format;
170
10.6k
        std::unique_ptr<ColumnWriter> sub_column_writer;
171
10.6k
        RETURN_IF_ERROR(
172
10.6k
                ColumnWriter::create(column_options, &sub_column, file_writer, &sub_column_writer));
173
10.6k
        sub_column_writers.push_back(std::move(sub_column_writer));
174
10.6k
    }
175
176
2.34k
    ScalarColumnWriter* null_writer =
177
2.34k
            get_null_writer(opts, file_writer, column->get_subtype_count() + 1);
178
179
2.34k
    *writer = std::unique_ptr<ColumnWriter>(new StructColumnWriter(
180
2.34k
            opts, std::make_shared<TabletColumn>(*column), null_writer, sub_column_writers));
181
2.34k
    return Status::OK();
182
2.34k
}
183
184
Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts,
185
                                         const TabletColumn* column, io::FileWriter* file_writer,
186
44.3k
                                         std::unique_ptr<ColumnWriter>* writer) {
187
44.3k
    DCHECK(column->get_subtype_count() == 1);
188
44.3k
    const TabletColumn& item_column = column->get_sub_column(0);
189
44.3k
    RETURN_IF_ERROR(item_column.check_valid());
190
191
    // create item writer
192
44.3k
    ColumnWriterOptions item_options;
193
44.3k
    item_options.meta = opts.meta->mutable_children_columns(0);
194
44.3k
    item_options.need_zone_map = false;
195
44.3k
    item_options.need_bloom_filter = item_column.is_bf_column();
196
44.3k
    item_options.storage_format = opts.storage_format;
197
44.3k
    std::unique_ptr<ColumnWriter> item_writer;
198
44.3k
    RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
199
200
    // create length writer
201
44.3k
    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
202
203
44.3k
    ColumnWriterOptions length_options;
204
44.3k
    length_options.meta = opts.meta->add_children_columns();
205
44.3k
    length_options.meta->set_column_id(2);
206
44.3k
    length_options.meta->set_unique_id(2);
207
44.3k
    length_options.meta->set_type(int(length_type));
208
44.3k
    length_options.meta->set_is_nullable(false);
209
44.3k
    length_options.meta->set_length(
210
44.3k
            cast_set<int32_t>(field_type_size(FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT)));
211
44.3k
    length_options.meta->set_compression(opts.meta->compression());
212
213
44.3k
    length_options.need_zone_map = false;
214
44.3k
    length_options.need_bloom_filter = false;
215
44.3k
    length_options.storage_format = opts.storage_format;
216
217
44.3k
    auto length_column_ptr = std::make_shared<TabletColumn>(
218
44.3k
            FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
219
44.3k
            length_options.meta->is_nullable(), length_options.meta->unique_id(),
220
44.3k
            length_options.meta->length());
221
44.3k
    length_column_ptr->set_name("length");
222
44.3k
    length_column_ptr->set_index_length(-1); // no short key index
223
44.3k
    length_options.meta->set_encoding(
224
44.3k
            EncodingInfo::resolve_default_encoding(opts.storage_format, *length_column_ptr));
225
44.3k
    auto* length_writer =
226
44.3k
            new OffsetColumnWriter(length_options, std::move(length_column_ptr), file_writer);
227
228
44.3k
    ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3);
229
230
44.3k
    *writer = std::unique_ptr<ColumnWriter>(
231
44.3k
            new ArrayColumnWriter(opts, std::make_shared<TabletColumn>(*column), length_writer,
232
44.3k
                                  null_writer, std::move(item_writer)));
233
44.3k
    return Status::OK();
234
44.3k
}
235
236
Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
237
                                       io::FileWriter* file_writer,
238
29.0k
                                       std::unique_ptr<ColumnWriter>* writer) {
239
29.0k
    DCHECK(column->get_subtype_count() == 2);
240
29.0k
    if (column->get_subtype_count() < 2) {
241
0
        return Status::InternalError(
242
0
                "If you upgraded from version 1.2.*, please DROP the MAP columns and then "
243
0
                "ADD the MAP columns back.");
244
0
    }
245
    // create key & value writer
246
29.0k
    std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
247
87.1k
    for (int i = 0; i < 2; ++i) {
248
58.0k
        const TabletColumn& item_column = column->get_sub_column(i);
249
58.0k
        RETURN_IF_ERROR(item_column.check_valid());
250
251
        // create item writer
252
58.0k
        ColumnWriterOptions item_options;
253
58.0k
        item_options.meta = opts.meta->mutable_children_columns(i);
254
58.0k
        item_options.need_zone_map = false;
255
58.0k
        item_options.need_bloom_filter = item_column.is_bf_column();
256
58.0k
        item_options.storage_format = opts.storage_format;
257
58.0k
        std::unique_ptr<ColumnWriter> item_writer;
258
58.0k
        RETURN_IF_ERROR(
259
58.0k
                ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
260
58.0k
        inner_writer_list.push_back(std::move(item_writer));
261
58.0k
    }
262
263
    // create offset writer
264
29.0k
    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
265
266
    // Be Cautious: column unique id is used for column reader creation
267
29.0k
    ColumnWriterOptions length_options;
268
29.0k
    length_options.meta = opts.meta->add_children_columns();
269
29.0k
    length_options.meta->set_column_id(column->get_subtype_count() + 1);
270
29.0k
    length_options.meta->set_unique_id(column->get_subtype_count() + 1);
271
29.0k
    length_options.meta->set_type(int(length_type));
272
29.0k
    length_options.meta->set_is_nullable(false);
273
29.0k
    length_options.meta->set_length(
274
29.0k
            cast_set<int32_t>(field_type_size(FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT)));
275
29.0k
    length_options.meta->set_compression(opts.meta->compression());
276
277
29.0k
    length_options.need_zone_map = false;
278
29.0k
    length_options.need_bloom_filter = false;
279
29.0k
    length_options.storage_format = opts.storage_format;
280
281
29.0k
    auto length_column_ptr = std::make_shared<TabletColumn>(
282
29.0k
            FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
283
29.0k
            length_options.meta->is_nullable(), length_options.meta->unique_id(),
284
29.0k
            length_options.meta->length());
285
29.0k
    length_column_ptr->set_name("length");
286
29.0k
    length_column_ptr->set_index_length(-1); // no short key index
287
29.0k
    length_options.meta->set_encoding(
288
29.0k
            EncodingInfo::resolve_default_encoding(opts.storage_format, *length_column_ptr));
289
29.0k
    auto* length_writer =
290
29.0k
            new OffsetColumnWriter(length_options, std::move(length_column_ptr), file_writer);
291
292
29.0k
    ScalarColumnWriter* null_writer =
293
29.0k
            get_null_writer(opts, file_writer, column->get_subtype_count() + 2);
294
295
29.0k
    *writer = std::unique_ptr<ColumnWriter>(
296
29.0k
            new MapColumnWriter(opts, std::make_shared<TabletColumn>(*column), null_writer,
297
29.0k
                                length_writer, inner_writer_list));
298
299
29.0k
    return Status::OK();
300
29.0k
}
301
302
Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
303
                                             const TabletColumn* column,
304
                                             io::FileWriter* file_writer,
305
1.30k
                                             std::unique_ptr<ColumnWriter>* writer) {
306
1.30k
    auto data_type = DataTypeFactory::instance().create_data_type(*column);
307
1.30k
    const auto* agg_state_type = assert_cast<const DataTypeAggState*>(data_type.get());
308
1.30k
    auto type = agg_state_type->get_serialized_type()->get_primitive_type();
309
1.30k
    if (type == PrimitiveType::TYPE_STRING || type == PrimitiveType::INVALID_TYPE ||
310
1.30k
        type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT || type == PrimitiveType::TYPE_BITMAP) {
311
1.21k
        *writer = std::unique_ptr<ColumnWriter>(
312
1.21k
                new ScalarColumnWriter(opts, std::make_shared<TabletColumn>(*column), file_writer));
313
1.21k
    } else if (type == PrimitiveType::TYPE_ARRAY) {
314
21
        RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
315
67
    } else if (type == PrimitiveType::TYPE_MAP) {
316
48
        RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
317
48
    } else {
318
19
        throw Exception(ErrorCode::INTERNAL_ERROR,
319
19
                        "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
320
19
                        agg_state_type->get_name());
321
19
    }
322
1.28k
    return Status::OK();
323
1.30k
}
324
325
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
326
                                           const TabletColumn* column, io::FileWriter* file_writer,
327
6.39k
                                           std::unique_ptr<ColumnWriter>* writer) {
328
    // Variant extracted columns have two kinds of physical writers:
329
    // - Doc-value snapshot column (`...__DORIS_VARIANT_DOC_VALUE__...`): use `VariantDocCompactWriter`
330
    //   to store the doc snapshot in a compact binary form.
331
    // - Regular extracted subcolumns: use `VariantSubcolumnWriter`.
332
    // The root VARIANT column itself uses `VariantColumnWriter`.
333
6.39k
    if (column->is_extracted_column()) {
334
498
        if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
335
405
            *writer = std::make_unique<VariantDocCompactWriter>(
336
405
                    opts, std::make_shared<TabletColumn>(*column));
337
405
            return Status::OK();
338
405
        }
339
93
        VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
340
93
        *writer = std::make_unique<VariantSubcolumnWriter>(opts,
341
93
                                                           std::make_shared<TabletColumn>(*column));
342
93
        return Status::OK();
343
498
    }
344
5.89k
    *writer = std::make_unique<VariantColumnWriter>(opts, std::make_shared<TabletColumn>(*column));
345
5.89k
    return Status::OK();
346
6.39k
}
347
348
//Todo(Amory): here should according nullable and offset and need sub to simply this function
349
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
350
979k
                            io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
351
979k
    auto column_ptr = std::make_shared<TabletColumn>(*column);
352
979k
    if (is_scalar_type(column->type())) {
353
914k
        *writer = std::unique_ptr<ColumnWriter>(
354
914k
                new ScalarColumnWriter(opts, std::move(column_ptr), file_writer));
355
914k
        return Status::OK();
356
914k
    } else {
357
65.5k
        switch (column->type()) {
358
1.30k
        case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
359
1.30k
            RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, writer));
360
1.30k
            return Status::OK();
361
1.30k
        }
362
2.34k
        case FieldType::OLAP_FIELD_TYPE_STRUCT: {
363
2.34k
            RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, writer));
364
2.34k
            return Status::OK();
365
2.34k
        }
366
44.3k
        case FieldType::OLAP_FIELD_TYPE_ARRAY: {
367
44.3k
            RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
368
44.3k
            return Status::OK();
369
44.3k
        }
370
11.6k
        case FieldType::OLAP_FIELD_TYPE_MAP: {
371
11.6k
            RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
372
11.6k
            return Status::OK();
373
11.6k
        }
374
6.40k
        case FieldType::OLAP_FIELD_TYPE_VARIANT: {
375
            // Process columns with sparse column
376
6.40k
            RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
377
6.40k
            return Status::OK();
378
6.40k
        }
379
0
        default:
380
0
            return Status::NotSupported("unsupported type for ColumnWriter: {}",
381
0
                                        std::to_string(int(column_ptr->type())));
382
65.5k
        }
383
65.5k
    }
384
979k
}
385
386
Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* data,
387
651k
                                     size_t num_rows) {
388
651k
    const auto* ptr = (const uint8_t*)data;
389
651k
    BitmapIterator null_iter(is_null_bits, num_rows);
390
651k
    bool is_null = false;
391
651k
    size_t this_run = 0;
392
1.30M
    while ((this_run = null_iter.Next(&is_null)) > 0) {
393
651k
        if (is_null) {
394
0
            RETURN_IF_ERROR(append_nulls(this_run));
395
651k
        } else {
396
651k
            RETURN_IF_ERROR(append_data(&ptr, this_run));
397
651k
        }
398
651k
    }
399
651k
    return Status::OK();
400
651k
}
401
402
Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
403
0
                                     size_t num_rows) {
404
    // Fast path: use SIMD to detect all-NULL or all-non-NULL columns
405
0
    if (config::enable_rle_batch_put_optimization) {
406
0
        size_t non_null_count =
407
0
                simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows);
408
409
0
        if (non_null_count == 0) {
410
            // All NULL: skip run-length iteration, directly append all nulls
411
0
            RETURN_IF_ERROR(append_nulls(num_rows));
412
0
            *ptr += cell_size() * num_rows;
413
0
            return Status::OK();
414
0
        }
415
416
0
        if (non_null_count == num_rows) {
417
            // All non-NULL: skip run-length iteration, directly append all data
418
0
            return append_data(ptr, num_rows);
419
0
        }
420
0
    }
421
422
    // Mixed case or sparse optimization disabled: use run-length processing
423
0
    size_t offset = 0;
424
0
    auto next_run_step = [&]() {
425
0
        size_t step = 1;
426
0
        for (auto i = offset + 1; i < num_rows; ++i) {
427
0
            if (null_map[offset] == null_map[i]) {
428
0
                step++;
429
0
            } else {
430
0
                break;
431
0
            }
432
0
        }
433
0
        return step;
434
0
    };
435
436
0
    do {
437
0
        auto step = next_run_step();
438
0
        if (null_map[offset]) {
439
0
            RETURN_IF_ERROR(append_nulls(step));
440
0
            *ptr += cell_size() * step;
441
0
        } else {
442
            // TODO:
443
            //  1. `*ptr += cell_size() * step;` should do in this function, not append_data;
444
            //  2. support array vectorized load and ptr offset add
445
0
            RETURN_IF_ERROR(append_data(ptr, step));
446
0
        }
447
0
        offset += step;
448
0
    } while (offset < num_rows);
449
450
0
    return Status::OK();
451
0
}
452
453
2.16M
Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) {
454
2.16M
    assert(data && num_rows > 0);
455
2.16M
    const auto* ptr = (const uint8_t*)data;
456
2.16M
    if (nullmap) {
457
1.75M
        return append_nullable(nullmap, &ptr, num_rows);
458
1.75M
    } else {
459
411k
        return append_data(&ptr, num_rows);
460
411k
    }
461
2.16M
}
462
463
///////////////////////////////////////////////////////////////////////////////////
464
465
ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
466
                                       io::FileWriter* file_writer)
467
1.03M
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta),
468
1.03M
          _opts(opts),
469
1.03M
          _file_writer(file_writer),
470
1.03M
          _data_size(0) {
471
    // these opts.meta fields should be set by client
472
1.03M
    DCHECK(opts.meta->has_column_id());
473
1.03M
    DCHECK(opts.meta->has_unique_id());
474
1.03M
    DCHECK(opts.meta->has_type());
475
1.03M
    DCHECK(opts.meta->has_length());
476
1.03M
    DCHECK(opts.meta->has_encoding());
477
1.03M
    DCHECK(opts.meta->has_compression());
478
1.03M
    DCHECK(opts.meta->has_is_nullable());
479
1.03M
    DCHECK(file_writer != nullptr);
480
1.03M
    _inverted_index_builders.resize(_opts.inverted_indexes.size());
481
1.03M
}
482
483
1.03M
ScalarColumnWriter::~ScalarColumnWriter() {
484
    // delete all pages
485
1.03M
    _pages.clear();
486
1.03M
}
487
488
1.03M
Status ScalarColumnWriter::init() {
489
1.03M
    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
490
491
1.03M
    PageBuilder* page_builder = nullptr;
492
493
    // Caller must set a concrete (non-DEFAULT) encoding on the meta before init.
494
1.03M
    if (_opts.meta->encoding() == DEFAULT_ENCODING) {
495
0
        return Status::InternalError(
496
0
                "ColumnMetaPB encoding is DEFAULT_ENCODING for column_id={}, type={}; caller must "
497
0
                "resolve to a concrete encoding before ScalarColumnWriter::init",
498
0
                _opts.meta->column_id(), get_column()->type());
499
0
    }
500
1.03M
    RETURN_IF_ERROR(
501
1.03M
            EncodingInfo::get(get_column()->type(), _opts.meta->encoding(), &_encoding_info));
502
    // create page builder
503
1.03M
    PageBuilderOptions opts;
504
1.03M
    opts.data_page_size = _opts.data_page_size;
505
1.03M
    opts.dict_page_size = _opts.dict_page_size;
506
1.03M
    opts.dict_binary_plain_encoding =
507
1.03M
            (_opts.storage_format == TabletStorageFormatPB::TABLET_STORAGE_FORMAT_V3)
508
1.03M
                    ? BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
509
1.03M
                    : BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V1;
510
1.03M
    RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder));
511
1.03M
    if (page_builder == nullptr) {
512
0
        return Status::NotSupported("Failed to create page builder for type {} and encoding {}",
513
0
                                    get_column()->type(), _opts.meta->encoding());
514
0
    }
515
18.4E
    VLOG_DEBUG << fmt::format(
516
18.4E
            "[verbose] scalar column writer init, column_id={}, type={}, encoding={}, "
517
18.4E
            "is_nullable={}",
518
18.4E
            _opts.meta->column_id(), get_column()->type(),
519
18.4E
            EncodingTypePB_Name(_opts.meta->encoding()), _opts.meta->is_nullable());
520
1.03M
    _page_builder.reset(page_builder);
521
    // create ordinal builder
522
1.03M
    _ordinal_index_builder = std::make_unique<OrdinalIndexWriter>();
523
    // create null bitmap builder
524
1.03M
    if (is_nullable()) {
525
546k
        _null_bitmap_builder = std::make_unique<NullBitmapBuilder>();
526
546k
    }
527
1.03M
    if (_opts.need_zone_map) {
528
753k
        RETURN_IF_ERROR(
529
753k
                ZoneMapIndexWriter::create(_data_type, get_column(), _zone_map_index_builder));
530
753k
    }
531
532
1.03M
    if (_opts.need_inverted_index) {
533
39.4k
        do {
534
79.0k
            for (size_t i = 0; i < _opts.inverted_indexes.size(); i++) {
535
39.6k
                DBUG_EXECUTE_IF("column_writer.init", {
536
39.6k
                    class InvertedIndexColumnWriterEmpty final : public IndexColumnWriter {
537
39.6k
                    public:
538
39.6k
                        Status init() override { return Status::OK(); }
539
39.6k
                        Status add_values(const std::string name, const void* values,
540
39.6k
                                          size_t count) override {
541
39.6k
                            return Status::OK();
542
39.6k
                        }
543
39.6k
                        Status add_array_values(size_t field_size, const void* value_ptr,
544
39.6k
                                                const uint8_t* null_map, const uint8_t* offsets_ptr,
545
39.6k
                                                size_t count) override {
546
39.6k
                            return Status::OK();
547
39.6k
                        }
548
39.6k
                        Status add_nulls(uint32_t count) override { return Status::OK(); }
549
39.6k
                        Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
550
39.6k
                            return Status::OK();
551
39.6k
                        }
552
39.6k
                        Status finish() override { return Status::OK(); }
553
39.6k
                        int64_t size() const override { return 0; }
554
39.6k
                        void close_on_error() override {}
555
39.6k
                    };
556
557
39.6k
                    _inverted_index_builders[i] =
558
39.6k
                            std::make_unique<InvertedIndexColumnWriterEmpty>();
559
560
39.6k
                    break;
561
39.6k
                });
562
563
39.6k
                RETURN_IF_ERROR(IndexColumnWriter::create(
564
39.6k
                        get_column(), &_inverted_index_builders[i], _opts.index_file_writer,
565
39.6k
                        _opts.inverted_indexes[i]));
566
39.6k
            }
567
39.4k
        } while (false);
568
39.4k
    }
569
1.03M
    if (_opts.need_bloom_filter) {
570
5.32k
        if (_opts.is_ngram_bf_index) {
571
3.38k
            RETURN_IF_ERROR(NGramBloomFilterIndexWriterImpl::create(
572
3.38k
                    BloomFilterOptions(), get_column()->type(), _opts.gram_size, _opts.gram_bf_size,
573
3.38k
                    &_bloom_filter_index_builder));
574
3.38k
        } else {
575
1.93k
            RETURN_IF_ERROR(BloomFilterIndexWriter::create(_opts.bf_options, get_column()->type(),
576
1.93k
                                                           &_bloom_filter_index_builder));
577
1.93k
        }
578
5.32k
    }
579
1.03M
    return Status::OK();
580
1.03M
}
581
582
3.84M
Status ScalarColumnWriter::append_nulls(size_t num_rows) {
583
3.84M
    _null_bitmap_builder->add_run(true, num_rows);
584
3.84M
    _next_rowid += num_rows;
585
3.84M
    if (_opts.need_zone_map) {
586
3.76M
        _zone_map_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
587
3.76M
    }
588
3.84M
    if (_opts.need_inverted_index) {
589
2.35M
        for (const auto& builder : _inverted_index_builders) {
590
2.35M
            RETURN_IF_ERROR(builder->add_nulls(cast_set<uint32_t>(num_rows)));
591
2.35M
        }
592
2.35M
    }
593
3.84M
    if (_opts.need_bloom_filter) {
594
566
        _bloom_filter_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
595
566
    }
596
3.84M
    return Status::OK();
597
3.84M
}
598
599
// append data to page builder. this function will make sure that
600
// num_rows must be written before return. And ptr will be modified
601
// to next data should be written
602
5.19M
Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
603
5.19M
    size_t remaining = num_rows;
604
10.4M
    while (remaining > 0) {
605
5.27M
        size_t num_written = remaining;
606
5.27M
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
607
608
5.27M
        remaining -= num_written;
609
610
5.27M
        if (_page_builder->is_page_full()) {
611
71.9k
            RETURN_IF_ERROR(finish_current_page());
612
71.9k
        }
613
5.27M
    }
614
5.19M
    return Status::OK();
615
5.19M
}
616
617
Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data,
618
5.34M
                                                                 size_t* num_written) {
619
5.34M
    RETURN_IF_ERROR(_page_builder->add(data, num_written));
620
5.34M
    if (_opts.need_zone_map) {
621
5.02M
        _zone_map_index_builder->add_values(data, *num_written);
622
5.02M
    }
623
5.34M
    if (_opts.need_inverted_index) {
624
2.38M
        for (const auto& builder : _inverted_index_builders) {
625
2.38M
            RETURN_IF_ERROR(builder->add_values(get_column()->name(), data, *num_written));
626
2.38M
        }
627
2.37M
    }
628
5.34M
    if (_opts.need_bloom_filter) {
629
5.60k
        RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written));
630
5.60k
    }
631
632
5.34M
    _next_rowid += *num_written;
633
634
    // we must write null bits after write data, because we don't
635
    // know how many rows can be written into current page
636
5.34M
    if (is_nullable()) {
637
4.80M
        _null_bitmap_builder->add_run(false, *num_written);
638
4.80M
    }
639
5.34M
    return Status::OK();
640
5.34M
}
641
642
5.34M
Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) {
643
5.34M
    RETURN_IF_ERROR(append_data_in_current_page(*data, num_written));
644
5.34M
    *data += cell_size() * (*num_written);
645
5.34M
    return Status::OK();
646
5.34M
}
647
648
Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
649
1.71M
                                           size_t num_rows) {
650
    // When optimization is disabled, use base class implementation
651
1.71M
    if (!config::enable_rle_batch_put_optimization) {
652
0
        return ColumnWriter::append_nullable(null_map, ptr, num_rows);
653
0
    }
654
655
1.71M
    if (UNLIKELY(num_rows == 0)) {
656
0
        return Status::OK();
657
0
    }
658
659
    // Build run-length encoded null runs using memchr for fast boundary detection
660
1.71M
    _null_run_buffer.clear();
661
1.71M
    if (_null_run_buffer.capacity() < num_rows) {
662
549k
        _null_run_buffer.reserve(std::min(num_rows, size_t(256)));
663
549k
    }
664
665
1.71M
    size_t non_null_count = 0;
666
1.71M
    size_t offset = 0;
667
8.43M
    while (offset < num_rows) {
668
6.72M
        bool is_null = null_map[offset] != 0;
669
6.72M
        size_t remaining = num_rows - offset;
670
6.72M
        const uint8_t* run_end =
671
6.72M
                static_cast<const uint8_t*>(memchr(null_map + offset, is_null ? 0 : 1, remaining));
672
6.72M
        size_t run_length = run_end != nullptr ? (run_end - (null_map + offset)) : remaining;
673
6.72M
        _null_run_buffer.push_back(NullRun {is_null, static_cast<uint32_t>(run_length)});
674
6.72M
        if (!is_null) {
675
4.14M
            non_null_count += run_length;
676
4.14M
        }
677
6.72M
        offset += run_length;
678
6.72M
    }
679
680
    // Pre-allocate buffer based on estimated size
681
1.71M
    if (_null_bitmap_builder != nullptr) {
682
1.71M
        size_t current_rows = _next_rowid - _first_rowid;
683
1.71M
        size_t expected_rows = current_rows + num_rows;
684
1.71M
        size_t est_non_null = non_null_count;
685
1.71M
        if (num_rows > 0 && expected_rows > num_rows) {
686
1.23M
            est_non_null = (non_null_count * expected_rows) / num_rows;
687
1.23M
        }
688
1.71M
        _null_bitmap_builder->reserve_for_write(expected_rows, est_non_null);
689
1.71M
    }
690
691
1.71M
    if (non_null_count == 0) {
692
        // All NULL: skip data writing, only update null bitmap and indexes
693
56.5k
        RETURN_IF_ERROR(append_nulls(num_rows));
694
56.5k
        *ptr += cell_size() * num_rows;
695
56.5k
        return Status::OK();
696
56.5k
    }
697
698
1.65M
    if (non_null_count == num_rows) {
699
        // All non-NULL: use normal append_data which handles both data and null bitmap
700
1.61M
        return append_data(ptr, num_rows);
701
1.61M
    }
702
703
    // Process by runs
704
5.03M
    for (const auto& run : _null_run_buffer) {
705
5.03M
        size_t run_length = run.len;
706
5.03M
        if (run.is_null) {
707
2.52M
            RETURN_IF_ERROR(append_nulls(run_length));
708
2.52M
            *ptr += cell_size() * run_length;
709
2.52M
        } else {
710
            // TODO:
711
            //  1. `*ptr += cell_size() * step;` should do in this function, not append_data;
712
            //  2. support array vectorized load and ptr offset add
713
2.50M
            RETURN_IF_ERROR(append_data(ptr, run_length));
714
2.50M
        }
715
5.03M
    }
716
717
44.2k
    return Status::OK();
718
44.2k
}
719
720
102k
uint64_t ScalarColumnWriter::estimate_buffer_size() {
721
102k
    uint64_t size = _data_size;
722
102k
    size += _page_builder->size();
723
102k
    if (is_nullable()) {
724
57.8k
        size += _null_bitmap_builder->size();
725
57.8k
    }
726
102k
    size += _ordinal_index_builder->size();
727
102k
    if (_opts.need_zone_map) {
728
86.8k
        size += _zone_map_index_builder->size();
729
86.8k
    }
730
102k
    if (_opts.need_bloom_filter) {
731
267
        size += _bloom_filter_index_builder->size();
732
267
    }
733
102k
    return size;
734
102k
}
735
736
1.03M
Status ScalarColumnWriter::finish() {
737
1.03M
    RETURN_IF_ERROR(finish_current_page());
738
1.03M
    _opts.meta->set_num_rows(_next_rowid);
739
1.03M
    return Status::OK();
740
1.03M
}
741
742
1.03M
Status ScalarColumnWriter::write_data() {
743
1.03M
    auto offset = _file_writer->bytes_appended();
744
1.39M
    auto collect_uncompressed_bytes = [](const PageFooterPB& footer) {
745
1.39M
        return footer.uncompressed_size() + footer.ByteSizeLong() +
746
1.39M
               sizeof(uint32_t) /* footer size */ + sizeof(uint32_t) /* checksum */;
747
1.39M
    };
748
1.05M
    for (auto& page : _pages) {
749
1.05M
        _total_uncompressed_data_pages_size += collect_uncompressed_bytes(page->footer);
750
1.05M
        RETURN_IF_ERROR(_write_data_page(page.get()));
751
1.05M
    }
752
1.03M
    _pages.clear();
753
    // write column dict
754
1.03M
    if (_encoding_info->encoding() == DICT_ENCODING) {
755
337k
        OwnedSlice dict_body;
756
337k
        RETURN_IF_ERROR(_page_builder->get_dictionary_page(&dict_body));
757
337k
        EncodingTypePB dict_word_page_encoding;
758
337k
        RETURN_IF_ERROR(_page_builder->get_dictionary_page_encoding(&dict_word_page_encoding));
759
760
337k
        PageFooterPB footer;
761
337k
        footer.set_type(DICTIONARY_PAGE);
762
337k
        footer.set_uncompressed_size(cast_set<uint32_t>(dict_body.slice().get_size()));
763
337k
        footer.mutable_dict_page_footer()->set_encoding(dict_word_page_encoding);
764
337k
        _total_uncompressed_data_pages_size += collect_uncompressed_bytes(footer);
765
766
337k
        PagePointer dict_pp;
767
337k
        RETURN_IF_ERROR(PageIO::compress_and_write_page(
768
337k
                _compress_codec, _opts.compression_min_space_saving, _file_writer,
769
337k
                {dict_body.slice()}, footer, &dict_pp));
770
337k
        dict_pp.to_proto(_opts.meta->mutable_dict_page());
771
337k
    }
772
1.03M
    _total_compressed_data_pages_size += _file_writer->bytes_appended() - offset;
773
1.03M
    _page_builder.reset();
774
1.03M
    return Status::OK();
775
1.03M
}
776
777
986k
Status ScalarColumnWriter::write_ordinal_index() {
778
986k
    return _ordinal_index_builder->finish(_file_writer, _opts.meta->add_indexes());
779
986k
}
780
781
806k
Status ScalarColumnWriter::write_zone_map() {
782
806k
    if (_opts.need_zone_map) {
783
752k
        return _zone_map_index_builder->finish(_file_writer, _opts.meta->add_indexes());
784
752k
    }
785
53.3k
    return Status::OK();
786
806k
}
787
788
739k
Status ScalarColumnWriter::write_inverted_index() {
789
739k
    if (_opts.need_inverted_index) {
790
39.5k
        for (const auto& builder : _inverted_index_builders) {
791
39.5k
            RETURN_IF_ERROR(builder->finish());
792
39.5k
        }
793
39.3k
    }
794
739k
    return Status::OK();
795
739k
}
796
797
730k
Status ScalarColumnWriter::write_bloom_filter_index() {
798
730k
    if (_opts.need_bloom_filter) {
799
5.32k
        return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes());
800
5.32k
    }
801
725k
    return Status::OK();
802
730k
}
803
804
// write a data page into file and update ordinal index
805
1.05M
Status ScalarColumnWriter::_write_data_page(Page* page) {
806
1.05M
    PagePointer pp;
807
1.05M
    std::vector<Slice> compressed_body;
808
2.01M
    for (auto& data : page->data) {
809
2.01M
        compressed_body.push_back(data.slice());
810
2.01M
    }
811
1.05M
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, compressed_body, page->footer, &pp));
812
1.05M
    _ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp);
813
1.05M
    return Status::OK();
814
1.05M
}
815
816
1.10M
Status ScalarColumnWriter::finish_current_page() {
817
1.10M
    if (_next_rowid == _first_rowid) {
818
49.6k
        return Status::OK();
819
49.6k
    }
820
1.05M
    if (_opts.need_zone_map) {
821
        // If the number of rows in the current page is less than the threshold,
822
        // we will invalidate zone map index for this page by set pass_all to true.
823
817k
        if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) {
824
579k
            _zone_map_index_builder->invalid_page_zone_map();
825
579k
        }
826
817k
        RETURN_IF_ERROR(_zone_map_index_builder->flush());
827
817k
    }
828
829
1.05M
    if (_opts.need_bloom_filter) {
830
5.56k
        RETURN_IF_ERROR(_bloom_filter_index_builder->flush());
831
5.56k
    }
832
833
1.05M
    _raw_data_bytes += _page_builder->get_raw_data_size();
834
835
    // build data page body : encoded values + [nullmap]
836
1.05M
    std::vector<Slice> body;
837
1.05M
    OwnedSlice encoded_values;
838
1.05M
    RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
839
1.05M
    RETURN_IF_ERROR(_page_builder->reset());
840
1.05M
    body.push_back(encoded_values.slice());
841
842
1.05M
    OwnedSlice nullmap;
843
1.05M
    if (_null_bitmap_builder != nullptr) {
844
545k
        if (is_nullable() && _null_bitmap_builder->has_null()) {
845
149k
            RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
846
149k
            body.push_back(nullmap.slice());
847
149k
        }
848
545k
        _null_bitmap_builder->reset();
849
545k
    }
850
851
    // prepare data page footer
852
1.05M
    std::unique_ptr<Page> page(new Page());
853
1.05M
    page->footer.set_type(DATA_PAGE);
854
1.05M
    page->footer.set_uncompressed_size(cast_set<uint32_t>(Slice::compute_total_size(body)));
855
1.05M
    auto* data_page_footer = page->footer.mutable_data_page_footer();
856
1.05M
    data_page_footer->set_first_ordinal(_first_rowid);
857
1.05M
    data_page_footer->set_num_values(_next_rowid - _first_rowid);
858
1.05M
    data_page_footer->set_nullmap_size(cast_set<uint32_t>(nullmap.slice().size));
859
1.05M
    if (_new_page_callback != nullptr) {
860
73.3k
        _new_page_callback->put_extra_info_in_page(data_page_footer);
861
73.3k
    }
862
    // trying to compress page body
863
1.05M
    OwnedSlice compressed_body;
864
1.05M
    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
865
1.05M
                                               body, &compressed_body));
866
1.05M
    if (compressed_body.slice().empty()) {
867
        // page body is uncompressed
868
959k
        page->data.emplace_back(std::move(encoded_values));
869
959k
        page->data.emplace_back(std::move(nullmap));
870
959k
    } else {
871
        // page body is compressed
872
99.4k
        page->data.emplace_back(std::move(compressed_body));
873
99.4k
    }
874
875
1.05M
    _push_back_page(std::move(page));
876
1.05M
    _first_rowid = _next_rowid;
877
1.05M
    return Status::OK();
878
1.05M
}
879
880
////////////////////////////////////////////////////////////////////////////////
881
882
////////////////////////////////////////////////////////////////////////////////
883
// offset column writer
884
////////////////////////////////////////////////////////////////////////////////
885
886
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
887
                                       io::FileWriter* file_writer)
888
73.3k
        : ScalarColumnWriter(opts, std::move(column), file_writer) {
889
    // now we only explain data in offset column as uint64
890
73.3k
    DCHECK(get_column()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
891
73.3k
}
892
893
73.5k
OffsetColumnWriter::~OffsetColumnWriter() = default;
894
895
73.3k
Status OffsetColumnWriter::init() {
896
73.3k
    RETURN_IF_ERROR(ScalarColumnWriter::init());
897
73.3k
    register_flush_page_callback(this);
898
73.3k
    _next_offset = 0;
899
73.3k
    return Status::OK();
900
73.3k
}
901
902
73.4k
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
903
73.4k
    size_t remaining = num_rows;
904
147k
    while (remaining > 0) {
905
74.0k
        size_t num_written = remaining;
906
74.0k
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
907
        // Callers provide one extra tail offset after the written rows so the page footer can
908
        // store the next array item ordinal for the current page.
909
74.0k
        _next_offset = *(const uint64_t*)(*ptr);
910
74.0k
        remaining -= num_written;
911
912
74.0k
        if (_page_builder->is_page_full()) {
913
            // get next data for next array_item_rowid
914
608
            RETURN_IF_ERROR(finish_current_page());
915
608
        }
916
74.0k
    }
917
73.4k
    return Status::OK();
918
73.4k
}
919
920
73.3k
void OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
921
73.3k
    footer->set_next_array_item_ordinal(_next_offset);
922
73.3k
}
923
924
StructColumnWriter::StructColumnWriter(
925
        const ColumnWriterOptions& opts, TabletColumnPtr column, ScalarColumnWriter* null_writer,
926
        std::vector<std::unique_ptr<ColumnWriter>>& sub_column_writers)
927
2.34k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta), _opts(opts) {
928
10.6k
    for (auto& sub_column_writer : sub_column_writers) {
929
10.6k
        _sub_column_writers.push_back(std::move(sub_column_writer));
930
10.6k
    }
931
2.34k
    _num_sub_column_writers = _sub_column_writers.size();
932
2.34k
    DCHECK(_num_sub_column_writers >= 1);
933
2.34k
    if (is_nullable()) {
934
2.07k
        _null_writer.reset(null_writer);
935
2.07k
    }
936
2.34k
}
937
938
2.34k
Status StructColumnWriter::init() {
939
10.6k
    for (auto& column_writer : _sub_column_writers) {
940
10.6k
        RETURN_IF_ERROR(column_writer->init());
941
10.6k
    }
942
2.34k
    if (is_nullable()) {
943
2.07k
        RETURN_IF_ERROR(_null_writer->init());
944
2.07k
    }
945
2.34k
    return Status::OK();
946
2.34k
}
947
948
1.72k
Status StructColumnWriter::write_inverted_index() {
949
1.72k
    if (_opts.need_inverted_index) {
950
0
        for (auto& column_writer : _sub_column_writers) {
951
0
            RETURN_IF_ERROR(column_writer->write_inverted_index());
952
0
        }
953
0
    }
954
1.72k
    return Status::OK();
955
1.72k
}
956
957
Status StructColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
958
2.03k
                                           size_t num_rows) {
959
2.03k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
960
2.03k
    RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
961
2.03k
    return Status::OK();
962
2.03k
}
963
964
2.30k
Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
965
2.30k
    const auto* results = reinterpret_cast<const uint64_t*>(*ptr);
966
12.6k
    for (size_t i = 0; i < _num_sub_column_writers; ++i) {
967
10.2k
        auto nullmap = *(results + _num_sub_column_writers + i);
968
10.2k
        auto data = *(results + i);
969
10.2k
        RETURN_IF_ERROR(_sub_column_writers[i]->append(reinterpret_cast<const uint8_t*>(nullmap),
970
10.2k
                                                       reinterpret_cast<const void*>(data),
971
10.2k
                                                       num_rows));
972
10.2k
    }
973
2.30k
    return Status::OK();
974
2.30k
}
975
976
186
uint64_t StructColumnWriter::estimate_buffer_size() {
977
186
    uint64_t size = 0;
978
780
    for (auto& column_writer : _sub_column_writers) {
979
780
        size += column_writer->estimate_buffer_size();
980
780
    }
981
186
    size += is_nullable() ? _null_writer->estimate_buffer_size() : 0;
982
186
    return size;
983
186
}
984
985
2.34k
Status StructColumnWriter::finish() {
986
10.6k
    for (auto& column_writer : _sub_column_writers) {
987
10.6k
        RETURN_IF_ERROR(column_writer->finish());
988
10.6k
    }
989
2.34k
    if (is_nullable()) {
990
2.07k
        RETURN_IF_ERROR(_null_writer->finish());
991
2.07k
    }
992
2.34k
    _opts.meta->set_num_rows(get_next_rowid());
993
2.34k
    return Status::OK();
994
2.34k
}
995
996
2.34k
Status StructColumnWriter::write_data() {
997
10.6k
    for (auto& column_writer : _sub_column_writers) {
998
10.6k
        RETURN_IF_ERROR(column_writer->write_data());
999
10.6k
    }
1000
2.34k
    if (is_nullable()) {
1001
2.07k
        RETURN_IF_ERROR(_null_writer->write_data());
1002
2.07k
    }
1003
2.34k
    return Status::OK();
1004
2.34k
}
1005
1006
2.30k
Status StructColumnWriter::write_ordinal_index() {
1007
10.2k
    for (auto& column_writer : _sub_column_writers) {
1008
10.2k
        RETURN_IF_ERROR(column_writer->write_ordinal_index());
1009
10.2k
    }
1010
2.30k
    if (is_nullable()) {
1011
2.03k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1012
2.03k
    }
1013
2.30k
    return Status::OK();
1014
2.30k
}
1015
1016
0
Status StructColumnWriter::append_nulls(size_t num_rows) {
1017
0
    for (auto& column_writer : _sub_column_writers) {
1018
0
        RETURN_IF_ERROR(column_writer->append_nulls(num_rows));
1019
0
    }
1020
0
    if (is_nullable()) {
1021
0
        std::vector<UInt8> null_signs(num_rows, 1);
1022
0
        const uint8_t* null_sign_ptr = null_signs.data();
1023
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
1024
0
    }
1025
0
    return Status::OK();
1026
0
}
1027
1028
0
Status StructColumnWriter::finish_current_page() {
1029
0
    return Status::NotSupported("struct writer has no data, can not finish_current_page");
1030
0
}
1031
1032
ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
1033
                                     OffsetColumnWriter* offset_writer,
1034
                                     ScalarColumnWriter* null_writer,
1035
                                     std::unique_ptr<ColumnWriter> item_writer)
1036
44.3k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta),
1037
44.3k
          _item_writer(std::move(item_writer)),
1038
44.3k
          _opts(opts) {
1039
44.3k
    _offset_writer.reset(offset_writer);
1040
44.3k
    if (is_nullable()) {
1041
28.5k
        _null_writer.reset(null_writer);
1042
28.5k
    }
1043
44.3k
}
1044
1045
44.2k
Status ArrayColumnWriter::init() {
1046
44.2k
    RETURN_IF_ERROR(_offset_writer->init());
1047
44.2k
    if (is_nullable()) {
1048
28.6k
        RETURN_IF_ERROR(_null_writer->init());
1049
28.6k
    }
1050
44.2k
    RETURN_IF_ERROR(_item_writer->init());
1051
44.2k
    if (_opts.need_inverted_index) {
1052
1.79k
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1053
1.79k
        if (writer != nullptr) {
1054
1.79k
            RETURN_IF_ERROR(IndexColumnWriter::create(get_column(), &_inverted_index_writer,
1055
1.79k
                                                      _opts.index_file_writer,
1056
1.79k
                                                      _opts.inverted_indexes[0]));
1057
1.79k
        }
1058
1.79k
    }
1059
44.2k
    if (_opts.need_ann_index) {
1060
64
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1061
64
        if (writer != nullptr) {
1062
64
            _ann_index_writer = std::make_unique<AnnIndexColumnWriter>(_opts.index_file_writer,
1063
64
                                                                       _opts.ann_index);
1064
64
            RETURN_IF_ERROR(_ann_index_writer->init());
1065
64
        }
1066
64
    }
1067
44.2k
    return Status::OK();
1068
44.2k
}
1069
1070
41.2k
Status ArrayColumnWriter::write_inverted_index() {
1071
41.2k
    if (_opts.need_inverted_index) {
1072
1.79k
        return _inverted_index_writer->finish();
1073
1.79k
    }
1074
39.4k
    return Status::OK();
1075
41.2k
}
1076
1077
41.2k
Status ArrayColumnWriter::write_ann_index() {
1078
41.2k
    if (_opts.need_ann_index) {
1079
61
        return _ann_index_writer->finish();
1080
61
    }
1081
41.1k
    return Status::OK();
1082
41.2k
}
1083
1084
// batch append data for array
1085
44.2k
Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1086
    // data_ptr contains
1087
    // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
1088
44.2k
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1089
    // total number length
1090
44.2k
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1091
44.2k
    auto offset_data = *(data_ptr + 1);
1092
44.2k
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1093
44.2k
    auto data = *(data_ptr + 2);
1094
44.2k
    auto nested_null_map = *(data_ptr + 3);
1095
44.2k
    if (element_cnt > 0) {
1096
27.7k
        RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1097
27.7k
                                             reinterpret_cast<const void*>(data), element_cnt));
1098
27.7k
    }
1099
44.2k
    if (_opts.need_inverted_index) {
1100
1.88k
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1101
        // now only support nested type is scala
1102
1.88k
        if (writer != nullptr) {
1103
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1104
1.88k
            RETURN_IF_ERROR(_inverted_index_writer->add_array_values(
1105
1.88k
                    field_type_size(_item_writer->get_column()->type()),
1106
1.88k
                    reinterpret_cast<const void*>(data),
1107
1.88k
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1108
1.88k
        }
1109
1.88k
    }
1110
1111
44.2k
    if (_opts.need_ann_index) {
1112
64
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1113
        // now only support nested type is scala
1114
64
        if (writer != nullptr) {
1115
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1116
64
            RETURN_IF_ERROR(_ann_index_writer->add_array_values(
1117
64
                    field_type_size(_item_writer->get_column()->type()),
1118
64
                    reinterpret_cast<const void*>(data),
1119
64
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1120
64
        } else {
1121
0
            return Status::NotSupported(
1122
0
                    "Ann index can only be build on array with scalar type. but got {} as "
1123
0
                    "nested",
1124
0
                    _item_writer->get_column()->type());
1125
0
        }
1126
64
    }
1127
1128
44.2k
    RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
1129
44.2k
    return Status::OK();
1130
44.2k
}
1131
1132
1.00k
uint64_t ArrayColumnWriter::estimate_buffer_size() {
1133
1.00k
    return _offset_writer->estimate_buffer_size() +
1134
1.00k
           (is_nullable() ? _null_writer->estimate_buffer_size() : 0) +
1135
1.00k
           _item_writer->estimate_buffer_size();
1136
1.00k
}
1137
1138
Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1139
28.3k
                                          size_t num_rows) {
1140
28.3k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
1141
28.3k
    if (is_nullable()) {
1142
28.3k
        if (_opts.need_inverted_index) {
1143
1.48k
            RETURN_IF_ERROR(_inverted_index_writer->add_array_nulls(null_map, num_rows));
1144
1.48k
        }
1145
28.3k
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
1146
28.3k
    }
1147
28.3k
    return Status::OK();
1148
28.3k
}
1149
1150
44.4k
Status ArrayColumnWriter::finish() {
1151
44.4k
    RETURN_IF_ERROR(_offset_writer->finish());
1152
44.4k
    if (is_nullable()) {
1153
28.6k
        RETURN_IF_ERROR(_null_writer->finish());
1154
28.6k
    }
1155
44.4k
    RETURN_IF_ERROR(_item_writer->finish());
1156
44.4k
    _opts.meta->set_num_rows(get_next_rowid());
1157
44.4k
    return Status::OK();
1158
44.4k
}
1159
1160
44.4k
Status ArrayColumnWriter::write_data() {
1161
44.4k
    RETURN_IF_ERROR(_offset_writer->write_data());
1162
44.4k
    if (is_nullable()) {
1163
28.6k
        RETURN_IF_ERROR(_null_writer->write_data());
1164
28.6k
    }
1165
44.4k
    RETURN_IF_ERROR(_item_writer->write_data());
1166
44.4k
    return Status::OK();
1167
44.4k
}
1168
1169
43.9k
Status ArrayColumnWriter::write_ordinal_index() {
1170
43.9k
    RETURN_IF_ERROR(_offset_writer->write_ordinal_index());
1171
43.9k
    if (is_nullable()) {
1172
28.1k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1173
28.1k
    }
1174
43.9k
    if (!has_empty_items()) {
1175
27.5k
        RETURN_IF_ERROR(_item_writer->write_ordinal_index());
1176
27.5k
    }
1177
43.9k
    return Status::OK();
1178
43.9k
}
1179
1180
0
Status ArrayColumnWriter::append_nulls(size_t num_rows) {
1181
0
    const UInt64 offset = cast_set<UInt64>(_item_writer->get_next_rowid());
1182
0
    std::vector<UInt64> offsets_data(num_rows + 1, offset);
1183
0
    const uint8_t* offsets_ptr = reinterpret_cast<const uint8_t*>(offsets_data.data());
1184
0
    RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
1185
0
    return write_null_column(num_rows, true);
1186
0
}
1187
1188
0
Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) {
1189
0
    uint8_t null_sign = is_null ? 1 : 0;
1190
0
    while (is_nullable() && num_rows > 0) {
1191
        // TODO llj bulk write
1192
0
        const uint8_t* null_sign_ptr = &null_sign;
1193
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, 1));
1194
0
        --num_rows;
1195
0
    }
1196
0
    return Status::OK();
1197
0
}
1198
1199
0
Status ArrayColumnWriter::finish_current_page() {
1200
0
    return Status::NotSupported("array writer has no data, can not finish_current_page");
1201
0
}
1202
1203
/// ============================= MapColumnWriter =====================////
1204
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
1205
                                 ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
1206
                                 std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
1207
29.0k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta), _opts(opts) {
1208
29.0k
    CHECK_EQ(kv_writers.size(), 2);
1209
29.0k
    _offsets_writer.reset(offset_writer);
1210
29.0k
    if (is_nullable()) {
1211
9.03k
        _null_writer.reset(null_writer);
1212
9.03k
    }
1213
58.0k
    for (auto& sub_writers : kv_writers) {
1214
58.0k
        _kv_writers.push_back(std::move(sub_writers));
1215
58.0k
    }
1216
29.0k
}
1217
1218
29.0k
Status MapColumnWriter::init() {
1219
29.0k
    RETURN_IF_ERROR(_offsets_writer->init());
1220
29.0k
    if (is_nullable()) {
1221
9.04k
        RETURN_IF_ERROR(_null_writer->init());
1222
9.04k
    }
1223
    // here register_flush_page_callback to call this.put_extra_info_in_page()
1224
    // when finish cur data page
1225
58.1k
    for (auto& sub_writer : _kv_writers) {
1226
58.1k
        RETURN_IF_ERROR(sub_writer->init());
1227
58.1k
    }
1228
29.0k
    return Status::OK();
1229
29.0k
}
1230
1231
1.33k
uint64_t MapColumnWriter::estimate_buffer_size() {
1232
1.33k
    size_t estimate = 0;
1233
2.67k
    for (auto& sub_writer : _kv_writers) {
1234
2.67k
        estimate += sub_writer->estimate_buffer_size();
1235
2.67k
    }
1236
1.33k
    estimate += _offsets_writer->estimate_buffer_size();
1237
1.33k
    if (is_nullable()) {
1238
1.31k
        estimate += _null_writer->estimate_buffer_size();
1239
1.31k
    }
1240
1.33k
    return estimate;
1241
1.33k
}
1242
1243
29.0k
Status MapColumnWriter::finish() {
1244
29.0k
    RETURN_IF_ERROR(_offsets_writer->finish());
1245
29.0k
    if (is_nullable()) {
1246
9.04k
        RETURN_IF_ERROR(_null_writer->finish());
1247
9.04k
    }
1248
58.1k
    for (auto& sub_writer : _kv_writers) {
1249
58.1k
        RETURN_IF_ERROR(sub_writer->finish());
1250
58.1k
    }
1251
29.0k
    _opts.meta->set_num_rows(get_next_rowid());
1252
29.0k
    return Status::OK();
1253
29.0k
}
1254
1255
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1256
9.07k
                                        size_t num_rows) {
1257
9.07k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
1258
9.07k
    if (is_nullable()) {
1259
9.07k
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
1260
9.07k
    }
1261
9.07k
    return Status::OK();
1262
9.07k
}
1263
1264
// write key value data with offsets
1265
29.2k
Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1266
    // data_ptr contains
1267
    // [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_pr]
1268
    // which converted results from olap_map_convertor and later will use a structure to replace it
1269
29.2k
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1270
    // total number length
1271
29.2k
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1272
29.2k
    auto offset_data = *(data_ptr + 1);
1273
29.2k
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1274
1275
29.2k
    if (element_cnt > 0) {
1276
41.5k
        for (size_t i = 0; i < 2; ++i) {
1277
27.6k
            auto data = *(data_ptr + 2 + i);
1278
27.6k
            auto nested_null_map = *(data_ptr + 2 + 2 + i);
1279
27.6k
            RETURN_IF_ERROR(
1280
27.6k
                    _kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1281
27.6k
                                           reinterpret_cast<const void*>(data), element_cnt));
1282
27.6k
        }
1283
13.8k
    }
1284
    // make sure the order : offset writer flush next_array_item_ordinal after kv_writers append_data
1285
    // because we use _kv_writers[0]->get_next_rowid() to set next_array_item_ordinal in offset page footer
1286
29.2k
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1287
29.2k
    return Status::OK();
1288
29.2k
}
1289
1290
29.0k
Status MapColumnWriter::write_data() {
1291
29.0k
    RETURN_IF_ERROR(_offsets_writer->write_data());
1292
29.0k
    if (is_nullable()) {
1293
9.04k
        RETURN_IF_ERROR(_null_writer->write_data());
1294
9.04k
    }
1295
58.1k
    for (auto& sub_writer : _kv_writers) {
1296
58.1k
        RETURN_IF_ERROR(sub_writer->write_data());
1297
58.1k
    }
1298
29.0k
    return Status::OK();
1299
29.0k
}
1300
1301
28.7k
Status MapColumnWriter::write_ordinal_index() {
1302
28.7k
    RETURN_IF_ERROR(_offsets_writer->write_ordinal_index());
1303
28.7k
    if (is_nullable()) {
1304
8.78k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1305
8.78k
    }
1306
57.5k
    for (auto& sub_writer : _kv_writers) {
1307
57.5k
        if (sub_writer->get_next_rowid() != 0) {
1308
26.7k
            RETURN_IF_ERROR(sub_writer->write_ordinal_index());
1309
26.7k
        }
1310
57.5k
    }
1311
28.7k
    return Status::OK();
1312
28.7k
}
1313
1314
0
Status MapColumnWriter::append_nulls(size_t num_rows) {
1315
0
    for (auto& sub_writer : _kv_writers) {
1316
0
        RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
1317
0
    }
1318
0
    const UInt64 offset = cast_set<UInt64>(_kv_writers[0]->get_next_rowid());
1319
0
    std::vector<UInt64> offsets_data(num_rows + 1, offset);
1320
0
    const uint8_t* offsets_ptr = reinterpret_cast<const uint8_t*>(offsets_data.data());
1321
0
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1322
1323
0
    if (is_nullable()) {
1324
0
        std::vector<UInt8> null_signs(num_rows, 1);
1325
0
        const uint8_t* null_sign_ptr = null_signs.data();
1326
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
1327
0
    }
1328
0
    return Status::OK();
1329
0
}
1330
1331
0
Status MapColumnWriter::finish_current_page() {
1332
0
    return Status::NotSupported("map writer has no data, can not finish_current_page");
1333
0
}
1334
1335
10.8k
Status MapColumnWriter::write_inverted_index() {
1336
10.8k
    if (_opts.need_inverted_index) {
1337
0
        return _index_builder->finish();
1338
0
    }
1339
10.8k
    return Status::OK();
1340
10.8k
}
1341
1342
VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column)
1343
5.90k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta) {
1344
5.90k
    _impl = std::make_unique<VariantColumnWriterImpl>(opts, get_column());
1345
5.90k
}
1346
1347
5.88k
Status VariantColumnWriter::init() {
1348
5.88k
    return _impl->init();
1349
5.88k
}
1350
1351
674
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1352
674
    _next_rowid += num_rows;
1353
674
    return _impl->append_data(ptr, num_rows);
1354
674
}
1355
1356
871
uint64_t VariantColumnWriter::estimate_buffer_size() {
1357
871
    return _impl->estimate_buffer_size();
1358
871
}
1359
1360
5.92k
Status VariantColumnWriter::finish() {
1361
5.92k
    return _impl->finish();
1362
5.92k
}
1363
5.92k
Status VariantColumnWriter::write_data() {
1364
5.92k
    return _impl->write_data();
1365
5.92k
}
1366
5.92k
Status VariantColumnWriter::write_ordinal_index() {
1367
5.92k
    return _impl->write_ordinal_index();
1368
5.92k
}
1369
1370
5.91k
Status VariantColumnWriter::write_zone_map() {
1371
5.91k
    return _impl->write_zone_map();
1372
5.91k
}
1373
1374
5.90k
Status VariantColumnWriter::write_inverted_index() {
1375
5.90k
    return _impl->write_inverted_index();
1376
5.90k
}
1377
5.90k
Status VariantColumnWriter::write_bloom_filter_index() {
1378
5.90k
    return _impl->write_bloom_filter_index();
1379
5.90k
}
1380
1381
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1382
5.47k
                                            size_t num_rows) {
1383
5.47k
    return _impl->append_nullable(null_map, ptr, num_rows);
1384
5.47k
}
1385
1386
} // namespace doris::segment_v2