Coverage Report

Created: 2026-05-21 06:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/column_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/column_writer.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
#include <cstring>
24
#include <filesystem>
25
#include <memory>
26
27
#include "common/config.h"
28
#include "common/logging.h"
29
#include "core/data_type/data_type_agg_state.h"
30
#include "core/data_type/data_type_factory.hpp"
31
#include "core/types.h"
32
#include "io/fs/file_writer.h"
33
#include "storage/index/bloom_filter/bloom_filter_index_writer.h"
34
#include "storage/index/inverted/inverted_index_writer.h"
35
#include "storage/index/ordinal_page_index.h"
36
#include "storage/index/zone_map/zone_map_index.h"
37
#include "storage/olap_common.h"
38
#include "storage/segment/encoding_info.h"
39
#include "storage/segment/options.h"
40
#include "storage/segment/page_builder.h"
41
#include "storage/segment/page_io.h"
42
#include "storage/segment/page_pointer.h"
43
#include "storage/segment/variant/variant_column_writer_impl.h"
44
#include "storage/tablet/tablet_schema.h"
45
#include "storage/types.h"
46
#include "util/block_compression.h"
47
#include "util/debug_points.h"
48
#include "util/faststring.h"
49
#include "util/rle_encoding.h"
50
#include "util/simd/bits.h"
51
52
namespace doris::segment_v2 {
53
54
class NullBitmapBuilder {
55
public:
56
617k
    NullBitmapBuilder() : _has_null(false), _bitmap_buf(512), _rle_encoder(&_bitmap_buf, 1) {}
57
58
    explicit NullBitmapBuilder(size_t reserve_bits)
59
            : _has_null(false),
60
              _bitmap_buf(BitmapSize(reserve_bits)),
61
0
              _rle_encoder(&_bitmap_buf, 1) {}
62
63
1.99M
    void reserve_for_write(size_t num_rows, size_t non_null_count) {
64
1.99M
        if (num_rows == 0) {
65
0
            return;
66
0
        }
67
1.99M
        if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) {
68
470k
            if (_bitmap_buf.capacity() < kSmallReserveBytes) {
69
44
                _bitmap_buf.reserve(kSmallReserveBytes);
70
44
            }
71
470k
            return;
72
470k
        }
73
1.52M
        size_t raw_bytes = BitmapSize(num_rows);
74
1.52M
        size_t run_est = std::min(num_rows, non_null_count * 2 + 1);
75
1.52M
        size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes;
76
1.52M
        size_t raw_overhead = raw_bytes / 63 + 1;
77
1.52M
        size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
78
1.52M
        size_t reserve_bytes = std::min(raw_est, run_bytes_est);
79
1.52M
        if (_bitmap_buf.capacity() < reserve_bytes) {
80
3.93k
            const size_t cap = _bitmap_buf.capacity();
81
3.93k
            const size_t grow = cap + cap / 2;
82
3.93k
            const size_t new_cap = std::max(reserve_bytes, grow);
83
3.93k
            _bitmap_buf.reserve(new_cap);
84
3.93k
        }
85
1.52M
    }
86
87
6.18M
    void add_run(bool value, size_t run) {
88
6.18M
        _has_null |= value;
89
6.18M
        _rle_encoder.Put(value, run);
90
6.18M
    }
91
92
    // Returns whether the building nullmap contains nullptr
93
616k
    bool has_null() const { return _has_null; }
94
95
192k
    Status finish(OwnedSlice* slice) {
96
192k
        _rle_encoder.Flush();
97
192k
        RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
98
192k
        return Status::OK();
99
192k
    }
100
101
616k
    void reset() {
102
616k
        _has_null = false;
103
616k
        _rle_encoder.Clear();
104
616k
    }
105
106
49.8k
    uint64_t size() { return _bitmap_buf.size(); }
107
108
private:
109
    static constexpr size_t kSmallReserveBytes = 64;
110
    static constexpr size_t kReserveSlackBytes = 16;
111
    static constexpr size_t kBytesPerRun = 6;
112
113
    bool _has_null;
114
    faststring _bitmap_buf;
115
    RleEncoder<bool> _rle_encoder;
116
};
117
118
inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts,
119
76.9k
                                           io::FileWriter* file_writer, uint32_t id) {
120
76.9k
    if (!opts.meta->is_nullable()) {
121
35.1k
        return nullptr;
122
35.1k
    }
123
124
41.8k
    FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
125
41.8k
    ColumnWriterOptions null_options;
126
41.8k
    null_options.meta = opts.meta->add_children_columns();
127
41.8k
    null_options.meta->set_column_id(id);
128
41.8k
    null_options.meta->set_unique_id(id);
129
41.8k
    null_options.meta->set_type(int(null_type));
130
41.8k
    null_options.meta->set_is_nullable(false);
131
41.8k
    null_options.meta->set_length(
132
41.8k
            cast_set<int32_t>(field_type_size(FieldType::OLAP_FIELD_TYPE_TINYINT)));
133
41.8k
    null_options.meta->set_encoding(DEFAULT_ENCODING);
134
41.8k
    null_options.meta->set_compression(opts.meta->compression());
135
136
41.8k
    null_options.need_zone_map = false;
137
41.8k
    null_options.need_bloom_filter = false;
138
41.8k
    null_options.encoding_preference = opts.encoding_preference;
139
140
41.8k
    auto null_column_ptr = std::make_shared<TabletColumn>(
141
41.8k
            FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type, false,
142
41.8k
            null_options.meta->unique_id(), null_options.meta->length());
143
41.8k
    null_column_ptr->set_name("nullable");
144
41.8k
    null_column_ptr->set_index_length(-1); // no short key index
145
41.8k
    return new ScalarColumnWriter(null_options, std::move(null_column_ptr), file_writer);
146
76.9k
}
147
148
ColumnWriter::ColumnWriter(TabletColumnPtr column, bool is_nullable, ColumnMetaPB* meta)
149
1.17M
        : _column(std::move(column)), _is_nullable(is_nullable), _column_meta(meta) {
150
1.17M
    _data_type = DataTypeFactory::instance().create_data_type(*_column_meta);
151
1.17M
}
152
Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts,
153
                                          const TabletColumn* column, io::FileWriter* file_writer,
154
2.35k
                                          std::unique_ptr<ColumnWriter>* writer) {
155
    // not support empty struct
156
2.35k
    DCHECK(column->get_subtype_count() >= 1);
157
2.35k
    std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
158
2.35k
    sub_column_writers.reserve(column->get_subtype_count());
159
13.0k
    for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
160
10.6k
        const TabletColumn& sub_column = column->get_sub_column(i);
161
10.6k
        RETURN_IF_ERROR(sub_column.check_valid());
162
163
        // create sub writer
164
10.6k
        ColumnWriterOptions column_options;
165
10.6k
        column_options.meta = opts.meta->mutable_children_columns(i);
166
10.6k
        column_options.need_zone_map = false;
167
10.6k
        column_options.need_bloom_filter = sub_column.is_bf_column();
168
10.6k
        column_options.encoding_preference = opts.encoding_preference;
169
10.6k
        std::unique_ptr<ColumnWriter> sub_column_writer;
170
10.6k
        RETURN_IF_ERROR(
171
10.6k
                ColumnWriter::create(column_options, &sub_column, file_writer, &sub_column_writer));
172
10.6k
        sub_column_writers.push_back(std::move(sub_column_writer));
173
10.6k
    }
174
175
2.35k
    ScalarColumnWriter* null_writer =
176
2.35k
            get_null_writer(opts, file_writer, column->get_subtype_count() + 1);
177
178
2.35k
    *writer = std::unique_ptr<ColumnWriter>(new StructColumnWriter(
179
2.35k
            opts, std::make_shared<TabletColumn>(*column), null_writer, sub_column_writers));
180
2.35k
    return Status::OK();
181
2.35k
}
182
183
Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts,
184
                                         const TabletColumn* column, io::FileWriter* file_writer,
185
46.5k
                                         std::unique_ptr<ColumnWriter>* writer) {
186
46.5k
    DCHECK(column->get_subtype_count() == 1);
187
46.5k
    const TabletColumn& item_column = column->get_sub_column(0);
188
46.5k
    RETURN_IF_ERROR(item_column.check_valid());
189
190
    // create item writer
191
46.5k
    ColumnWriterOptions item_options;
192
46.5k
    item_options.meta = opts.meta->mutable_children_columns(0);
193
46.5k
    item_options.need_zone_map = false;
194
46.5k
    item_options.need_bloom_filter = item_column.is_bf_column();
195
46.5k
    item_options.encoding_preference = opts.encoding_preference;
196
46.5k
    std::unique_ptr<ColumnWriter> item_writer;
197
46.5k
    RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
198
199
    // create length writer
200
46.5k
    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
201
202
46.5k
    ColumnWriterOptions length_options;
203
46.5k
    length_options.meta = opts.meta->add_children_columns();
204
46.5k
    length_options.meta->set_column_id(2);
205
46.5k
    length_options.meta->set_unique_id(2);
206
46.5k
    length_options.meta->set_type(int(length_type));
207
46.5k
    length_options.meta->set_is_nullable(false);
208
46.5k
    length_options.meta->set_length(
209
46.5k
            cast_set<int32_t>(field_type_size(FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT)));
210
46.5k
    length_options.meta->set_encoding(DEFAULT_ENCODING);
211
46.5k
    length_options.meta->set_compression(opts.meta->compression());
212
213
46.5k
    length_options.need_zone_map = false;
214
46.5k
    length_options.need_bloom_filter = false;
215
46.5k
    length_options.encoding_preference = opts.encoding_preference;
216
217
46.5k
    auto length_column_ptr = std::make_shared<TabletColumn>(
218
46.5k
            FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
219
46.5k
            length_options.meta->is_nullable(), length_options.meta->unique_id(),
220
46.5k
            length_options.meta->length());
221
46.5k
    length_column_ptr->set_name("length");
222
46.5k
    length_column_ptr->set_index_length(-1); // no short key index
223
46.5k
    auto* length_writer =
224
46.5k
            new OffsetColumnWriter(length_options, std::move(length_column_ptr), file_writer);
225
226
46.5k
    ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3);
227
228
46.5k
    *writer = std::unique_ptr<ColumnWriter>(
229
46.5k
            new ArrayColumnWriter(opts, std::make_shared<TabletColumn>(*column), length_writer,
230
46.5k
                                  null_writer, std::move(item_writer)));
231
46.5k
    return Status::OK();
232
46.5k
}
233
234
Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
235
                                       io::FileWriter* file_writer,
236
28.1k
                                       std::unique_ptr<ColumnWriter>* writer) {
237
28.1k
    DCHECK(column->get_subtype_count() == 2);
238
28.1k
    if (column->get_subtype_count() < 2) {
239
0
        return Status::InternalError(
240
0
                "If you upgraded from version 1.2.*, please DROP the MAP columns and then "
241
0
                "ADD the MAP columns back.");
242
0
    }
243
    // create key & value writer
244
28.1k
    std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
245
84.3k
    for (int i = 0; i < 2; ++i) {
246
56.2k
        const TabletColumn& item_column = column->get_sub_column(i);
247
56.2k
        RETURN_IF_ERROR(item_column.check_valid());
248
249
        // create item writer
250
56.2k
        ColumnWriterOptions item_options;
251
56.2k
        item_options.meta = opts.meta->mutable_children_columns(i);
252
56.2k
        item_options.need_zone_map = false;
253
56.2k
        item_options.need_bloom_filter = item_column.is_bf_column();
254
56.2k
        item_options.encoding_preference = opts.encoding_preference;
255
56.2k
        std::unique_ptr<ColumnWriter> item_writer;
256
56.2k
        RETURN_IF_ERROR(
257
56.2k
                ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
258
56.2k
        inner_writer_list.push_back(std::move(item_writer));
259
56.2k
    }
260
261
    // create offset writer
262
28.1k
    FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
263
264
    // Be Cautious: column unique id is used for column reader creation
265
28.1k
    ColumnWriterOptions length_options;
266
28.1k
    length_options.meta = opts.meta->add_children_columns();
267
28.1k
    length_options.meta->set_column_id(column->get_subtype_count() + 1);
268
28.1k
    length_options.meta->set_unique_id(column->get_subtype_count() + 1);
269
28.1k
    length_options.meta->set_type(int(length_type));
270
28.1k
    length_options.meta->set_is_nullable(false);
271
28.1k
    length_options.meta->set_length(
272
28.1k
            cast_set<int32_t>(field_type_size(FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT)));
273
28.1k
    length_options.meta->set_encoding(DEFAULT_ENCODING);
274
28.1k
    length_options.meta->set_compression(opts.meta->compression());
275
276
28.1k
    length_options.need_zone_map = false;
277
28.1k
    length_options.need_bloom_filter = false;
278
28.1k
    length_options.encoding_preference = opts.encoding_preference;
279
280
28.1k
    auto length_column_ptr = std::make_shared<TabletColumn>(
281
28.1k
            FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
282
28.1k
            length_options.meta->is_nullable(), length_options.meta->unique_id(),
283
28.1k
            length_options.meta->length());
284
28.1k
    length_column_ptr->set_name("length");
285
28.1k
    length_column_ptr->set_index_length(-1); // no short key index
286
28.1k
    auto* length_writer =
287
28.1k
            new OffsetColumnWriter(length_options, std::move(length_column_ptr), file_writer);
288
289
28.1k
    ScalarColumnWriter* null_writer =
290
28.1k
            get_null_writer(opts, file_writer, column->get_subtype_count() + 2);
291
292
28.1k
    *writer = std::unique_ptr<ColumnWriter>(
293
28.1k
            new MapColumnWriter(opts, std::make_shared<TabletColumn>(*column), null_writer,
294
28.1k
                                length_writer, inner_writer_list));
295
296
28.1k
    return Status::OK();
297
28.1k
}
298
299
Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
300
                                             const TabletColumn* column,
301
                                             io::FileWriter* file_writer,
302
1.27k
                                             std::unique_ptr<ColumnWriter>* writer) {
303
1.27k
    auto data_type = DataTypeFactory::instance().create_data_type(*column);
304
1.27k
    const auto* agg_state_type = assert_cast<const DataTypeAggState*>(data_type.get());
305
1.27k
    auto type = agg_state_type->get_serialized_type()->get_primitive_type();
306
1.27k
    if (type == PrimitiveType::TYPE_STRING || type == PrimitiveType::INVALID_TYPE ||
307
1.27k
        type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT || type == PrimitiveType::TYPE_BITMAP) {
308
1.18k
        *writer = std::unique_ptr<ColumnWriter>(
309
1.18k
                new ScalarColumnWriter(opts, std::make_shared<TabletColumn>(*column), file_writer));
310
1.18k
    } else if (type == PrimitiveType::TYPE_ARRAY) {
311
22
        RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
312
64
    } else if (type == PrimitiveType::TYPE_MAP) {
313
53
        RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
314
53
    } else {
315
11
        throw Exception(ErrorCode::INTERNAL_ERROR,
316
11
                        "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
317
11
                        agg_state_type->get_name());
318
11
    }
319
1.26k
    return Status::OK();
320
1.27k
}
321
322
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
323
                                           const TabletColumn* column, io::FileWriter* file_writer,
324
6.62k
                                           std::unique_ptr<ColumnWriter>* writer) {
325
    // Variant extracted columns have two kinds of physical writers:
326
    // - Doc-value snapshot column (`...__DORIS_VARIANT_DOC_VALUE__...`): use `VariantDocCompactWriter`
327
    //   to store the doc snapshot in a compact binary form.
328
    // - Regular extracted subcolumns: use `VariantSubcolumnWriter`.
329
    // The root VARIANT column itself uses `VariantColumnWriter`.
330
6.62k
    if (column->is_extracted_column()) {
331
675
        if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
332
665
            *writer = std::make_unique<VariantDocCompactWriter>(
333
665
                    opts, std::make_shared<TabletColumn>(*column));
334
665
            return Status::OK();
335
665
        }
336
10
        VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
337
10
        *writer = std::make_unique<VariantSubcolumnWriter>(opts,
338
10
                                                           std::make_shared<TabletColumn>(*column));
339
10
        return Status::OK();
340
675
    }
341
5.94k
    *writer = std::make_unique<VariantColumnWriter>(opts, std::make_shared<TabletColumn>(*column));
342
5.94k
    return Status::OK();
343
6.62k
}
344
345
//Todo(Amory): here should according nullable and offset and need sub to simply this function
346
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
347
1.03M
                            io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
348
1.03M
    auto column_ptr = std::make_shared<TabletColumn>(*column);
349
1.03M
    if (is_scalar_type(column->type())) {
350
971k
        *writer = std::unique_ptr<ColumnWriter>(
351
971k
                new ScalarColumnWriter(opts, std::move(column_ptr), file_writer));
352
971k
        return Status::OK();
353
971k
    } else {
354
67.9k
        switch (column->type()) {
355
1.27k
        case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
356
1.27k
            RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, writer));
357
1.27k
            return Status::OK();
358
1.27k
        }
359
2.35k
        case FieldType::OLAP_FIELD_TYPE_STRUCT: {
360
2.35k
            RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, writer));
361
2.35k
            return Status::OK();
362
2.35k
        }
363
46.5k
        case FieldType::OLAP_FIELD_TYPE_ARRAY: {
364
46.5k
            RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
365
46.5k
            return Status::OK();
366
46.5k
        }
367
11.3k
        case FieldType::OLAP_FIELD_TYPE_MAP: {
368
11.3k
            RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
369
11.3k
            return Status::OK();
370
11.3k
        }
371
6.62k
        case FieldType::OLAP_FIELD_TYPE_VARIANT: {
372
            // Process columns with sparse column
373
6.62k
            RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
374
6.62k
            return Status::OK();
375
6.62k
        }
376
0
        default:
377
0
            return Status::NotSupported("unsupported type for ColumnWriter: {}",
378
0
                                        std::to_string(int(column_ptr->type())));
379
67.9k
        }
380
67.9k
    }
381
1.03M
}
382
383
Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* data,
384
651k
                                     size_t num_rows) {
385
651k
    const auto* ptr = (const uint8_t*)data;
386
651k
    BitmapIterator null_iter(is_null_bits, num_rows);
387
651k
    bool is_null = false;
388
651k
    size_t this_run = 0;
389
1.30M
    while ((this_run = null_iter.Next(&is_null)) > 0) {
390
651k
        if (is_null) {
391
0
            RETURN_IF_ERROR(append_nulls(this_run));
392
651k
        } else {
393
651k
            RETURN_IF_ERROR(append_data(&ptr, this_run));
394
651k
        }
395
651k
    }
396
651k
    return Status::OK();
397
651k
}
398
399
Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
400
0
                                     size_t num_rows) {
401
    // Fast path: use SIMD to detect all-NULL or all-non-NULL columns
402
0
    if (config::enable_rle_batch_put_optimization) {
403
0
        size_t non_null_count =
404
0
                simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows);
405
406
0
        if (non_null_count == 0) {
407
            // All NULL: skip run-length iteration, directly append all nulls
408
0
            RETURN_IF_ERROR(append_nulls(num_rows));
409
0
            *ptr += cell_size() * num_rows;
410
0
            return Status::OK();
411
0
        }
412
413
0
        if (non_null_count == num_rows) {
414
            // All non-NULL: skip run-length iteration, directly append all data
415
0
            return append_data(ptr, num_rows);
416
0
        }
417
0
    }
418
419
    // Mixed case or sparse optimization disabled: use run-length processing
420
0
    size_t offset = 0;
421
0
    auto next_run_step = [&]() {
422
0
        size_t step = 1;
423
0
        for (auto i = offset + 1; i < num_rows; ++i) {
424
0
            if (null_map[offset] == null_map[i]) {
425
0
                step++;
426
0
            } else {
427
0
                break;
428
0
            }
429
0
        }
430
0
        return step;
431
0
    };
432
433
0
    do {
434
0
        auto step = next_run_step();
435
0
        if (null_map[offset]) {
436
0
            RETURN_IF_ERROR(append_nulls(step));
437
0
            *ptr += cell_size() * step;
438
0
        } else {
439
            // TODO:
440
            //  1. `*ptr += cell_size() * step;` should do in this function, not append_data;
441
            //  2. support array vectorized load and ptr offset add
442
0
            RETURN_IF_ERROR(append_data(ptr, step));
443
0
        }
444
0
        offset += step;
445
0
    } while (offset < num_rows);
446
447
0
    return Status::OK();
448
0
}
449
450
2.43M
Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) {
451
2.43M
    assert(data && num_rows > 0);
452
2.43M
    const auto* ptr = (const uint8_t*)data;
453
2.43M
    if (nullmap) {
454
2.03M
        return append_nullable(nullmap, &ptr, num_rows);
455
2.03M
    } else {
456
402k
        return append_data(&ptr, num_rows);
457
402k
    }
458
2.43M
}
459
460
///////////////////////////////////////////////////////////////////////////////////
461
462
ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
463
                                       io::FileWriter* file_writer)
464
1.09M
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta),
465
1.09M
          _opts(opts),
466
1.09M
          _file_writer(file_writer),
467
1.09M
          _data_size(0) {
468
    // these opts.meta fields should be set by client
469
1.09M
    DCHECK(opts.meta->has_column_id());
470
1.09M
    DCHECK(opts.meta->has_unique_id());
471
1.09M
    DCHECK(opts.meta->has_type());
472
1.09M
    DCHECK(opts.meta->has_length());
473
1.09M
    DCHECK(opts.meta->has_encoding());
474
1.09M
    DCHECK(opts.meta->has_compression());
475
1.09M
    DCHECK(opts.meta->has_is_nullable());
476
1.09M
    DCHECK(file_writer != nullptr);
477
1.09M
    _inverted_index_builders.resize(_opts.inverted_indexes.size());
478
1.09M
}
479
480
1.09M
ScalarColumnWriter::~ScalarColumnWriter() {
481
    // delete all pages
482
1.09M
    _pages.clear();
483
1.09M
}
484
485
1.09M
Status ScalarColumnWriter::init() {
486
1.09M
    RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
487
488
1.09M
    PageBuilder* page_builder = nullptr;
489
490
1.09M
    RETURN_IF_ERROR(EncodingInfo::get(get_column()->type(), _opts.meta->encoding(),
491
1.09M
                                      _opts.encoding_preference, &_encoding_info));
492
1.09M
    _opts.meta->set_encoding(_encoding_info->encoding());
493
    // create page builder
494
1.09M
    PageBuilderOptions opts;
495
1.09M
    opts.data_page_size = _opts.data_page_size;
496
1.09M
    opts.dict_page_size = _opts.dict_page_size;
497
1.09M
    opts.encoding_preference = _opts.encoding_preference;
498
1.09M
    RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder));
499
1.09M
    if (page_builder == nullptr) {
500
0
        return Status::NotSupported("Failed to create page builder for type {} and encoding {}",
501
0
                                    get_column()->type(), _opts.meta->encoding());
502
0
    }
503
    // should store more concrete encoding type instead of DEFAULT_ENCODING
504
    // because the default encoding of a data type can be changed in the future
505
1.09M
    DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
506
18.4E
    VLOG_DEBUG << fmt::format(
507
18.4E
            "[verbose] scalar column writer init, column_id={}, type={}, encoding={}, "
508
18.4E
            "is_nullable={}",
509
18.4E
            _opts.meta->column_id(), get_column()->type(),
510
18.4E
            EncodingTypePB_Name(_opts.meta->encoding()), _opts.meta->is_nullable());
511
1.09M
    _page_builder.reset(page_builder);
512
    // create ordinal builder
513
1.09M
    _ordinal_index_builder = std::make_unique<OrdinalIndexWriter>();
514
    // create null bitmap builder
515
1.09M
    if (is_nullable()) {
516
617k
        _null_bitmap_builder = std::make_unique<NullBitmapBuilder>();
517
617k
    }
518
1.09M
    if (_opts.need_zone_map) {
519
809k
        RETURN_IF_ERROR(
520
809k
                ZoneMapIndexWriter::create(_data_type, get_column(), _zone_map_index_builder));
521
809k
    }
522
523
1.09M
    if (_opts.need_inverted_index) {
524
37.5k
        do {
525
75.2k
            for (size_t i = 0; i < _opts.inverted_indexes.size(); i++) {
526
37.7k
                DBUG_EXECUTE_IF("column_writer.init", {
527
37.7k
                    class InvertedIndexColumnWriterEmpty final : public IndexColumnWriter {
528
37.7k
                    public:
529
37.7k
                        Status init() override { return Status::OK(); }
530
37.7k
                        Status add_values(const std::string name, const void* values,
531
37.7k
                                          size_t count) override {
532
37.7k
                            return Status::OK();
533
37.7k
                        }
534
37.7k
                        Status add_array_values(size_t field_size, const void* value_ptr,
535
37.7k
                                                const uint8_t* null_map, const uint8_t* offsets_ptr,
536
37.7k
                                                size_t count) override {
537
37.7k
                            return Status::OK();
538
37.7k
                        }
539
37.7k
                        Status add_nulls(uint32_t count) override { return Status::OK(); }
540
37.7k
                        Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
541
37.7k
                            return Status::OK();
542
37.7k
                        }
543
37.7k
                        Status finish() override { return Status::OK(); }
544
37.7k
                        int64_t size() const override { return 0; }
545
37.7k
                        void close_on_error() override {}
546
37.7k
                    };
547
548
37.7k
                    _inverted_index_builders[i] =
549
37.7k
                            std::make_unique<InvertedIndexColumnWriterEmpty>();
550
551
37.7k
                    break;
552
37.7k
                });
553
554
37.7k
                RETURN_IF_ERROR(IndexColumnWriter::create(
555
37.7k
                        get_column(), &_inverted_index_builders[i], _opts.index_file_writer,
556
37.7k
                        _opts.inverted_indexes[i]));
557
37.7k
            }
558
37.5k
        } while (false);
559
37.5k
    }
560
1.09M
    if (_opts.need_bloom_filter) {
561
8.28k
        if (_opts.is_ngram_bf_index) {
562
3.40k
            RETURN_IF_ERROR(NGramBloomFilterIndexWriterImpl::create(
563
3.40k
                    BloomFilterOptions(), get_column()->type(), _opts.gram_size, _opts.gram_bf_size,
564
3.40k
                    &_bloom_filter_index_builder));
565
4.88k
        } else {
566
4.88k
            RETURN_IF_ERROR(BloomFilterIndexWriter::create(_opts.bf_options, get_column()->type(),
567
4.88k
                                                           &_bloom_filter_index_builder));
568
4.88k
        }
569
8.28k
    }
570
1.09M
    return Status::OK();
571
1.09M
}
572
573
2.63M
Status ScalarColumnWriter::append_nulls(size_t num_rows) {
574
2.63M
    _null_bitmap_builder->add_run(true, num_rows);
575
2.63M
    _next_rowid += num_rows;
576
2.63M
    if (_opts.need_zone_map) {
577
2.56M
        _zone_map_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
578
2.56M
    }
579
2.63M
    if (_opts.need_inverted_index) {
580
854k
        for (const auto& builder : _inverted_index_builders) {
581
854k
            RETURN_IF_ERROR(builder->add_nulls(cast_set<uint32_t>(num_rows)));
582
854k
        }
583
854k
    }
584
2.63M
    if (_opts.need_bloom_filter) {
585
575
        _bloom_filter_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
586
575
    }
587
2.63M
    return Status::OK();
588
2.63M
}
589
590
// append data to page builder. this function will make sure that
591
// num_rows must be written before return. And ptr will be modified
592
// to next data should be written
593
3.96M
Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
594
3.96M
    size_t remaining = num_rows;
595
8.00M
    while (remaining > 0) {
596
4.04M
        size_t num_written = remaining;
597
4.04M
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
598
599
4.04M
        remaining -= num_written;
600
601
4.04M
        if (_page_builder->is_page_full()) {
602
74.0k
            RETURN_IF_ERROR(finish_current_page());
603
74.0k
        }
604
4.04M
    }
605
3.96M
    return Status::OK();
606
3.96M
}
607
608
Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data,
609
4.11M
                                                                 size_t* num_written) {
610
4.11M
    RETURN_IF_ERROR(_page_builder->add(data, num_written));
611
4.11M
    if (_opts.need_zone_map) {
612
3.80M
        _zone_map_index_builder->add_values(data, *num_written);
613
3.80M
    }
614
4.11M
    if (_opts.need_inverted_index) {
615
884k
        for (const auto& builder : _inverted_index_builders) {
616
884k
            RETURN_IF_ERROR(builder->add_values(get_column()->name(), data, *num_written));
617
884k
        }
618
884k
    }
619
4.11M
    if (_opts.need_bloom_filter) {
620
8.58k
        RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written));
621
8.58k
    }
622
623
4.11M
    _next_rowid += *num_written;
624
625
    // we must write null bits after write data, because we don't
626
    // know how many rows can be written into current page
627
4.11M
    if (is_nullable()) {
628
3.57M
        _null_bitmap_builder->add_run(false, *num_written);
629
3.57M
    }
630
4.11M
    return Status::OK();
631
4.11M
}
632
633
4.11M
Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) {
634
4.11M
    RETURN_IF_ERROR(append_data_in_current_page(*data, num_written));
635
4.11M
    *data += cell_size() * (*num_written);
636
4.11M
    return Status::OK();
637
4.11M
}
638
639
Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
640
1.98M
                                           size_t num_rows) {
641
    // When optimization is disabled, use base class implementation
642
1.98M
    if (!config::enable_rle_batch_put_optimization) {
643
0
        return ColumnWriter::append_nullable(null_map, ptr, num_rows);
644
0
    }
645
646
1.98M
    if (UNLIKELY(num_rows == 0)) {
647
0
        return Status::OK();
648
0
    }
649
650
    // Build run-length encoded null runs using memchr for fast boundary detection
651
1.98M
    _null_run_buffer.clear();
652
1.98M
    if (_null_run_buffer.capacity() < num_rows) {
653
628k
        _null_run_buffer.reserve(std::min(num_rows, size_t(256)));
654
628k
    }
655
656
1.98M
    size_t non_null_count = 0;
657
1.98M
    size_t offset = 0;
658
5.98M
    while (offset < num_rows) {
659
3.99M
        bool is_null = null_map[offset] != 0;
660
3.99M
        size_t remaining = num_rows - offset;
661
3.99M
        const uint8_t* run_end =
662
3.99M
                static_cast<const uint8_t*>(memchr(null_map + offset, is_null ? 0 : 1, remaining));
663
3.99M
        size_t run_length = run_end != nullptr ? (run_end - (null_map + offset)) : remaining;
664
3.99M
        _null_run_buffer.push_back(NullRun {is_null, static_cast<uint32_t>(run_length)});
665
3.99M
        if (!is_null) {
666
2.91M
            non_null_count += run_length;
667
2.91M
        }
668
3.99M
        offset += run_length;
669
3.99M
    }
670
671
    // Pre-allocate buffer based on estimated size
672
1.99M
    if (_null_bitmap_builder != nullptr) {
673
1.99M
        size_t current_rows = _next_rowid - _first_rowid;
674
1.99M
        size_t expected_rows = current_rows + num_rows;
675
1.99M
        size_t est_non_null = non_null_count;
676
1.99M
        if (num_rows > 0 && expected_rows > num_rows) {
677
1.49M
            est_non_null = (non_null_count * expected_rows) / num_rows;
678
1.49M
        }
679
1.99M
        _null_bitmap_builder->reserve_for_write(expected_rows, est_non_null);
680
1.99M
    }
681
682
1.98M
    if (non_null_count == 0) {
683
        // All NULL: skip data writing, only update null bitmap and indexes
684
53.2k
        RETURN_IF_ERROR(append_nulls(num_rows));
685
53.2k
        *ptr += cell_size() * num_rows;
686
53.2k
        return Status::OK();
687
53.2k
    }
688
689
1.93M
    if (non_null_count == num_rows) {
690
        // All non-NULL: use normal append_data which handles both data and null bitmap
691
1.88M
        return append_data(ptr, num_rows);
692
1.88M
    }
693
694
    // Process by runs
695
2.03M
    for (const auto& run : _null_run_buffer) {
696
2.03M
        size_t run_length = run.len;
697
2.03M
        if (run.is_null) {
698
1.02M
            RETURN_IF_ERROR(append_nulls(run_length));
699
1.02M
            *ptr += cell_size() * run_length;
700
1.02M
        } else {
701
            // TODO:
702
            //  1. `*ptr += cell_size() * step;` should do in this function, not append_data;
703
            //  2. support array vectorized load and ptr offset add
704
1.01M
            RETURN_IF_ERROR(append_data(ptr, run_length));
705
1.01M
        }
706
2.03M
    }
707
708
48.3k
    return Status::OK();
709
48.3k
}
710
711
83.3k
uint64_t ScalarColumnWriter::estimate_buffer_size() {
712
83.3k
    uint64_t size = _data_size;
713
83.3k
    size += _page_builder->size();
714
83.3k
    if (is_nullable()) {
715
49.8k
        size += _null_bitmap_builder->size();
716
49.8k
    }
717
83.3k
    size += _ordinal_index_builder->size();
718
83.3k
    if (_opts.need_zone_map) {
719
67.9k
        size += _zone_map_index_builder->size();
720
67.9k
    }
721
83.3k
    if (_opts.need_bloom_filter) {
722
267
        size += _bloom_filter_index_builder->size();
723
267
    }
724
83.3k
    return size;
725
83.3k
}
726
727
1.09M
Status ScalarColumnWriter::finish() {
728
1.09M
    RETURN_IF_ERROR(finish_current_page());
729
1.09M
    _opts.meta->set_num_rows(_next_rowid);
730
1.09M
    return Status::OK();
731
1.09M
}
732
733
1.09M
Status ScalarColumnWriter::write_data() {
734
1.09M
    auto offset = _file_writer->bytes_appended();
735
1.47M
    auto collect_uncompressed_bytes = [](const PageFooterPB& footer) {
736
1.47M
        return footer.uncompressed_size() + footer.ByteSizeLong() +
737
1.47M
               sizeof(uint32_t) /* footer size */ + sizeof(uint32_t) /* checksum */;
738
1.47M
    };
739
1.12M
    for (auto& page : _pages) {
740
1.12M
        _total_uncompressed_data_pages_size += collect_uncompressed_bytes(page->footer);
741
1.12M
        RETURN_IF_ERROR(_write_data_page(page.get()));
742
1.12M
    }
743
1.09M
    _pages.clear();
744
    // write column dict
745
1.09M
    if (_encoding_info->encoding() == DICT_ENCODING) {
746
352k
        OwnedSlice dict_body;
747
352k
        RETURN_IF_ERROR(_page_builder->get_dictionary_page(&dict_body));
748
352k
        EncodingTypePB dict_word_page_encoding;
749
352k
        RETURN_IF_ERROR(_page_builder->get_dictionary_page_encoding(&dict_word_page_encoding));
750
751
352k
        PageFooterPB footer;
752
352k
        footer.set_type(DICTIONARY_PAGE);
753
352k
        footer.set_uncompressed_size(cast_set<uint32_t>(dict_body.slice().get_size()));
754
352k
        footer.mutable_dict_page_footer()->set_encoding(dict_word_page_encoding);
755
352k
        _total_uncompressed_data_pages_size += collect_uncompressed_bytes(footer);
756
757
352k
        PagePointer dict_pp;
758
352k
        RETURN_IF_ERROR(PageIO::compress_and_write_page(
759
352k
                _compress_codec, _opts.compression_min_space_saving, _file_writer,
760
352k
                {dict_body.slice()}, footer, &dict_pp));
761
352k
        dict_pp.to_proto(_opts.meta->mutable_dict_page());
762
352k
    }
763
1.09M
    _total_compressed_data_pages_size += _file_writer->bytes_appended() - offset;
764
1.09M
    _page_builder.reset();
765
1.09M
    return Status::OK();
766
1.09M
}
767
768
1.04M
Status ScalarColumnWriter::write_ordinal_index() {
769
1.04M
    return _ordinal_index_builder->finish(_file_writer, _opts.meta->add_indexes());
770
1.04M
}
771
772
862k
Status ScalarColumnWriter::write_zone_map() {
773
862k
    if (_opts.need_zone_map) {
774
809k
        return _zone_map_index_builder->finish(_file_writer, _opts.meta->add_indexes());
775
809k
    }
776
53.0k
    return Status::OK();
777
862k
}
778
779
715k
Status ScalarColumnWriter::write_inverted_index() {
780
715k
    if (_opts.need_inverted_index) {
781
37.7k
        for (const auto& builder : _inverted_index_builders) {
782
37.7k
            RETURN_IF_ERROR(builder->finish());
783
37.7k
        }
784
37.5k
    }
785
715k
    return Status::OK();
786
715k
}
787
788
712k
Status ScalarColumnWriter::write_bloom_filter_index() {
789
712k
    if (_opts.need_bloom_filter) {
790
8.28k
        return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes());
791
8.28k
    }
792
704k
    return Status::OK();
793
712k
}
794
795
// write a data page into file and update ordinal index
796
1.12M
Status ScalarColumnWriter::_write_data_page(Page* page) {
797
1.12M
    PagePointer pp;
798
1.12M
    std::vector<Slice> compressed_body;
799
2.15M
    for (auto& data : page->data) {
800
2.15M
        compressed_body.push_back(data.slice());
801
2.15M
    }
802
1.12M
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, compressed_body, page->footer, &pp));
803
1.12M
    _ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp);
804
1.12M
    return Status::OK();
805
1.12M
}
806
807
1.17M
Status ScalarColumnWriter::finish_current_page() {
808
1.17M
    if (_next_rowid == _first_rowid) {
809
47.0k
        return Status::OK();
810
47.0k
    }
811
1.12M
    if (_opts.need_zone_map) {
812
        // If the number of rows in the current page is less than the threshold,
813
        // we will invalidate zone map index for this page by set pass_all to true.
814
874k
        if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) {
815
590k
            _zone_map_index_builder->invalid_page_zone_map();
816
590k
        }
817
874k
        RETURN_IF_ERROR(_zone_map_index_builder->flush());
818
874k
    }
819
820
1.12M
    if (_opts.need_bloom_filter) {
821
8.53k
        RETURN_IF_ERROR(_bloom_filter_index_builder->flush());
822
8.53k
    }
823
824
1.12M
    _raw_data_bytes += _page_builder->get_raw_data_size();
825
826
    // build data page body : encoded values + [nullmap]
827
1.12M
    std::vector<Slice> body;
828
1.12M
    OwnedSlice encoded_values;
829
1.12M
    RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
830
1.12M
    RETURN_IF_ERROR(_page_builder->reset());
831
1.12M
    body.push_back(encoded_values.slice());
832
833
1.12M
    OwnedSlice nullmap;
834
1.12M
    if (_null_bitmap_builder != nullptr) {
835
616k
        if (is_nullable() && _null_bitmap_builder->has_null()) {
836
192k
            RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
837
192k
            body.push_back(nullmap.slice());
838
192k
        }
839
616k
        _null_bitmap_builder->reset();
840
616k
    }
841
842
    // prepare data page footer
843
1.12M
    std::unique_ptr<Page> page(new Page());
844
1.12M
    page->footer.set_type(DATA_PAGE);
845
1.12M
    page->footer.set_uncompressed_size(cast_set<uint32_t>(Slice::compute_total_size(body)));
846
1.12M
    auto* data_page_footer = page->footer.mutable_data_page_footer();
847
1.12M
    data_page_footer->set_first_ordinal(_first_rowid);
848
1.12M
    data_page_footer->set_num_values(_next_rowid - _first_rowid);
849
1.12M
    data_page_footer->set_nullmap_size(cast_set<uint32_t>(nullmap.slice().size));
850
1.12M
    if (_new_page_callback != nullptr) {
851
74.6k
        _new_page_callback->put_extra_info_in_page(data_page_footer);
852
74.6k
    }
853
    // trying to compress page body
854
1.12M
    OwnedSlice compressed_body;
855
1.12M
    RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
856
1.12M
                                               body, &compressed_body));
857
1.12M
    if (compressed_body.slice().empty()) {
858
        // page body is uncompressed
859
1.03M
        page->data.emplace_back(std::move(encoded_values));
860
1.03M
        page->data.emplace_back(std::move(nullmap));
861
1.03M
    } else {
862
        // page body is compressed
863
93.6k
        page->data.emplace_back(std::move(compressed_body));
864
93.6k
    }
865
866
1.12M
    _push_back_page(std::move(page));
867
1.12M
    _first_rowid = _next_rowid;
868
1.12M
    return Status::OK();
869
1.12M
}
870
871
////////////////////////////////////////////////////////////////////////////////
872
873
////////////////////////////////////////////////////////////////////////////////
874
// offset column writer
875
////////////////////////////////////////////////////////////////////////////////
876
877
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
878
                                       io::FileWriter* file_writer)
879
74.6k
        : ScalarColumnWriter(opts, std::move(column), file_writer) {
880
    // now we only explain data in offset column as uint64
881
74.6k
    DCHECK(get_column()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
882
74.6k
}
883
884
74.6k
OffsetColumnWriter::~OffsetColumnWriter() = default;
885
886
74.5k
Status OffsetColumnWriter::init() {
887
74.5k
    RETURN_IF_ERROR(ScalarColumnWriter::init());
888
74.5k
    register_flush_page_callback(this);
889
74.5k
    _next_offset = 0;
890
74.5k
    return Status::OK();
891
74.5k
}
892
893
74.5k
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
894
74.5k
    size_t remaining = num_rows;
895
149k
    while (remaining > 0) {
896
75.2k
        size_t num_written = remaining;
897
75.2k
        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
898
        // Callers provide one extra tail offset after the written rows so the page footer can
899
        // store the next array item ordinal for the current page.
900
75.2k
        _next_offset = *(const uint64_t*)(*ptr);
901
75.2k
        remaining -= num_written;
902
903
75.2k
        if (_page_builder->is_page_full()) {
904
            // get next data for next array_item_rowid
905
758
            RETURN_IF_ERROR(finish_current_page());
906
758
        }
907
75.2k
    }
908
74.5k
    return Status::OK();
909
74.5k
}
910
911
74.6k
void OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
912
74.6k
    footer->set_next_array_item_ordinal(_next_offset);
913
74.6k
}
914
915
StructColumnWriter::StructColumnWriter(
916
        const ColumnWriterOptions& opts, TabletColumnPtr column, ScalarColumnWriter* null_writer,
917
        std::vector<std::unique_ptr<ColumnWriter>>& sub_column_writers)
918
2.35k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta), _opts(opts) {
919
10.6k
    for (auto& sub_column_writer : sub_column_writers) {
920
10.6k
        _sub_column_writers.push_back(std::move(sub_column_writer));
921
10.6k
    }
922
2.35k
    _num_sub_column_writers = _sub_column_writers.size();
923
2.35k
    DCHECK(_num_sub_column_writers >= 1);
924
2.35k
    if (is_nullable()) {
925
2.08k
        _null_writer.reset(null_writer);
926
2.08k
    }
927
2.35k
}
928
929
2.35k
Status StructColumnWriter::init() {
930
10.6k
    for (auto& column_writer : _sub_column_writers) {
931
10.6k
        RETURN_IF_ERROR(column_writer->init());
932
10.6k
    }
933
2.35k
    if (is_nullable()) {
934
2.08k
        RETURN_IF_ERROR(_null_writer->init());
935
2.08k
    }
936
2.35k
    return Status::OK();
937
2.35k
}
938
939
1.73k
Status StructColumnWriter::write_inverted_index() {
940
1.73k
    if (_opts.need_inverted_index) {
941
0
        for (auto& column_writer : _sub_column_writers) {
942
0
            RETURN_IF_ERROR(column_writer->write_inverted_index());
943
0
        }
944
0
    }
945
1.73k
    return Status::OK();
946
1.73k
}
947
948
Status StructColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
949
2.04k
                                           size_t num_rows) {
950
2.04k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
951
2.04k
    RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
952
2.04k
    return Status::OK();
953
2.04k
}
954
955
2.31k
Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
956
2.31k
    const auto* results = reinterpret_cast<const uint64_t*>(*ptr);
957
12.6k
    for (size_t i = 0; i < _num_sub_column_writers; ++i) {
958
10.3k
        auto nullmap = *(results + _num_sub_column_writers + i);
959
10.3k
        auto data = *(results + i);
960
10.3k
        RETURN_IF_ERROR(_sub_column_writers[i]->append(reinterpret_cast<const uint8_t*>(nullmap),
961
10.3k
                                                       reinterpret_cast<const void*>(data),
962
10.3k
                                                       num_rows));
963
10.3k
    }
964
2.31k
    return Status::OK();
965
2.31k
}
966
967
186
uint64_t StructColumnWriter::estimate_buffer_size() {
968
186
    uint64_t size = 0;
969
780
    for (auto& column_writer : _sub_column_writers) {
970
780
        size += column_writer->estimate_buffer_size();
971
780
    }
972
186
    size += is_nullable() ? _null_writer->estimate_buffer_size() : 0;
973
186
    return size;
974
186
}
975
976
2.35k
Status StructColumnWriter::finish() {
977
10.6k
    for (auto& column_writer : _sub_column_writers) {
978
10.6k
        RETURN_IF_ERROR(column_writer->finish());
979
10.6k
    }
980
2.35k
    if (is_nullable()) {
981
2.08k
        RETURN_IF_ERROR(_null_writer->finish());
982
2.08k
    }
983
2.35k
    _opts.meta->set_num_rows(get_next_rowid());
984
2.35k
    return Status::OK();
985
2.35k
}
986
987
2.35k
Status StructColumnWriter::write_data() {
988
10.6k
    for (auto& column_writer : _sub_column_writers) {
989
10.6k
        RETURN_IF_ERROR(column_writer->write_data());
990
10.6k
    }
991
2.35k
    if (is_nullable()) {
992
2.08k
        RETURN_IF_ERROR(_null_writer->write_data());
993
2.08k
    }
994
2.35k
    return Status::OK();
995
2.35k
}
996
997
2.31k
Status StructColumnWriter::write_ordinal_index() {
998
10.3k
    for (auto& column_writer : _sub_column_writers) {
999
10.3k
        RETURN_IF_ERROR(column_writer->write_ordinal_index());
1000
10.3k
    }
1001
2.31k
    if (is_nullable()) {
1002
2.04k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1003
2.04k
    }
1004
2.31k
    return Status::OK();
1005
2.31k
}
1006
1007
0
Status StructColumnWriter::append_nulls(size_t num_rows) {
1008
0
    for (auto& column_writer : _sub_column_writers) {
1009
0
        RETURN_IF_ERROR(column_writer->append_nulls(num_rows));
1010
0
    }
1011
0
    if (is_nullable()) {
1012
0
        std::vector<UInt8> null_signs(num_rows, 1);
1013
0
        const uint8_t* null_sign_ptr = null_signs.data();
1014
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
1015
0
    }
1016
0
    return Status::OK();
1017
0
}
1018
1019
0
Status StructColumnWriter::finish_current_page() {
1020
0
    return Status::NotSupported("struct writer has no data, can not finish_current_page");
1021
0
}
1022
1023
ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
1024
                                     OffsetColumnWriter* offset_writer,
1025
                                     ScalarColumnWriter* null_writer,
1026
                                     std::unique_ptr<ColumnWriter> item_writer)
1027
46.5k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta),
1028
46.5k
          _item_writer(std::move(item_writer)),
1029
46.5k
          _opts(opts) {
1030
46.5k
    _offset_writer.reset(offset_writer);
1031
46.5k
    if (is_nullable()) {
1032
30.7k
        _null_writer.reset(null_writer);
1033
30.7k
    }
1034
46.5k
}
1035
1036
46.5k
Status ArrayColumnWriter::init() {
1037
46.5k
    RETURN_IF_ERROR(_offset_writer->init());
1038
46.5k
    if (is_nullable()) {
1039
30.7k
        RETURN_IF_ERROR(_null_writer->init());
1040
30.7k
    }
1041
46.5k
    RETURN_IF_ERROR(_item_writer->init());
1042
46.5k
    if (_opts.need_inverted_index) {
1043
1.79k
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1044
1.79k
        if (writer != nullptr) {
1045
1.79k
            RETURN_IF_ERROR(IndexColumnWriter::create(get_column(), &_inverted_index_writer,
1046
1.79k
                                                      _opts.index_file_writer,
1047
1.79k
                                                      _opts.inverted_indexes[0]));
1048
1.79k
        }
1049
1.79k
    }
1050
46.5k
    if (_opts.need_ann_index) {
1051
64
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1052
64
        if (writer != nullptr) {
1053
64
            _ann_index_writer = std::make_unique<AnnIndexColumnWriter>(_opts.index_file_writer,
1054
64
                                                                       _opts.ann_index);
1055
64
            RETURN_IF_ERROR(_ann_index_writer->init());
1056
64
        }
1057
64
    }
1058
46.5k
    return Status::OK();
1059
46.5k
}
1060
1061
41.2k
Status ArrayColumnWriter::write_inverted_index() {
1062
41.2k
    if (_opts.need_inverted_index) {
1063
1.79k
        return _inverted_index_writer->finish();
1064
1.79k
    }
1065
39.4k
    return Status::OK();
1066
41.2k
}
1067
1068
41.2k
Status ArrayColumnWriter::write_ann_index() {
1069
41.2k
    if (_opts.need_ann_index) {
1070
61
        return _ann_index_writer->finish();
1071
61
    }
1072
41.1k
    return Status::OK();
1073
41.2k
}
1074
1075
// batch append data for array
1076
46.3k
Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1077
    // data_ptr contains
1078
    // [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
1079
46.3k
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1080
    // total number length
1081
46.3k
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1082
46.3k
    auto offset_data = *(data_ptr + 1);
1083
46.3k
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1084
46.3k
    auto data = *(data_ptr + 2);
1085
46.3k
    auto nested_null_map = *(data_ptr + 3);
1086
46.3k
    if (element_cnt > 0) {
1087
29.9k
        RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1088
29.9k
                                             reinterpret_cast<const void*>(data), element_cnt));
1089
29.9k
    }
1090
46.3k
    if (_opts.need_inverted_index) {
1091
1.88k
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1092
        // now only support nested type is scala
1093
1.88k
        if (writer != nullptr) {
1094
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1095
1.88k
            RETURN_IF_ERROR(_inverted_index_writer->add_array_values(
1096
1.88k
                    field_type_size(_item_writer->get_column()->type()),
1097
1.88k
                    reinterpret_cast<const void*>(data),
1098
1.88k
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1099
1.88k
        }
1100
1.88k
    }
1101
1102
46.3k
    if (_opts.need_ann_index) {
1103
64
        auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
1104
        // now only support nested type is scala
1105
64
        if (writer != nullptr) {
1106
            //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr
1107
64
            RETURN_IF_ERROR(_ann_index_writer->add_array_values(
1108
64
                    field_type_size(_item_writer->get_column()->type()),
1109
64
                    reinterpret_cast<const void*>(data),
1110
64
                    reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
1111
64
        } else {
1112
0
            return Status::NotSupported(
1113
0
                    "Ann index can only be build on array with scalar type. but got {} as "
1114
0
                    "nested",
1115
0
                    _item_writer->get_column()->type());
1116
0
        }
1117
64
    }
1118
1119
46.3k
    RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
1120
46.3k
    return Status::OK();
1121
46.3k
}
1122
1123
883
uint64_t ArrayColumnWriter::estimate_buffer_size() {
1124
883
    return _offset_writer->estimate_buffer_size() +
1125
883
           (is_nullable() ? _null_writer->estimate_buffer_size() : 0) +
1126
883
           _item_writer->estimate_buffer_size();
1127
883
}
1128
1129
Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1130
30.4k
                                          size_t num_rows) {
1131
30.4k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
1132
30.4k
    if (is_nullable()) {
1133
30.4k
        if (_opts.need_inverted_index) {
1134
1.48k
            RETURN_IF_ERROR(_inverted_index_writer->add_array_nulls(null_map, num_rows));
1135
1.48k
        }
1136
30.4k
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
1137
30.4k
    }
1138
30.4k
    return Status::OK();
1139
30.4k
}
1140
1141
46.6k
Status ArrayColumnWriter::finish() {
1142
46.6k
    RETURN_IF_ERROR(_offset_writer->finish());
1143
46.6k
    if (is_nullable()) {
1144
30.7k
        RETURN_IF_ERROR(_null_writer->finish());
1145
30.7k
    }
1146
46.6k
    RETURN_IF_ERROR(_item_writer->finish());
1147
46.6k
    _opts.meta->set_num_rows(get_next_rowid());
1148
46.6k
    return Status::OK();
1149
46.6k
}
1150
1151
46.5k
Status ArrayColumnWriter::write_data() {
1152
46.5k
    RETURN_IF_ERROR(_offset_writer->write_data());
1153
46.5k
    if (is_nullable()) {
1154
30.7k
        RETURN_IF_ERROR(_null_writer->write_data());
1155
30.7k
    }
1156
46.5k
    RETURN_IF_ERROR(_item_writer->write_data());
1157
46.5k
    return Status::OK();
1158
46.5k
}
1159
1160
46.0k
Status ArrayColumnWriter::write_ordinal_index() {
1161
46.0k
    RETURN_IF_ERROR(_offset_writer->write_ordinal_index());
1162
46.0k
    if (is_nullable()) {
1163
30.2k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1164
30.2k
    }
1165
46.0k
    if (!has_empty_items()) {
1166
29.7k
        RETURN_IF_ERROR(_item_writer->write_ordinal_index());
1167
29.7k
    }
1168
46.0k
    return Status::OK();
1169
46.0k
}
1170
1171
0
Status ArrayColumnWriter::append_nulls(size_t num_rows) {
1172
0
    const UInt64 offset = cast_set<UInt64>(_item_writer->get_next_rowid());
1173
0
    std::vector<UInt64> offsets_data(num_rows + 1, offset);
1174
0
    const uint8_t* offsets_ptr = reinterpret_cast<const uint8_t*>(offsets_data.data());
1175
0
    RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
1176
0
    return write_null_column(num_rows, true);
1177
0
}
1178
1179
0
Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) {
1180
0
    uint8_t null_sign = is_null ? 1 : 0;
1181
0
    while (is_nullable() && num_rows > 0) {
1182
        // TODO llj bulk write
1183
0
        const uint8_t* null_sign_ptr = &null_sign;
1184
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, 1));
1185
0
        --num_rows;
1186
0
    }
1187
0
    return Status::OK();
1188
0
}
1189
1190
0
Status ArrayColumnWriter::finish_current_page() {
1191
0
    return Status::NotSupported("array writer has no data, can not finish_current_page");
1192
0
}
1193
1194
/// ============================= MapColumnWriter =====================////
1195
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column,
1196
                                 ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
1197
                                 std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
1198
28.0k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta), _opts(opts) {
1199
28.0k
    CHECK_EQ(kv_writers.size(), 2);
1200
28.0k
    _offsets_writer.reset(offset_writer);
1201
28.0k
    if (is_nullable()) {
1202
9.02k
        _null_writer.reset(null_writer);
1203
9.02k
    }
1204
56.1k
    for (auto& sub_writers : kv_writers) {
1205
56.1k
        _kv_writers.push_back(std::move(sub_writers));
1206
56.1k
    }
1207
28.0k
}
1208
1209
28.0k
Status MapColumnWriter::init() {
1210
28.0k
    RETURN_IF_ERROR(_offsets_writer->init());
1211
28.0k
    if (is_nullable()) {
1212
9.03k
        RETURN_IF_ERROR(_null_writer->init());
1213
9.03k
    }
1214
    // here register_flush_page_callback to call this.put_extra_info_in_page()
1215
    // when finish cur data page
1216
56.2k
    for (auto& sub_writer : _kv_writers) {
1217
56.2k
        RETURN_IF_ERROR(sub_writer->init());
1218
56.2k
    }
1219
28.0k
    return Status::OK();
1220
28.0k
}
1221
1222
1.31k
uint64_t MapColumnWriter::estimate_buffer_size() {
1223
1.31k
    size_t estimate = 0;
1224
2.62k
    for (auto& sub_writer : _kv_writers) {
1225
2.62k
        estimate += sub_writer->estimate_buffer_size();
1226
2.62k
    }
1227
1.31k
    estimate += _offsets_writer->estimate_buffer_size();
1228
1.31k
    if (is_nullable()) {
1229
1.29k
        estimate += _null_writer->estimate_buffer_size();
1230
1.29k
    }
1231
1.31k
    return estimate;
1232
1.31k
}
1233
1234
28.1k
Status MapColumnWriter::finish() {
1235
28.1k
    RETURN_IF_ERROR(_offsets_writer->finish());
1236
28.1k
    if (is_nullable()) {
1237
9.03k
        RETURN_IF_ERROR(_null_writer->finish());
1238
9.03k
    }
1239
56.2k
    for (auto& sub_writer : _kv_writers) {
1240
56.2k
        RETURN_IF_ERROR(sub_writer->finish());
1241
56.2k
    }
1242
28.1k
    _opts.meta->set_num_rows(get_next_rowid());
1243
28.1k
    return Status::OK();
1244
28.1k
}
1245
1246
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1247
9.05k
                                        size_t num_rows) {
1248
9.05k
    RETURN_IF_ERROR(append_data(ptr, num_rows));
1249
9.05k
    if (is_nullable()) {
1250
9.05k
        RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
1251
9.05k
    }
1252
9.05k
    return Status::OK();
1253
9.05k
}
1254
1255
// write key value data with offsets
1256
28.2k
Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1257
    // data_ptr contains
1258
    // [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_pr]
1259
    // which converted results from olap_map_convertor and later will use a structure to replace it
1260
28.2k
    auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
1261
    // total number length
1262
28.2k
    size_t element_cnt = size_t((unsigned long)(*data_ptr));
1263
28.2k
    auto offset_data = *(data_ptr + 1);
1264
28.2k
    const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
1265
1266
28.2k
    if (element_cnt > 0) {
1267
42.2k
        for (size_t i = 0; i < 2; ++i) {
1268
28.1k
            auto data = *(data_ptr + 2 + i);
1269
28.1k
            auto nested_null_map = *(data_ptr + 2 + 2 + i);
1270
28.1k
            RETURN_IF_ERROR(
1271
28.1k
                    _kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map),
1272
28.1k
                                           reinterpret_cast<const void*>(data), element_cnt));
1273
28.1k
        }
1274
14.0k
    }
1275
    // make sure the order : offset writer flush next_array_item_ordinal after kv_writers append_data
1276
    // because we use _kv_writers[0]->get_next_rowid() to set next_array_item_ordinal in offset page footer
1277
28.2k
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1278
28.2k
    return Status::OK();
1279
28.2k
}
1280
1281
28.1k
Status MapColumnWriter::write_data() {
1282
28.1k
    RETURN_IF_ERROR(_offsets_writer->write_data());
1283
28.1k
    if (is_nullable()) {
1284
9.03k
        RETURN_IF_ERROR(_null_writer->write_data());
1285
9.03k
    }
1286
56.2k
    for (auto& sub_writer : _kv_writers) {
1287
56.2k
        RETURN_IF_ERROR(sub_writer->write_data());
1288
56.2k
    }
1289
28.1k
    return Status::OK();
1290
28.1k
}
1291
1292
27.8k
Status MapColumnWriter::write_ordinal_index() {
1293
27.8k
    RETURN_IF_ERROR(_offsets_writer->write_ordinal_index());
1294
27.8k
    if (is_nullable()) {
1295
8.77k
        RETURN_IF_ERROR(_null_writer->write_ordinal_index());
1296
8.77k
    }
1297
55.6k
    for (auto& sub_writer : _kv_writers) {
1298
55.6k
        if (sub_writer->get_next_rowid() != 0) {
1299
27.4k
            RETURN_IF_ERROR(sub_writer->write_ordinal_index());
1300
27.4k
        }
1301
55.6k
    }
1302
27.8k
    return Status::OK();
1303
27.8k
}
1304
1305
0
Status MapColumnWriter::append_nulls(size_t num_rows) {
1306
0
    for (auto& sub_writer : _kv_writers) {
1307
0
        RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
1308
0
    }
1309
0
    const UInt64 offset = cast_set<UInt64>(_kv_writers[0]->get_next_rowid());
1310
0
    std::vector<UInt64> offsets_data(num_rows + 1, offset);
1311
0
    const uint8_t* offsets_ptr = reinterpret_cast<const uint8_t*>(offsets_data.data());
1312
0
    RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
1313
1314
0
    if (is_nullable()) {
1315
0
        std::vector<UInt8> null_signs(num_rows, 1);
1316
0
        const uint8_t* null_sign_ptr = null_signs.data();
1317
0
        RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
1318
0
    }
1319
0
    return Status::OK();
1320
0
}
1321
1322
0
Status MapColumnWriter::finish_current_page() {
1323
0
    return Status::NotSupported("map writer has no data, can not finish_current_page");
1324
0
}
1325
1326
10.9k
Status MapColumnWriter::write_inverted_index() {
1327
10.9k
    if (_opts.need_inverted_index) {
1328
0
        return _index_builder->finish();
1329
0
    }
1330
10.9k
    return Status::OK();
1331
10.9k
}
1332
1333
VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts, TabletColumnPtr column)
1334
5.94k
        : ColumnWriter(std::move(column), opts.meta->is_nullable(), opts.meta) {
1335
5.94k
    _impl = std::make_unique<VariantColumnWriterImpl>(opts, get_column());
1336
5.94k
}
1337
1338
5.92k
Status VariantColumnWriter::init() {
1339
5.92k
    return _impl->init();
1340
5.92k
}
1341
1342
671
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
1343
671
    _next_rowid += num_rows;
1344
671
    return _impl->append_data(ptr, num_rows);
1345
671
}
1346
1347
866
uint64_t VariantColumnWriter::estimate_buffer_size() {
1348
866
    return _impl->estimate_buffer_size();
1349
866
}
1350
1351
5.96k
Status VariantColumnWriter::finish() {
1352
5.96k
    return _impl->finish();
1353
5.96k
}
1354
5.96k
Status VariantColumnWriter::write_data() {
1355
5.96k
    return _impl->write_data();
1356
5.96k
}
1357
5.96k
Status VariantColumnWriter::write_ordinal_index() {
1358
5.96k
    return _impl->write_ordinal_index();
1359
5.96k
}
1360
1361
5.95k
Status VariantColumnWriter::write_zone_map() {
1362
5.95k
    return _impl->write_zone_map();
1363
5.95k
}
1364
1365
5.94k
Status VariantColumnWriter::write_inverted_index() {
1366
5.94k
    return _impl->write_inverted_index();
1367
5.94k
}
1368
5.94k
Status VariantColumnWriter::write_bloom_filter_index() {
1369
5.94k
    return _impl->write_bloom_filter_index();
1370
5.94k
}
1371
1372
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
1373
5.53k
                                            size_t num_rows) {
1374
5.53k
    return _impl->append_nullable(null_map, ptr, num_rows);
1375
5.53k
}
1376
1377
} // namespace doris::segment_v2