be/src/storage/segment/column_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/column_writer.h" |
19 | | |
20 | | #include <gen_cpp/segment_v2.pb.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstring> |
24 | | #include <filesystem> |
25 | | #include <memory> |
26 | | |
27 | | #include "common/config.h" |
28 | | #include "common/logging.h" |
29 | | #include "core/data_type/data_type_agg_state.h" |
30 | | #include "core/data_type/data_type_factory.hpp" |
31 | | #include "core/types.h" |
32 | | #include "io/fs/file_writer.h" |
33 | | #include "runtime/collection_value.h" |
34 | | #include "storage/field.h" |
35 | | #include "storage/index/bloom_filter/bloom_filter_index_writer.h" |
36 | | #include "storage/index/inverted/inverted_index_writer.h" |
37 | | #include "storage/index/ordinal_page_index.h" |
38 | | #include "storage/index/zone_map/zone_map_index.h" |
39 | | #include "storage/olap_common.h" |
40 | | #include "storage/segment/encoding_info.h" |
41 | | #include "storage/segment/options.h" |
42 | | #include "storage/segment/page_builder.h" |
43 | | #include "storage/segment/page_io.h" |
44 | | #include "storage/segment/page_pointer.h" |
45 | | #include "storage/segment/variant/variant_column_writer_impl.h" |
46 | | #include "storage/tablet/tablet_schema.h" |
47 | | #include "storage/types.h" |
48 | | #include "util/block_compression.h" |
49 | | #include "util/debug_points.h" |
50 | | #include "util/faststring.h" |
51 | | #include "util/rle_encoding.h" |
52 | | #include "util/simd/bits.h" |
53 | | |
54 | | namespace doris::segment_v2 { |
55 | | |
56 | | class NullBitmapBuilder { |
57 | | public: |
58 | 486k | NullBitmapBuilder() : _has_null(false), _bitmap_buf(512), _rle_encoder(&_bitmap_buf, 1) {} |
59 | | |
60 | | explicit NullBitmapBuilder(size_t reserve_bits) |
61 | | : _has_null(false), |
62 | | _bitmap_buf(BitmapSize(reserve_bits)), |
63 | 0 | _rle_encoder(&_bitmap_buf, 1) {} |
64 | | |
65 | 2.56M | void reserve_for_write(size_t num_rows, size_t non_null_count) { |
66 | 2.56M | if (num_rows == 0) { |
67 | 0 | return; |
68 | 0 | } |
69 | 2.56M | if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) { |
70 | 377k | if (_bitmap_buf.capacity() < kSmallReserveBytes) { |
71 | 0 | _bitmap_buf.reserve(kSmallReserveBytes); |
72 | 0 | } |
73 | 377k | return; |
74 | 377k | } |
75 | 2.18M | size_t raw_bytes = BitmapSize(num_rows); |
76 | 2.18M | size_t run_est = std::min(num_rows, non_null_count * 2 + 1); |
77 | 2.18M | size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes; |
78 | 2.18M | size_t raw_overhead = raw_bytes / 63 + 1; |
79 | 2.18M | size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes; |
80 | 2.18M | size_t reserve_bytes = std::min(raw_est, run_bytes_est); |
81 | 2.18M | if (_bitmap_buf.capacity() < reserve_bytes) { |
82 | 8.85k | const size_t cap = _bitmap_buf.capacity(); |
83 | 8.85k | const size_t grow = cap + cap / 2; |
84 | 8.85k | const size_t new_cap = std::max(reserve_bytes, grow); |
85 | 8.85k | _bitmap_buf.reserve(new_cap); |
86 | 8.85k | } |
87 | 2.18M | } |
88 | | |
89 | 7.27M | void add_run(bool value, size_t run) { |
90 | 7.27M | _has_null |= value; |
91 | 7.27M | _rle_encoder.Put(value, run); |
92 | 7.27M | } |
93 | | |
94 | | // Returns whether the building nullmap contains nullptr |
95 | 485k | bool has_null() const { return _has_null; } |
96 | | |
97 | 156k | Status finish(OwnedSlice* slice) { |
98 | 156k | _rle_encoder.Flush(); |
99 | 156k | RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); }); |
100 | 156k | return Status::OK(); |
101 | 156k | } |
102 | | |
103 | 485k | void reset() { |
104 | 485k | _has_null = false; |
105 | 485k | _rle_encoder.Clear(); |
106 | 485k | } |
107 | | |
108 | 29.0k | uint64_t size() { return _bitmap_buf.size(); } |
109 | | |
110 | | private: |
111 | | static constexpr size_t kSmallReserveBytes = 64; |
112 | | static constexpr size_t kReserveSlackBytes = 16; |
113 | | static constexpr size_t kBytesPerRun = 6; |
114 | | |
115 | | bool _has_null; |
116 | | faststring _bitmap_buf; |
117 | | RleEncoder<bool> _rle_encoder; |
118 | | }; |
119 | | |
120 | | inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts, |
121 | 68.8k | io::FileWriter* file_writer, uint32_t id) { |
122 | 68.8k | if (!opts.meta->is_nullable()) { |
123 | 31.1k | return nullptr; |
124 | 31.1k | } |
125 | | |
126 | 37.6k | FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT; |
127 | 37.6k | ColumnWriterOptions null_options; |
128 | 37.6k | null_options.meta = opts.meta->add_children_columns(); |
129 | 37.6k | null_options.meta->set_column_id(id); |
130 | 37.6k | null_options.meta->set_unique_id(id); |
131 | 37.6k | null_options.meta->set_type(int(null_type)); |
132 | 37.6k | null_options.meta->set_is_nullable(false); |
133 | 37.6k | null_options.meta->set_length( |
134 | 37.6k | cast_set<int32_t>(get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size())); |
135 | 37.6k | null_options.meta->set_encoding(DEFAULT_ENCODING); |
136 | 37.6k | null_options.meta->set_compression(opts.meta->compression()); |
137 | | |
138 | 37.6k | null_options.need_zone_map = false; |
139 | 37.6k | null_options.need_bloom_filter = false; |
140 | 37.6k | null_options.encoding_preference = opts.encoding_preference; |
141 | | |
142 | 37.6k | TabletColumn null_column = |
143 | 37.6k | TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type, false, |
144 | 37.6k | null_options.meta->unique_id(), null_options.meta->length()); |
145 | 37.6k | null_column.set_name("nullable"); |
146 | 37.6k | null_column.set_index_length(-1); // no short key index |
147 | 37.6k | std::unique_ptr<StorageField> null_field(StorageFieldFactory::create(null_column)); |
148 | 37.6k | return new ScalarColumnWriter(null_options, std::move(null_field), file_writer); |
149 | 68.8k | } |
150 | | |
151 | | ColumnWriter::ColumnWriter(std::unique_ptr<StorageField> field, bool is_nullable, |
152 | | ColumnMetaPB* meta) |
153 | 982k | : _field(std::move(field)), _is_nullable(is_nullable), _column_meta(meta) { |
154 | 982k | _data_type = DataTypeFactory::instance().create_data_type(*_column_meta); |
155 | 982k | } |
156 | | Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts, |
157 | | const TabletColumn* column, io::FileWriter* file_writer, |
158 | 2.10k | std::unique_ptr<ColumnWriter>* writer) { |
159 | | // not support empty struct |
160 | 2.10k | DCHECK(column->get_subtype_count() >= 1); |
161 | 2.10k | std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers; |
162 | 2.10k | sub_column_writers.reserve(column->get_subtype_count()); |
163 | 11.3k | for (uint32_t i = 0; i < column->get_subtype_count(); i++) { |
164 | 9.22k | const TabletColumn& sub_column = column->get_sub_column(i); |
165 | 9.22k | RETURN_IF_ERROR(sub_column.check_valid()); |
166 | | |
167 | | // create sub writer |
168 | 9.22k | ColumnWriterOptions column_options; |
169 | 9.22k | column_options.meta = opts.meta->mutable_children_columns(i); |
170 | 9.22k | column_options.need_zone_map = false; |
171 | 9.22k | column_options.need_bloom_filter = sub_column.is_bf_column(); |
172 | 9.22k | column_options.encoding_preference = opts.encoding_preference; |
173 | 9.22k | std::unique_ptr<ColumnWriter> sub_column_writer; |
174 | 9.22k | RETURN_IF_ERROR( |
175 | 9.22k | ColumnWriter::create(column_options, &sub_column, file_writer, &sub_column_writer)); |
176 | 9.22k | sub_column_writers.push_back(std::move(sub_column_writer)); |
177 | 9.22k | } |
178 | | |
179 | 2.10k | ScalarColumnWriter* null_writer = |
180 | 2.10k | get_null_writer(opts, file_writer, column->get_subtype_count() + 1); |
181 | | |
182 | 2.10k | *writer = std::unique_ptr<ColumnWriter>(new StructColumnWriter( |
183 | 2.10k | opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), null_writer, |
184 | 2.10k | sub_column_writers)); |
185 | 2.10k | return Status::OK(); |
186 | 2.10k | } |
187 | | |
188 | | Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts, |
189 | | const TabletColumn* column, io::FileWriter* file_writer, |
190 | 43.8k | std::unique_ptr<ColumnWriter>* writer) { |
191 | 43.8k | DCHECK(column->get_subtype_count() == 1); |
192 | 43.8k | const TabletColumn& item_column = column->get_sub_column(0); |
193 | 43.8k | RETURN_IF_ERROR(item_column.check_valid()); |
194 | | |
195 | | // create item writer |
196 | 43.8k | ColumnWriterOptions item_options; |
197 | 43.8k | item_options.meta = opts.meta->mutable_children_columns(0); |
198 | 43.8k | item_options.need_zone_map = false; |
199 | 43.8k | item_options.need_bloom_filter = item_column.is_bf_column(); |
200 | 43.8k | item_options.encoding_preference = opts.encoding_preference; |
201 | 43.8k | std::unique_ptr<ColumnWriter> item_writer; |
202 | 43.8k | RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer, &item_writer)); |
203 | | |
204 | | // create length writer |
205 | 43.8k | FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT; |
206 | | |
207 | 43.8k | ColumnWriterOptions length_options; |
208 | 43.8k | length_options.meta = opts.meta->add_children_columns(); |
209 | 43.8k | length_options.meta->set_column_id(2); |
210 | 43.8k | length_options.meta->set_unique_id(2); |
211 | 43.8k | length_options.meta->set_type(int(length_type)); |
212 | 43.8k | length_options.meta->set_is_nullable(false); |
213 | 43.8k | length_options.meta->set_length(cast_set<int32_t>( |
214 | 43.8k | get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size())); |
215 | 43.8k | length_options.meta->set_encoding(DEFAULT_ENCODING); |
216 | 43.8k | length_options.meta->set_compression(opts.meta->compression()); |
217 | | |
218 | 43.8k | length_options.need_zone_map = false; |
219 | 43.8k | length_options.need_bloom_filter = false; |
220 | 43.8k | length_options.encoding_preference = opts.encoding_preference; |
221 | | |
222 | 43.8k | TabletColumn length_column = |
223 | 43.8k | TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type, |
224 | 43.8k | length_options.meta->is_nullable(), length_options.meta->unique_id(), |
225 | 43.8k | length_options.meta->length()); |
226 | 43.8k | length_column.set_name("length"); |
227 | 43.8k | length_column.set_index_length(-1); // no short key index |
228 | 43.8k | std::unique_ptr<StorageField> bigint_field(StorageFieldFactory::create(length_column)); |
229 | 43.8k | auto* length_writer = |
230 | 43.8k | new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer); |
231 | | |
232 | 43.8k | ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3); |
233 | | |
234 | 43.8k | *writer = std::unique_ptr<ColumnWriter>(new ArrayColumnWriter( |
235 | 43.8k | opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), |
236 | 43.8k | length_writer, null_writer, std::move(item_writer))); |
237 | 43.8k | return Status::OK(); |
238 | 43.8k | } |
239 | | |
240 | | Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column, |
241 | | io::FileWriter* file_writer, |
242 | 22.9k | std::unique_ptr<ColumnWriter>* writer) { |
243 | 22.9k | DCHECK(column->get_subtype_count() == 2); |
244 | 22.9k | if (column->get_subtype_count() < 2) { |
245 | 0 | return Status::InternalError( |
246 | 0 | "If you upgraded from version 1.2.*, please DROP the MAP columns and then " |
247 | 0 | "ADD the MAP columns back."); |
248 | 0 | } |
249 | | // create key & value writer |
250 | 22.9k | std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list; |
251 | 68.7k | for (int i = 0; i < 2; ++i) { |
252 | 45.8k | const TabletColumn& item_column = column->get_sub_column(i); |
253 | 45.8k | RETURN_IF_ERROR(item_column.check_valid()); |
254 | | |
255 | | // create item writer |
256 | 45.8k | ColumnWriterOptions item_options; |
257 | 45.8k | item_options.meta = opts.meta->mutable_children_columns(i); |
258 | 45.8k | item_options.need_zone_map = false; |
259 | 45.8k | item_options.need_bloom_filter = item_column.is_bf_column(); |
260 | 45.8k | item_options.encoding_preference = opts.encoding_preference; |
261 | 45.8k | std::unique_ptr<ColumnWriter> item_writer; |
262 | 45.8k | RETURN_IF_ERROR( |
263 | 45.8k | ColumnWriter::create(item_options, &item_column, file_writer, &item_writer)); |
264 | 45.8k | inner_writer_list.push_back(std::move(item_writer)); |
265 | 45.8k | } |
266 | | |
267 | | // create offset writer |
268 | 22.9k | FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT; |
269 | | |
270 | | // Be Cautious: column unique id is used for column reader creation |
271 | 22.9k | ColumnWriterOptions length_options; |
272 | 22.9k | length_options.meta = opts.meta->add_children_columns(); |
273 | 22.9k | length_options.meta->set_column_id(column->get_subtype_count() + 1); |
274 | 22.9k | length_options.meta->set_unique_id(column->get_subtype_count() + 1); |
275 | 22.9k | length_options.meta->set_type(int(length_type)); |
276 | 22.9k | length_options.meta->set_is_nullable(false); |
277 | 22.9k | length_options.meta->set_length(cast_set<int32_t>( |
278 | 22.9k | get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size())); |
279 | 22.9k | length_options.meta->set_encoding(DEFAULT_ENCODING); |
280 | 22.9k | length_options.meta->set_compression(opts.meta->compression()); |
281 | | |
282 | 22.9k | length_options.need_zone_map = false; |
283 | 22.9k | length_options.need_bloom_filter = false; |
284 | 22.9k | length_options.encoding_preference = opts.encoding_preference; |
285 | | |
286 | 22.9k | TabletColumn length_column = |
287 | 22.9k | TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type, |
288 | 22.9k | length_options.meta->is_nullable(), length_options.meta->unique_id(), |
289 | 22.9k | length_options.meta->length()); |
290 | 22.9k | length_column.set_name("length"); |
291 | 22.9k | length_column.set_index_length(-1); // no short key index |
292 | 22.9k | std::unique_ptr<StorageField> bigint_field(StorageFieldFactory::create(length_column)); |
293 | 22.9k | auto* length_writer = |
294 | 22.9k | new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer); |
295 | | |
296 | 22.9k | ScalarColumnWriter* null_writer = |
297 | 22.9k | get_null_writer(opts, file_writer, column->get_subtype_count() + 2); |
298 | | |
299 | 22.9k | *writer = std::unique_ptr<ColumnWriter>(new MapColumnWriter( |
300 | 22.9k | opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), null_writer, |
301 | 22.9k | length_writer, inner_writer_list)); |
302 | | |
303 | 22.9k | return Status::OK(); |
304 | 22.9k | } |
305 | | |
306 | | Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts, |
307 | | const TabletColumn* column, |
308 | | io::FileWriter* file_writer, |
309 | 1.27k | std::unique_ptr<ColumnWriter>* writer) { |
310 | 1.27k | auto data_type = DataTypeFactory::instance().create_data_type(*column); |
311 | 1.27k | const auto* agg_state_type = assert_cast<const DataTypeAggState*>(data_type.get()); |
312 | 1.27k | auto type = agg_state_type->get_serialized_type()->get_primitive_type(); |
313 | 1.27k | if (type == PrimitiveType::TYPE_STRING || type == PrimitiveType::INVALID_TYPE || |
314 | 1.27k | type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT || type == PrimitiveType::TYPE_BITMAP) { |
315 | 1.19k | *writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter( |
316 | 1.19k | opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column)), |
317 | 1.19k | file_writer)); |
318 | 1.19k | } else if (type == PrimitiveType::TYPE_ARRAY) { |
319 | 22 | RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer)); |
320 | 56 | } else if (type == PrimitiveType::TYPE_MAP) { |
321 | 52 | RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer)); |
322 | 52 | } else { |
323 | 4 | throw Exception(ErrorCode::INTERNAL_ERROR, |
324 | 4 | "OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}", |
325 | 4 | agg_state_type->get_name()); |
326 | 4 | } |
327 | 1.26k | return Status::OK(); |
328 | 1.27k | } |
329 | | |
330 | | Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts, |
331 | | const TabletColumn* column, io::FileWriter* file_writer, |
332 | 6.43k | std::unique_ptr<ColumnWriter>* writer) { |
333 | | // Variant extracted columns have two kinds of physical writers: |
334 | | // - Doc-value snapshot column (`...__DORIS_VARIANT_DOC_VALUE__...`): use `VariantDocCompactWriter` |
335 | | // to store the doc snapshot in a compact binary form. |
336 | | // - Regular extracted subcolumns: use `VariantSubcolumnWriter`. |
337 | | // The root VARIANT column itself uses `VariantColumnWriter`. |
338 | 6.43k | if (column->is_extracted_column()) { |
339 | 468 | if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) { |
340 | 378 | *writer = std::make_unique<VariantDocCompactWriter>( |
341 | 378 | opts, column, |
342 | 378 | std::unique_ptr<StorageField>(StorageFieldFactory::create(*column))); |
343 | 378 | return Status::OK(); |
344 | 378 | } |
345 | 90 | VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path(); |
346 | 90 | *writer = std::make_unique<VariantSubcolumnWriter>( |
347 | 90 | opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column))); |
348 | 90 | return Status::OK(); |
349 | 468 | } |
350 | 5.96k | *writer = std::make_unique<VariantColumnWriter>( |
351 | 5.96k | opts, column, std::unique_ptr<StorageField>(StorageFieldFactory::create(*column))); |
352 | 5.96k | return Status::OK(); |
353 | 6.43k | } |
354 | | |
355 | | //Todo(Amory): here should according nullable and offset and need sub to simply this function |
356 | | Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column, |
357 | 859k | io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) { |
358 | 859k | std::unique_ptr<StorageField> field(StorageFieldFactory::create(*column)); |
359 | 859k | DCHECK(field.get() != nullptr); |
360 | 859k | if (is_scalar_type(column->type())) { |
361 | 795k | *writer = std::unique_ptr<ColumnWriter>( |
362 | 795k | new ScalarColumnWriter(opts, std::move(field), file_writer)); |
363 | 795k | return Status::OK(); |
364 | 795k | } else { |
365 | 63.5k | switch (column->type()) { |
366 | 1.26k | case FieldType::OLAP_FIELD_TYPE_AGG_STATE: { |
367 | 1.26k | RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, writer)); |
368 | 1.26k | return Status::OK(); |
369 | 1.26k | } |
370 | 2.10k | case FieldType::OLAP_FIELD_TYPE_STRUCT: { |
371 | 2.10k | RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, writer)); |
372 | 2.10k | return Status::OK(); |
373 | 2.10k | } |
374 | 43.8k | case FieldType::OLAP_FIELD_TYPE_ARRAY: { |
375 | 43.8k | RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer)); |
376 | 43.8k | return Status::OK(); |
377 | 43.8k | } |
378 | 10.0k | case FieldType::OLAP_FIELD_TYPE_MAP: { |
379 | 10.0k | RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer)); |
380 | 10.0k | return Status::OK(); |
381 | 10.0k | } |
382 | 6.42k | case FieldType::OLAP_FIELD_TYPE_VARIANT: { |
383 | | // Process columns with sparse column |
384 | 6.42k | RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer)); |
385 | 6.42k | return Status::OK(); |
386 | 6.42k | } |
387 | 0 | default: |
388 | 0 | return Status::NotSupported("unsupported type for ColumnWriter: {}", |
389 | 0 | std::to_string(int(field->type()))); |
390 | 63.5k | } |
391 | 63.5k | } |
392 | 859k | } |
393 | | |
394 | | Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* data, |
395 | 651k | size_t num_rows) { |
396 | 651k | const auto* ptr = (const uint8_t*)data; |
397 | 651k | BitmapIterator null_iter(is_null_bits, num_rows); |
398 | 651k | bool is_null = false; |
399 | 651k | size_t this_run = 0; |
400 | 1.30M | while ((this_run = null_iter.Next(&is_null)) > 0) { |
401 | 651k | if (is_null) { |
402 | 0 | RETURN_IF_ERROR(append_nulls(this_run)); |
403 | 651k | } else { |
404 | 651k | RETURN_IF_ERROR(append_data(&ptr, this_run)); |
405 | 651k | } |
406 | 651k | } |
407 | 651k | return Status::OK(); |
408 | 651k | } |
409 | | |
410 | | Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, |
411 | 0 | size_t num_rows) { |
412 | | // Fast path: use SIMD to detect all-NULL or all-non-NULL columns |
413 | 0 | if (config::enable_rle_batch_put_optimization) { |
414 | 0 | size_t non_null_count = |
415 | 0 | simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows); |
416 | |
|
417 | 0 | if (non_null_count == 0) { |
418 | | // All NULL: skip run-length iteration, directly append all nulls |
419 | 0 | RETURN_IF_ERROR(append_nulls(num_rows)); |
420 | 0 | *ptr += get_field()->size() * num_rows; |
421 | 0 | return Status::OK(); |
422 | 0 | } |
423 | | |
424 | 0 | if (non_null_count == num_rows) { |
425 | | // All non-NULL: skip run-length iteration, directly append all data |
426 | 0 | return append_data(ptr, num_rows); |
427 | 0 | } |
428 | 0 | } |
429 | | |
430 | | // Mixed case or sparse optimization disabled: use run-length processing |
431 | 0 | size_t offset = 0; |
432 | 0 | auto next_run_step = [&]() { |
433 | 0 | size_t step = 1; |
434 | 0 | for (auto i = offset + 1; i < num_rows; ++i) { |
435 | 0 | if (null_map[offset] == null_map[i]) { |
436 | 0 | step++; |
437 | 0 | } else { |
438 | 0 | break; |
439 | 0 | } |
440 | 0 | } |
441 | 0 | return step; |
442 | 0 | }; |
443 | |
|
444 | 0 | do { |
445 | 0 | auto step = next_run_step(); |
446 | 0 | if (null_map[offset]) { |
447 | 0 | RETURN_IF_ERROR(append_nulls(step)); |
448 | 0 | *ptr += get_field()->size() * step; |
449 | 0 | } else { |
450 | | // TODO: |
451 | | // 1. `*ptr += get_field()->size() * step;` should do in this function, not append_data; |
452 | | // 2. support array vectorized load and ptr offset add |
453 | 0 | RETURN_IF_ERROR(append_data(ptr, step)); |
454 | 0 | } |
455 | 0 | offset += step; |
456 | 0 | } while (offset < num_rows); |
457 | | |
458 | 0 | return Status::OK(); |
459 | 0 | } |
460 | | |
461 | 3.00M | Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) { |
462 | 3.00M | assert(data && num_rows > 0); |
463 | 3.00M | const auto* ptr = (const uint8_t*)data; |
464 | 3.00M | if (nullmap) { |
465 | 2.59M | return append_nullable(nullmap, &ptr, num_rows); |
466 | 2.59M | } else { |
467 | 405k | return append_data(&ptr, num_rows); |
468 | 405k | } |
469 | 3.00M | } |
470 | | |
471 | | /////////////////////////////////////////////////////////////////////////////////// |
472 | | |
473 | | ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts, |
474 | | std::unique_ptr<StorageField> field, |
475 | | io::FileWriter* file_writer) |
476 | 907k | : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), |
477 | 907k | _opts(opts), |
478 | 907k | _file_writer(file_writer), |
479 | 907k | _data_size(0) { |
480 | | // these opts.meta fields should be set by client |
481 | 907k | DCHECK(opts.meta->has_column_id()); |
482 | 907k | DCHECK(opts.meta->has_unique_id()); |
483 | 907k | DCHECK(opts.meta->has_type()); |
484 | 907k | DCHECK(opts.meta->has_length()); |
485 | 907k | DCHECK(opts.meta->has_encoding()); |
486 | 907k | DCHECK(opts.meta->has_compression()); |
487 | 907k | DCHECK(opts.meta->has_is_nullable()); |
488 | 907k | DCHECK(file_writer != nullptr); |
489 | 907k | _inverted_index_builders.resize(_opts.inverted_indexes.size()); |
490 | 907k | } |
491 | | |
492 | 908k | ScalarColumnWriter::~ScalarColumnWriter() { |
493 | | // delete all pages |
494 | 908k | _pages.clear(); |
495 | 908k | } |
496 | | |
497 | 907k | Status ScalarColumnWriter::init() { |
498 | 907k | RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec)); |
499 | | |
500 | 907k | PageBuilder* page_builder = nullptr; |
501 | | |
502 | 907k | RETURN_IF_ERROR(EncodingInfo::get(get_field()->type(), _opts.meta->encoding(), |
503 | 907k | _opts.encoding_preference, &_encoding_info)); |
504 | 907k | _opts.meta->set_encoding(_encoding_info->encoding()); |
505 | | // create page builder |
506 | 907k | PageBuilderOptions opts; |
507 | 907k | opts.data_page_size = _opts.data_page_size; |
508 | 907k | opts.dict_page_size = _opts.dict_page_size; |
509 | 907k | opts.encoding_preference = _opts.encoding_preference; |
510 | 907k | RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder)); |
511 | 907k | if (page_builder == nullptr) { |
512 | 0 | return Status::NotSupported("Failed to create page builder for type {} and encoding {}", |
513 | 0 | get_field()->type(), _opts.meta->encoding()); |
514 | 0 | } |
515 | | // should store more concrete encoding type instead of DEFAULT_ENCODING |
516 | | // because the default encoding of a data type can be changed in the future |
517 | 907k | DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING); |
518 | 18.4E | VLOG_DEBUG << fmt::format( |
519 | 18.4E | "[verbose] scalar column writer init, column_id={}, type={}, encoding={}, " |
520 | 18.4E | "is_nullable={}", |
521 | 18.4E | _opts.meta->column_id(), get_field()->type(), |
522 | 18.4E | EncodingTypePB_Name(_opts.meta->encoding()), _opts.meta->is_nullable()); |
523 | 907k | _page_builder.reset(page_builder); |
524 | | // create ordinal builder |
525 | 907k | _ordinal_index_builder = std::make_unique<OrdinalIndexWriter>(); |
526 | | // create null bitmap builder |
527 | 907k | if (is_nullable()) { |
528 | 486k | _null_bitmap_builder = std::make_unique<NullBitmapBuilder>(); |
529 | 486k | } |
530 | 907k | if (_opts.need_zone_map) { |
531 | 649k | RETURN_IF_ERROR( |
532 | 649k | ZoneMapIndexWriter::create(_data_type, get_field(), _zone_map_index_builder)); |
533 | 649k | } |
534 | | |
535 | 907k | if (_opts.need_inverted_index) { |
536 | 40.5k | do { |
537 | 81.3k | for (size_t i = 0; i < _opts.inverted_indexes.size(); i++) { |
538 | 40.7k | DBUG_EXECUTE_IF("column_writer.init", { |
539 | 40.7k | class InvertedIndexColumnWriterEmpty final : public IndexColumnWriter { |
540 | 40.7k | public: |
541 | 40.7k | Status init() override { return Status::OK(); } |
542 | 40.7k | Status add_values(const std::string name, const void* values, |
543 | 40.7k | size_t count) override { |
544 | 40.7k | return Status::OK(); |
545 | 40.7k | } |
546 | 40.7k | Status add_array_values(size_t field_size, const CollectionValue* values, |
547 | 40.7k | size_t count) override { |
548 | 40.7k | return Status::OK(); |
549 | 40.7k | } |
550 | 40.7k | Status add_array_values(size_t field_size, const void* value_ptr, |
551 | 40.7k | const uint8_t* null_map, const uint8_t* offsets_ptr, |
552 | 40.7k | size_t count) override { |
553 | 40.7k | return Status::OK(); |
554 | 40.7k | } |
555 | 40.7k | Status add_nulls(uint32_t count) override { return Status::OK(); } |
556 | 40.7k | Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override { |
557 | 40.7k | return Status::OK(); |
558 | 40.7k | } |
559 | 40.7k | Status finish() override { return Status::OK(); } |
560 | 40.7k | int64_t size() const override { return 0; } |
561 | 40.7k | void close_on_error() override {} |
562 | 40.7k | }; |
563 | | |
564 | 40.7k | _inverted_index_builders[i] = |
565 | 40.7k | std::make_unique<InvertedIndexColumnWriterEmpty>(); |
566 | | |
567 | 40.7k | break; |
568 | 40.7k | }); |
569 | | |
570 | 40.7k | RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_builders[i], |
571 | 40.7k | _opts.index_file_writer, |
572 | 40.7k | _opts.inverted_indexes[i])); |
573 | 40.7k | } |
574 | 40.5k | } while (false); |
575 | 40.5k | } |
576 | 907k | if (_opts.need_bloom_filter) { |
577 | 5.40k | if (_opts.is_ngram_bf_index) { |
578 | 3.39k | RETURN_IF_ERROR(NGramBloomFilterIndexWriterImpl::create( |
579 | 3.39k | BloomFilterOptions(), get_field()->type_info(), _opts.gram_size, |
580 | 3.39k | _opts.gram_bf_size, &_bloom_filter_index_builder)); |
581 | 3.39k | } else { |
582 | 2.01k | RETURN_IF_ERROR(BloomFilterIndexWriter::create( |
583 | 2.01k | _opts.bf_options, get_field()->type_info(), &_bloom_filter_index_builder)); |
584 | 2.01k | } |
585 | 5.40k | } |
586 | 907k | return Status::OK(); |
587 | 907k | } |
588 | | |
589 | 3.20M | Status ScalarColumnWriter::append_nulls(size_t num_rows) { |
590 | 3.20M | _null_bitmap_builder->add_run(true, num_rows); |
591 | 3.20M | _next_rowid += num_rows; |
592 | 3.20M | if (_opts.need_zone_map) { |
593 | 3.13M | _zone_map_index_builder->add_nulls(cast_set<uint32_t>(num_rows)); |
594 | 3.13M | } |
595 | 3.20M | if (_opts.need_inverted_index) { |
596 | 1.48M | for (const auto& builder : _inverted_index_builders) { |
597 | 1.48M | RETURN_IF_ERROR(builder->add_nulls(cast_set<uint32_t>(num_rows))); |
598 | 1.48M | } |
599 | 1.48M | } |
600 | 3.20M | if (_opts.need_bloom_filter) { |
601 | 573 | _bloom_filter_index_builder->add_nulls(cast_set<uint32_t>(num_rows)); |
602 | 573 | } |
603 | 3.20M | return Status::OK(); |
604 | 3.20M | } |
605 | | |
606 | | // append data to page builder. this function will make sure that |
607 | | // num_rows must be written before return. And ptr will be modified |
608 | | // to next data should be written |
609 | 4.48M | Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { |
610 | 4.48M | size_t remaining = num_rows; |
611 | 9.04M | while (remaining > 0) { |
612 | 4.56M | size_t num_written = remaining; |
613 | 4.56M | RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written)); |
614 | | |
615 | 4.56M | remaining -= num_written; |
616 | | |
617 | 4.56M | if (_page_builder->is_page_full()) { |
618 | 76.2k | RETURN_IF_ERROR(finish_current_page()); |
619 | 76.2k | } |
620 | 4.56M | } |
621 | 4.48M | return Status::OK(); |
622 | 4.48M | } |
623 | | |
624 | | Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data, |
625 | 4.62M | size_t* num_written) { |
626 | 4.62M | RETURN_IF_ERROR(_page_builder->add(data, num_written)); |
627 | 4.62M | if (_opts.need_zone_map) { |
628 | 4.32M | _zone_map_index_builder->add_values(data, *num_written); |
629 | 4.32M | } |
630 | 4.62M | if (_opts.need_inverted_index) { |
631 | 1.50M | for (const auto& builder : _inverted_index_builders) { |
632 | 1.50M | RETURN_IF_ERROR(builder->add_values(get_field()->name(), data, *num_written)); |
633 | 1.50M | } |
634 | 1.50M | } |
635 | 4.62M | if (_opts.need_bloom_filter) { |
636 | 5.69k | RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written)); |
637 | 5.69k | } |
638 | | |
639 | 4.62M | _next_rowid += *num_written; |
640 | | |
641 | | // we must write null bits after write data, because we don't |
642 | | // know how many rows can be written into current page |
643 | 4.62M | if (is_nullable()) { |
644 | 4.09M | _null_bitmap_builder->add_run(false, *num_written); |
645 | 4.09M | } |
646 | 4.62M | return Status::OK(); |
647 | 4.62M | } |
648 | | |
649 | 4.62M | Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) { |
650 | 4.62M | RETURN_IF_ERROR(append_data_in_current_page(*data, num_written)); |
651 | 4.62M | *data += get_field()->size() * (*num_written); |
652 | 4.62M | return Status::OK(); |
653 | 4.62M | } |
654 | | |
655 | | Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, |
656 | 2.55M | size_t num_rows) { |
657 | | // When optimization is disabled, use base class implementation |
658 | 2.55M | if (!config::enable_rle_batch_put_optimization) { |
659 | 0 | return ColumnWriter::append_nullable(null_map, ptr, num_rows); |
660 | 0 | } |
661 | | |
662 | 2.55M | if (UNLIKELY(num_rows == 0)) { |
663 | 0 | return Status::OK(); |
664 | 0 | } |
665 | | |
666 | | // Build run-length encoded null runs using memchr for fast boundary detection |
667 | 2.55M | _null_run_buffer.clear(); |
668 | 2.55M | if (_null_run_buffer.capacity() < num_rows) { |
669 | 503k | _null_run_buffer.reserve(std::min(num_rows, size_t(256))); |
670 | 503k | } |
671 | | |
672 | 2.55M | size_t non_null_count = 0; |
673 | 2.55M | size_t offset = 0; |
674 | 7.10M | while (offset < num_rows) { |
675 | 4.54M | bool is_null = null_map[offset] != 0; |
676 | 4.54M | size_t remaining = num_rows - offset; |
677 | 4.54M | const uint8_t* run_end = |
678 | 4.54M | static_cast<const uint8_t*>(memchr(null_map + offset, is_null ? 0 : 1, remaining)); |
679 | 4.54M | size_t run_length = run_end != nullptr ? (run_end - (null_map + offset)) : remaining; |
680 | 4.54M | _null_run_buffer.push_back(NullRun {is_null, static_cast<uint32_t>(run_length)}); |
681 | 4.54M | if (!is_null) { |
682 | 3.43M | non_null_count += run_length; |
683 | 3.43M | } |
684 | 4.54M | offset += run_length; |
685 | 4.54M | } |
686 | | |
687 | | // Pre-allocate buffer based on estimated size |
688 | 2.56M | if (_null_bitmap_builder != nullptr) { |
689 | 2.56M | size_t current_rows = _next_rowid - _first_rowid; |
690 | 2.56M | size_t expected_rows = current_rows + num_rows; |
691 | 2.56M | size_t est_non_null = non_null_count; |
692 | 2.56M | if (num_rows > 0 && expected_rows > num_rows) { |
693 | 2.11M | est_non_null = (non_null_count * expected_rows) / num_rows; |
694 | 2.11M | } |
695 | 2.56M | _null_bitmap_builder->reserve_for_write(expected_rows, est_non_null); |
696 | 2.56M | } |
697 | | |
698 | 2.55M | if (non_null_count == 0) { |
699 | | // All NULL: skip data writing, only update null bitmap and indexes |
700 | 54.6k | RETURN_IF_ERROR(append_nulls(num_rows)); |
701 | 54.6k | *ptr += get_field()->size() * num_rows; |
702 | 54.6k | return Status::OK(); |
703 | 54.6k | } |
704 | | |
705 | 2.50M | if (non_null_count == num_rows) { |
706 | | // All non-NULL: use normal append_data which handles both data and null bitmap |
707 | 2.41M | return append_data(ptr, num_rows); |
708 | 2.41M | } |
709 | | |
710 | | // Process by runs |
711 | 2.04M | for (const auto& run : _null_run_buffer) { |
712 | 2.04M | size_t run_length = run.len; |
713 | 2.04M | if (run.is_null) { |
714 | 1.05M | RETURN_IF_ERROR(append_nulls(run_length)); |
715 | 1.05M | *ptr += get_field()->size() * run_length; |
716 | 1.05M | } else { |
717 | | // TODO: |
718 | | // 1. `*ptr += get_field()->size() * step;` should do in this function, not append_data; |
719 | | // 2. support array vectorized load and ptr offset add |
720 | 994k | RETURN_IF_ERROR(append_data(ptr, run_length)); |
721 | 994k | } |
722 | 2.04M | } |
723 | | |
724 | 88.1k | return Status::OK(); |
725 | 88.1k | } |
726 | | |
727 | 43.4k | uint64_t ScalarColumnWriter::estimate_buffer_size() { |
728 | 43.4k | uint64_t size = _data_size; |
729 | 43.4k | size += _page_builder->size(); |
730 | 43.4k | if (is_nullable()) { |
731 | 29.0k | size += _null_bitmap_builder->size(); |
732 | 29.0k | } |
733 | 43.4k | size += _ordinal_index_builder->size(); |
734 | 43.4k | if (_opts.need_zone_map) { |
735 | 33.3k | size += _zone_map_index_builder->size(); |
736 | 33.3k | } |
737 | 43.4k | if (_opts.need_bloom_filter) { |
738 | 267 | size += _bloom_filter_index_builder->size(); |
739 | 267 | } |
740 | 43.4k | return size; |
741 | 43.4k | } |
742 | | |
743 | 908k | Status ScalarColumnWriter::finish() { |
744 | 908k | RETURN_IF_ERROR(finish_current_page()); |
745 | 908k | _opts.meta->set_num_rows(_next_rowid); |
746 | 908k | return Status::OK(); |
747 | 908k | } |
748 | | |
749 | 908k | Status ScalarColumnWriter::write_data() { |
750 | 908k | auto offset = _file_writer->bytes_appended(); |
751 | 1.23M | auto collect_uncompressed_bytes = [](const PageFooterPB& footer) { |
752 | 1.23M | return footer.uncompressed_size() + footer.ByteSizeLong() + |
753 | 1.23M | sizeof(uint32_t) /* footer size */ + sizeof(uint32_t) /* checksum */; |
754 | 1.23M | }; |
755 | 947k | for (auto& page : _pages) { |
756 | 947k | _total_uncompressed_data_pages_size += collect_uncompressed_bytes(page->footer); |
757 | 947k | RETURN_IF_ERROR(_write_data_page(page.get())); |
758 | 947k | } |
759 | 908k | _pages.clear(); |
760 | | // write column dict |
761 | 908k | if (_encoding_info->encoding() == DICT_ENCODING) { |
762 | 285k | OwnedSlice dict_body; |
763 | 285k | RETURN_IF_ERROR(_page_builder->get_dictionary_page(&dict_body)); |
764 | 285k | EncodingTypePB dict_word_page_encoding; |
765 | 285k | RETURN_IF_ERROR(_page_builder->get_dictionary_page_encoding(&dict_word_page_encoding)); |
766 | | |
767 | 285k | PageFooterPB footer; |
768 | 285k | footer.set_type(DICTIONARY_PAGE); |
769 | 285k | footer.set_uncompressed_size(cast_set<uint32_t>(dict_body.slice().get_size())); |
770 | 285k | footer.mutable_dict_page_footer()->set_encoding(dict_word_page_encoding); |
771 | 285k | _total_uncompressed_data_pages_size += collect_uncompressed_bytes(footer); |
772 | | |
773 | 285k | PagePointer dict_pp; |
774 | 285k | RETURN_IF_ERROR(PageIO::compress_and_write_page( |
775 | 285k | _compress_codec, _opts.compression_min_space_saving, _file_writer, |
776 | 285k | {dict_body.slice()}, footer, &dict_pp)); |
777 | 285k | dict_pp.to_proto(_opts.meta->mutable_dict_page()); |
778 | 285k | } |
779 | 908k | _total_compressed_data_pages_size += _file_writer->bytes_appended() - offset; |
780 | 908k | _page_builder.reset(); |
781 | 908k | return Status::OK(); |
782 | 908k | } |
783 | | |
784 | 870k | Status ScalarColumnWriter::write_ordinal_index() { |
785 | 870k | return _ordinal_index_builder->finish(_file_writer, _opts.meta->add_indexes()); |
786 | 870k | } |
787 | | |
788 | 701k | Status ScalarColumnWriter::write_zone_map() { |
789 | 701k | if (_opts.need_zone_map) { |
790 | 648k | return _zone_map_index_builder->finish(_file_writer, _opts.meta->add_indexes()); |
791 | 648k | } |
792 | 52.5k | return Status::OK(); |
793 | 701k | } |
794 | | |
795 | 620k | Status ScalarColumnWriter::write_inverted_index() { |
796 | 620k | if (_opts.need_inverted_index) { |
797 | 40.7k | for (const auto& builder : _inverted_index_builders) { |
798 | 40.7k | RETURN_IF_ERROR(builder->finish()); |
799 | 40.7k | } |
800 | 40.5k | } |
801 | 620k | return Status::OK(); |
802 | 620k | } |
803 | | |
804 | 610k | Status ScalarColumnWriter::write_bloom_filter_index() { |
805 | 610k | if (_opts.need_bloom_filter) { |
806 | 5.40k | return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes()); |
807 | 5.40k | } |
808 | 605k | return Status::OK(); |
809 | 610k | } |
810 | | |
811 | | // write a data page into file and update ordinal index |
812 | 947k | Status ScalarColumnWriter::_write_data_page(Page* page) { |
813 | 947k | PagePointer pp; |
814 | 947k | std::vector<Slice> compressed_body; |
815 | 1.79M | for (auto& data : page->data) { |
816 | 1.79M | compressed_body.push_back(data.slice()); |
817 | 1.79M | } |
818 | 947k | RETURN_IF_ERROR(PageIO::write_page(_file_writer, compressed_body, page->footer, &pp)); |
819 | 947k | _ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp); |
820 | 947k | return Status::OK(); |
821 | 947k | } |
822 | | |
823 | 985k | Status ScalarColumnWriter::finish_current_page() { |
824 | 985k | if (_next_rowid == _first_rowid) { |
825 | 38.1k | return Status::OK(); |
826 | 38.1k | } |
827 | 947k | if (_opts.need_zone_map) { |
828 | | // If the number of rows in the current page is less than the threshold, |
829 | | // we will invalidate zone map index for this page by set pass_all to true. |
830 | 714k | if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) { |
831 | 482k | _zone_map_index_builder->invalid_page_zone_map(); |
832 | 482k | } |
833 | 714k | RETURN_IF_ERROR(_zone_map_index_builder->flush()); |
834 | 714k | } |
835 | | |
836 | 947k | if (_opts.need_bloom_filter) { |
837 | 5.65k | RETURN_IF_ERROR(_bloom_filter_index_builder->flush()); |
838 | 5.65k | } |
839 | | |
840 | 947k | _raw_data_bytes += _page_builder->get_raw_data_size(); |
841 | | |
842 | | // build data page body : encoded values + [nullmap] |
843 | 947k | std::vector<Slice> body; |
844 | 947k | OwnedSlice encoded_values; |
845 | 947k | RETURN_IF_ERROR(_page_builder->finish(&encoded_values)); |
846 | 947k | RETURN_IF_ERROR(_page_builder->reset()); |
847 | 947k | body.push_back(encoded_values.slice()); |
848 | | |
849 | 947k | OwnedSlice nullmap; |
850 | 947k | if (_null_bitmap_builder != nullptr) { |
851 | 485k | if (is_nullable() && _null_bitmap_builder->has_null()) { |
852 | 156k | RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap)); |
853 | 156k | body.push_back(nullmap.slice()); |
854 | 156k | } |
855 | 485k | _null_bitmap_builder->reset(); |
856 | 485k | } |
857 | | |
858 | | // prepare data page footer |
859 | 947k | std::unique_ptr<Page> page(new Page()); |
860 | 947k | page->footer.set_type(DATA_PAGE); |
861 | 947k | page->footer.set_uncompressed_size(cast_set<uint32_t>(Slice::compute_total_size(body))); |
862 | 947k | auto* data_page_footer = page->footer.mutable_data_page_footer(); |
863 | 947k | data_page_footer->set_first_ordinal(_first_rowid); |
864 | 947k | data_page_footer->set_num_values(_next_rowid - _first_rowid); |
865 | 947k | data_page_footer->set_nullmap_size(cast_set<uint32_t>(nullmap.slice().size)); |
866 | 947k | if (_new_page_callback != nullptr) { |
867 | 66.6k | _new_page_callback->put_extra_info_in_page(data_page_footer); |
868 | 66.6k | } |
869 | | // trying to compress page body |
870 | 947k | OwnedSlice compressed_body; |
871 | 947k | RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, |
872 | 947k | body, &compressed_body)); |
873 | 947k | if (compressed_body.slice().empty()) { |
874 | | // page body is uncompressed |
875 | 844k | page->data.emplace_back(std::move(encoded_values)); |
876 | 844k | page->data.emplace_back(std::move(nullmap)); |
877 | 844k | } else { |
878 | | // page body is compressed |
879 | 102k | page->data.emplace_back(std::move(compressed_body)); |
880 | 102k | } |
881 | | |
882 | 947k | _push_back_page(std::move(page)); |
883 | 947k | _first_rowid = _next_rowid; |
884 | 947k | return Status::OK(); |
885 | 947k | } |
886 | | |
887 | | //////////////////////////////////////////////////////////////////////////////// |
888 | | |
889 | | //////////////////////////////////////////////////////////////////////////////// |
890 | | // offset column writer |
891 | | //////////////////////////////////////////////////////////////////////////////// |
892 | | |
893 | | OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts, |
894 | | std::unique_ptr<StorageField> field, |
895 | | io::FileWriter* file_writer) |
896 | 66.7k | : ScalarColumnWriter(opts, std::move(field), file_writer) { |
897 | | // now we only explain data in offset column as uint64 |
898 | 66.7k | DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT); |
899 | 66.7k | } |
900 | | |
901 | 66.7k | OffsetColumnWriter::~OffsetColumnWriter() = default; |
902 | | |
903 | 66.6k | Status OffsetColumnWriter::init() { |
904 | 66.6k | RETURN_IF_ERROR(ScalarColumnWriter::init()); |
905 | 66.6k | register_flush_page_callback(this); |
906 | 66.6k | _next_offset = 0; |
907 | 66.6k | return Status::OK(); |
908 | 66.6k | } |
909 | | |
910 | 67.2k | Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { |
911 | 67.2k | size_t remaining = num_rows; |
912 | 135k | while (remaining > 0) { |
913 | 67.7k | size_t num_written = remaining; |
914 | 67.7k | RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written)); |
915 | | // _next_offset after append_data_in_current_page is the offset of next data, which will used in finish_current_page() to set next_array_item_ordinal |
916 | 67.7k | _next_offset = *(const uint64_t*)(*ptr); |
917 | 67.7k | remaining -= num_written; |
918 | | |
919 | 67.7k | if (_page_builder->is_page_full()) { |
920 | | // get next data for next array_item_rowid |
921 | 609 | RETURN_IF_ERROR(finish_current_page()); |
922 | 609 | } |
923 | 67.7k | } |
924 | 67.2k | return Status::OK(); |
925 | 67.2k | } |
926 | | |
927 | 66.6k | void OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) { |
928 | 66.6k | footer->set_next_array_item_ordinal(_next_offset); |
929 | 66.6k | } |
930 | | |
931 | | StructColumnWriter::StructColumnWriter( |
932 | | const ColumnWriterOptions& opts, std::unique_ptr<StorageField> field, |
933 | | ScalarColumnWriter* null_writer, |
934 | | std::vector<std::unique_ptr<ColumnWriter>>& sub_column_writers) |
935 | 2.10k | : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) { |
936 | 9.23k | for (auto& sub_column_writer : sub_column_writers) { |
937 | 9.23k | _sub_column_writers.push_back(std::move(sub_column_writer)); |
938 | 9.23k | } |
939 | 2.10k | _num_sub_column_writers = _sub_column_writers.size(); |
940 | 2.10k | DCHECK(_num_sub_column_writers >= 1); |
941 | 2.10k | if (is_nullable()) { |
942 | 1.83k | _null_writer.reset(null_writer); |
943 | 1.83k | } |
944 | 2.10k | } |
945 | | |
946 | 2.10k | Status StructColumnWriter::init() { |
947 | 9.22k | for (auto& column_writer : _sub_column_writers) { |
948 | 9.22k | RETURN_IF_ERROR(column_writer->init()); |
949 | 9.22k | } |
950 | 2.10k | if (is_nullable()) { |
951 | 1.83k | RETURN_IF_ERROR(_null_writer->init()); |
952 | 1.83k | } |
953 | 2.10k | return Status::OK(); |
954 | 2.10k | } |
955 | | |
956 | 1.49k | Status StructColumnWriter::write_inverted_index() { |
957 | 1.49k | if (_opts.need_inverted_index) { |
958 | 0 | for (auto& column_writer : _sub_column_writers) { |
959 | 0 | RETURN_IF_ERROR(column_writer->write_inverted_index()); |
960 | 0 | } |
961 | 0 | } |
962 | 1.49k | return Status::OK(); |
963 | 1.49k | } |
964 | | |
965 | | Status StructColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, |
966 | 1.79k | size_t num_rows) { |
967 | 1.79k | RETURN_IF_ERROR(append_data(ptr, num_rows)); |
968 | 1.79k | RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows)); |
969 | 1.79k | return Status::OK(); |
970 | 1.79k | } |
971 | | |
972 | 2.06k | Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { |
973 | 2.06k | const auto* results = reinterpret_cast<const uint64_t*>(*ptr); |
974 | 10.9k | for (size_t i = 0; i < _num_sub_column_writers; ++i) { |
975 | 8.88k | auto nullmap = *(results + _num_sub_column_writers + i); |
976 | 8.88k | auto data = *(results + i); |
977 | 8.88k | RETURN_IF_ERROR(_sub_column_writers[i]->append(reinterpret_cast<const uint8_t*>(nullmap), |
978 | 8.88k | reinterpret_cast<const void*>(data), |
979 | 8.88k | num_rows)); |
980 | 8.88k | } |
981 | 2.06k | return Status::OK(); |
982 | 2.06k | } |
983 | | |
984 | 186 | uint64_t StructColumnWriter::estimate_buffer_size() { |
985 | 186 | uint64_t size = 0; |
986 | 780 | for (auto& column_writer : _sub_column_writers) { |
987 | 780 | size += column_writer->estimate_buffer_size(); |
988 | 780 | } |
989 | 186 | size += is_nullable() ? _null_writer->estimate_buffer_size() : 0; |
990 | 186 | return size; |
991 | 186 | } |
992 | | |
993 | 2.10k | Status StructColumnWriter::finish() { |
994 | 9.23k | for (auto& column_writer : _sub_column_writers) { |
995 | 9.23k | RETURN_IF_ERROR(column_writer->finish()); |
996 | 9.23k | } |
997 | 2.10k | if (is_nullable()) { |
998 | 1.83k | RETURN_IF_ERROR(_null_writer->finish()); |
999 | 1.83k | } |
1000 | 2.10k | _opts.meta->set_num_rows(get_next_rowid()); |
1001 | 2.10k | return Status::OK(); |
1002 | 2.10k | } |
1003 | | |
1004 | 2.10k | Status StructColumnWriter::write_data() { |
1005 | 9.22k | for (auto& column_writer : _sub_column_writers) { |
1006 | 9.22k | RETURN_IF_ERROR(column_writer->write_data()); |
1007 | 9.22k | } |
1008 | 2.10k | if (is_nullable()) { |
1009 | 1.83k | RETURN_IF_ERROR(_null_writer->write_data()); |
1010 | 1.83k | } |
1011 | 2.10k | return Status::OK(); |
1012 | 2.10k | } |
1013 | | |
1014 | 2.06k | Status StructColumnWriter::write_ordinal_index() { |
1015 | 8.88k | for (auto& column_writer : _sub_column_writers) { |
1016 | 8.88k | RETURN_IF_ERROR(column_writer->write_ordinal_index()); |
1017 | 8.88k | } |
1018 | 2.06k | if (is_nullable()) { |
1019 | 1.79k | RETURN_IF_ERROR(_null_writer->write_ordinal_index()); |
1020 | 1.79k | } |
1021 | 2.06k | return Status::OK(); |
1022 | 2.06k | } |
1023 | | |
1024 | 0 | Status StructColumnWriter::append_nulls(size_t num_rows) { |
1025 | 0 | for (auto& column_writer : _sub_column_writers) { |
1026 | 0 | RETURN_IF_ERROR(column_writer->append_nulls(num_rows)); |
1027 | 0 | } |
1028 | 0 | if (is_nullable()) { |
1029 | 0 | std::vector<UInt8> null_signs(num_rows, 1); |
1030 | 0 | const uint8_t* null_sign_ptr = null_signs.data(); |
1031 | 0 | RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows)); |
1032 | 0 | } |
1033 | 0 | return Status::OK(); |
1034 | 0 | } |
1035 | | |
1036 | 0 | Status StructColumnWriter::finish_current_page() { |
1037 | 0 | return Status::NotSupported("struct writer has no data, can not finish_current_page"); |
1038 | 0 | } |
1039 | | |
1040 | | ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, |
1041 | | std::unique_ptr<StorageField> field, |
1042 | | OffsetColumnWriter* offset_writer, |
1043 | | ScalarColumnWriter* null_writer, |
1044 | | std::unique_ptr<ColumnWriter> item_writer) |
1045 | 43.7k | : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), |
1046 | 43.7k | _item_writer(std::move(item_writer)), |
1047 | 43.7k | _opts(opts) { |
1048 | 43.7k | _offset_writer.reset(offset_writer); |
1049 | 43.7k | if (is_nullable()) { |
1050 | 28.0k | _null_writer.reset(null_writer); |
1051 | 28.0k | } |
1052 | 43.7k | } |
1053 | | |
1054 | 43.7k | Status ArrayColumnWriter::init() { |
1055 | 43.7k | RETURN_IF_ERROR(_offset_writer->init()); |
1056 | 43.7k | if (is_nullable()) { |
1057 | 28.1k | RETURN_IF_ERROR(_null_writer->init()); |
1058 | 28.1k | } |
1059 | 43.7k | RETURN_IF_ERROR(_item_writer->init()); |
1060 | 43.7k | if (_opts.need_inverted_index) { |
1061 | 1.79k | auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); |
1062 | 1.79k | if (writer != nullptr) { |
1063 | 1.79k | RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_writer, |
1064 | 1.79k | _opts.index_file_writer, |
1065 | 1.79k | _opts.inverted_indexes[0])); |
1066 | 1.79k | } |
1067 | 1.79k | } |
1068 | 43.7k | if (_opts.need_ann_index) { |
1069 | 60 | auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); |
1070 | 60 | if (writer != nullptr) { |
1071 | 60 | _ann_index_writer = std::make_unique<AnnIndexColumnWriter>(_opts.index_file_writer, |
1072 | 60 | _opts.ann_index); |
1073 | 60 | RETURN_IF_ERROR(_ann_index_writer->init()); |
1074 | 60 | } |
1075 | 60 | } |
1076 | 43.7k | return Status::OK(); |
1077 | 43.7k | } |
1078 | | |
1079 | 39.9k | Status ArrayColumnWriter::write_inverted_index() { |
1080 | 39.9k | if (_opts.need_inverted_index) { |
1081 | 1.79k | return _inverted_index_writer->finish(); |
1082 | 1.79k | } |
1083 | 38.1k | return Status::OK(); |
1084 | 39.9k | } |
1085 | | |
1086 | 39.8k | Status ArrayColumnWriter::write_ann_index() { |
1087 | 39.8k | if (_opts.need_ann_index) { |
1088 | 57 | return _ann_index_writer->finish(); |
1089 | 57 | } |
1090 | 39.8k | return Status::OK(); |
1091 | 39.8k | } |
1092 | | |
1093 | | // batch append data for array |
1094 | 43.4k | Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { |
1095 | | // data_ptr contains |
1096 | | // [size, offset_ptr, item_data_ptr, item_nullmap_ptr] |
1097 | 43.4k | auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
1098 | | // total number length |
1099 | 43.4k | size_t element_cnt = size_t((unsigned long)(*data_ptr)); |
1100 | 43.4k | auto offset_data = *(data_ptr + 1); |
1101 | 43.4k | const uint8_t* offsets_ptr = (const uint8_t*)offset_data; |
1102 | 43.4k | auto data = *(data_ptr + 2); |
1103 | 43.4k | auto nested_null_map = *(data_ptr + 3); |
1104 | 43.4k | if (element_cnt > 0) { |
1105 | 27.4k | RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map), |
1106 | 27.4k | reinterpret_cast<const void*>(data), element_cnt)); |
1107 | 27.4k | } |
1108 | 43.4k | if (_opts.need_inverted_index) { |
1109 | 1.79k | auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); |
1110 | | // now only support nested type is scala |
1111 | 1.79k | if (writer != nullptr) { |
1112 | | //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr |
1113 | 1.79k | RETURN_IF_ERROR(_inverted_index_writer->add_array_values( |
1114 | 1.79k | _item_writer->get_field()->size(), reinterpret_cast<const void*>(data), |
1115 | 1.79k | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
1116 | 1.79k | } |
1117 | 1.79k | } |
1118 | | |
1119 | 43.4k | if (_opts.need_ann_index) { |
1120 | 60 | auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get()); |
1121 | | // now only support nested type is scala |
1122 | 60 | if (writer != nullptr) { |
1123 | | //NOTE: use array field name as index field, but item_writer size should be used when moving item_data_ptr |
1124 | 60 | RETURN_IF_ERROR(_ann_index_writer->add_array_values( |
1125 | 60 | _item_writer->get_field()->size(), reinterpret_cast<const void*>(data), |
1126 | 60 | reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows)); |
1127 | 60 | } else { |
1128 | 0 | return Status::NotSupported( |
1129 | 0 | "Ann index can only be build on array with scalar type. but got {} as " |
1130 | 0 | "nested", |
1131 | 0 | _item_writer->get_field()->type()); |
1132 | 0 | } |
1133 | 60 | } |
1134 | | |
1135 | 43.4k | RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows)); |
1136 | 43.4k | return Status::OK(); |
1137 | 43.4k | } |
1138 | | |
1139 | 287 | uint64_t ArrayColumnWriter::estimate_buffer_size() { |
1140 | 287 | return _offset_writer->estimate_buffer_size() + |
1141 | 287 | (is_nullable() ? _null_writer->estimate_buffer_size() : 0) + |
1142 | 287 | _item_writer->estimate_buffer_size(); |
1143 | 287 | } |
1144 | | |
1145 | | Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, |
1146 | 27.6k | size_t num_rows) { |
1147 | 27.6k | RETURN_IF_ERROR(append_data(ptr, num_rows)); |
1148 | 27.6k | if (is_nullable()) { |
1149 | 27.6k | if (_opts.need_inverted_index) { |
1150 | 1.39k | RETURN_IF_ERROR(_inverted_index_writer->add_array_nulls(null_map, num_rows)); |
1151 | 1.39k | } |
1152 | 27.6k | RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows)); |
1153 | 27.6k | } |
1154 | 27.6k | return Status::OK(); |
1155 | 27.6k | } |
1156 | | |
1157 | 43.8k | Status ArrayColumnWriter::finish() { |
1158 | 43.8k | RETURN_IF_ERROR(_offset_writer->finish()); |
1159 | 43.8k | if (is_nullable()) { |
1160 | 28.1k | RETURN_IF_ERROR(_null_writer->finish()); |
1161 | 28.1k | } |
1162 | 43.8k | RETURN_IF_ERROR(_item_writer->finish()); |
1163 | 43.8k | _opts.meta->set_num_rows(get_next_rowid()); |
1164 | 43.8k | return Status::OK(); |
1165 | 43.8k | } |
1166 | | |
1167 | 43.8k | Status ArrayColumnWriter::write_data() { |
1168 | 43.8k | RETURN_IF_ERROR(_offset_writer->write_data()); |
1169 | 43.8k | if (is_nullable()) { |
1170 | 28.1k | RETURN_IF_ERROR(_null_writer->write_data()); |
1171 | 28.1k | } |
1172 | 43.8k | RETURN_IF_ERROR(_item_writer->write_data()); |
1173 | 43.8k | return Status::OK(); |
1174 | 43.8k | } |
1175 | | |
1176 | 43.3k | Status ArrayColumnWriter::write_ordinal_index() { |
1177 | 43.3k | RETURN_IF_ERROR(_offset_writer->write_ordinal_index()); |
1178 | 43.3k | if (is_nullable()) { |
1179 | 27.6k | RETURN_IF_ERROR(_null_writer->write_ordinal_index()); |
1180 | 27.6k | } |
1181 | 43.3k | if (!has_empty_items()) { |
1182 | 27.4k | RETURN_IF_ERROR(_item_writer->write_ordinal_index()); |
1183 | 27.4k | } |
1184 | 43.3k | return Status::OK(); |
1185 | 43.3k | } |
1186 | | |
1187 | 0 | Status ArrayColumnWriter::append_nulls(size_t num_rows) { |
1188 | 0 | size_t num_lengths = num_rows; |
1189 | 0 | const ordinal_t offset = _item_writer->get_next_rowid(); |
1190 | 0 | while (num_lengths > 0) { |
1191 | | // TODO llj bulk write |
1192 | 0 | const auto* offset_ptr = reinterpret_cast<const uint8_t*>(&offset); |
1193 | 0 | RETURN_IF_ERROR(_offset_writer->append_data(&offset_ptr, 1)); |
1194 | 0 | --num_lengths; |
1195 | 0 | } |
1196 | 0 | return write_null_column(num_rows, true); |
1197 | 0 | } |
1198 | | |
1199 | 0 | Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) { |
1200 | 0 | uint8_t null_sign = is_null ? 1 : 0; |
1201 | 0 | while (is_nullable() && num_rows > 0) { |
1202 | | // TODO llj bulk write |
1203 | 0 | const uint8_t* null_sign_ptr = &null_sign; |
1204 | 0 | RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, 1)); |
1205 | 0 | --num_rows; |
1206 | 0 | } |
1207 | 0 | return Status::OK(); |
1208 | 0 | } |
1209 | | |
1210 | 0 | Status ArrayColumnWriter::finish_current_page() { |
1211 | 0 | return Status::NotSupported("array writer has no data, can not finish_current_page"); |
1212 | 0 | } |
1213 | | |
1214 | | /// ============================= MapColumnWriter =====================//// |
1215 | | MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, |
1216 | | std::unique_ptr<StorageField> field, |
1217 | | ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer, |
1218 | | std::vector<std::unique_ptr<ColumnWriter>>& kv_writers) |
1219 | 22.8k | : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) { |
1220 | 22.8k | CHECK_EQ(kv_writers.size(), 2); |
1221 | 22.8k | _offsets_writer.reset(offset_writer); |
1222 | 22.8k | if (is_nullable()) { |
1223 | 7.76k | _null_writer.reset(null_writer); |
1224 | 7.76k | } |
1225 | 45.8k | for (auto& sub_writers : kv_writers) { |
1226 | 45.8k | _kv_writers.push_back(std::move(sub_writers)); |
1227 | 45.8k | } |
1228 | 22.8k | } |
1229 | | |
1230 | 22.8k | Status MapColumnWriter::init() { |
1231 | 22.8k | RETURN_IF_ERROR(_offsets_writer->init()); |
1232 | 22.8k | if (is_nullable()) { |
1233 | 7.77k | RETURN_IF_ERROR(_null_writer->init()); |
1234 | 7.77k | } |
1235 | | // here register_flush_page_callback to call this.put_extra_info_in_page() |
1236 | | // when finish cur data page |
1237 | 45.8k | for (auto& sub_writer : _kv_writers) { |
1238 | 45.8k | RETURN_IF_ERROR(sub_writer->init()); |
1239 | 45.8k | } |
1240 | 22.8k | return Status::OK(); |
1241 | 22.8k | } |
1242 | | |
1243 | 426 | uint64_t MapColumnWriter::estimate_buffer_size() { |
1244 | 426 | size_t estimate = 0; |
1245 | 852 | for (auto& sub_writer : _kv_writers) { |
1246 | 852 | estimate += sub_writer->estimate_buffer_size(); |
1247 | 852 | } |
1248 | 426 | estimate += _offsets_writer->estimate_buffer_size(); |
1249 | 426 | if (is_nullable()) { |
1250 | 405 | estimate += _null_writer->estimate_buffer_size(); |
1251 | 405 | } |
1252 | 426 | return estimate; |
1253 | 426 | } |
1254 | | |
1255 | 22.9k | Status MapColumnWriter::finish() { |
1256 | 22.9k | RETURN_IF_ERROR(_offsets_writer->finish()); |
1257 | 22.9k | if (is_nullable()) { |
1258 | 7.77k | RETURN_IF_ERROR(_null_writer->finish()); |
1259 | 7.77k | } |
1260 | 45.8k | for (auto& sub_writer : _kv_writers) { |
1261 | 45.8k | RETURN_IF_ERROR(sub_writer->finish()); |
1262 | 45.8k | } |
1263 | 22.9k | _opts.meta->set_num_rows(get_next_rowid()); |
1264 | 22.9k | return Status::OK(); |
1265 | 22.9k | } |
1266 | | |
1267 | | Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, |
1268 | 7.62k | size_t num_rows) { |
1269 | 7.62k | RETURN_IF_ERROR(append_data(ptr, num_rows)); |
1270 | 7.62k | if (is_nullable()) { |
1271 | 7.62k | RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows)); |
1272 | 7.62k | } |
1273 | 7.62k | return Status::OK(); |
1274 | 7.62k | } |
1275 | | |
1276 | | // write key value data with offsets |
1277 | 23.8k | Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { |
1278 | | // data_ptr contains |
1279 | | // [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_pr] |
1280 | | // which converted results from olap_map_convertor and later will use a structure to replace it |
1281 | 23.8k | auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr); |
1282 | | // total number length |
1283 | 23.8k | size_t element_cnt = size_t((unsigned long)(*data_ptr)); |
1284 | 23.8k | auto offset_data = *(data_ptr + 1); |
1285 | 23.8k | const uint8_t* offsets_ptr = (const uint8_t*)offset_data; |
1286 | | |
1287 | 23.8k | if (element_cnt > 0) { |
1288 | 41.2k | for (size_t i = 0; i < 2; ++i) { |
1289 | 27.5k | auto data = *(data_ptr + 2 + i); |
1290 | 27.5k | auto nested_null_map = *(data_ptr + 2 + 2 + i); |
1291 | 27.5k | RETURN_IF_ERROR( |
1292 | 27.5k | _kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map), |
1293 | 27.5k | reinterpret_cast<const void*>(data), element_cnt)); |
1294 | 27.5k | } |
1295 | 13.7k | } |
1296 | | // make sure the order : offset writer flush next_array_item_ordinal after kv_writers append_data |
1297 | | // because we use _kv_writers[0]->get_next_rowid() to set next_array_item_ordinal in offset page footer |
1298 | 23.8k | RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows)); |
1299 | 23.8k | return Status::OK(); |
1300 | 23.8k | } |
1301 | | |
1302 | 22.9k | Status MapColumnWriter::write_data() { |
1303 | 22.9k | RETURN_IF_ERROR(_offsets_writer->write_data()); |
1304 | 22.9k | if (is_nullable()) { |
1305 | 7.77k | RETURN_IF_ERROR(_null_writer->write_data()); |
1306 | 7.77k | } |
1307 | 45.8k | for (auto& sub_writer : _kv_writers) { |
1308 | 45.8k | RETURN_IF_ERROR(sub_writer->write_data()); |
1309 | 45.8k | } |
1310 | 22.9k | return Status::OK(); |
1311 | 22.9k | } |
1312 | | |
1313 | 22.6k | Status MapColumnWriter::write_ordinal_index() { |
1314 | 22.6k | RETURN_IF_ERROR(_offsets_writer->write_ordinal_index()); |
1315 | 22.6k | if (is_nullable()) { |
1316 | 7.51k | RETURN_IF_ERROR(_null_writer->write_ordinal_index()); |
1317 | 7.51k | } |
1318 | 45.3k | for (auto& sub_writer : _kv_writers) { |
1319 | 45.3k | if (sub_writer->get_next_rowid() != 0) { |
1320 | 25.5k | RETURN_IF_ERROR(sub_writer->write_ordinal_index()); |
1321 | 25.5k | } |
1322 | 45.3k | } |
1323 | 22.6k | return Status::OK(); |
1324 | 22.6k | } |
1325 | | |
1326 | 0 | Status MapColumnWriter::append_nulls(size_t num_rows) { |
1327 | 0 | for (auto& sub_writer : _kv_writers) { |
1328 | 0 | RETURN_IF_ERROR(sub_writer->append_nulls(num_rows)); |
1329 | 0 | } |
1330 | 0 | const ordinal_t offset = _kv_writers[0]->get_next_rowid(); |
1331 | 0 | std::vector<UInt8> offsets_data(num_rows, cast_set<uint8_t>(offset)); |
1332 | 0 | const uint8_t* offsets_ptr = offsets_data.data(); |
1333 | 0 | RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows)); |
1334 | | |
1335 | 0 | if (is_nullable()) { |
1336 | 0 | std::vector<UInt8> null_signs(num_rows, 1); |
1337 | 0 | const uint8_t* null_sign_ptr = null_signs.data(); |
1338 | 0 | RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows)); |
1339 | 0 | } |
1340 | 0 | return Status::OK(); |
1341 | 0 | } |
1342 | | |
1343 | 0 | Status MapColumnWriter::finish_current_page() { |
1344 | 0 | return Status::NotSupported("map writer has no data, can not finish_current_page"); |
1345 | 0 | } |
1346 | | |
1347 | 9.29k | Status MapColumnWriter::write_inverted_index() { |
1348 | 9.29k | if (_opts.need_inverted_index) { |
1349 | 0 | return _index_builder->finish(); |
1350 | 0 | } |
1351 | 9.29k | return Status::OK(); |
1352 | 9.29k | } |
1353 | | |
1354 | | VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts, |
1355 | | const TabletColumn* column, |
1356 | | std::unique_ptr<StorageField> field) |
1357 | 5.95k | : ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta) { |
1358 | 5.95k | _impl = std::make_unique<VariantColumnWriterImpl>(opts, column); |
1359 | 5.95k | } |
1360 | | |
1361 | 5.94k | Status VariantColumnWriter::init() { |
1362 | 5.94k | return _impl->init(); |
1363 | 5.94k | } |
1364 | | |
1365 | 665 | Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) { |
1366 | 665 | _next_rowid += num_rows; |
1367 | 665 | return _impl->append_data(ptr, num_rows); |
1368 | 665 | } |
1369 | | |
1370 | 859 | uint64_t VariantColumnWriter::estimate_buffer_size() { |
1371 | 859 | return _impl->estimate_buffer_size(); |
1372 | 859 | } |
1373 | | |
1374 | 5.97k | Status VariantColumnWriter::finish() { |
1375 | 5.97k | return _impl->finish(); |
1376 | 5.97k | } |
1377 | 5.97k | Status VariantColumnWriter::write_data() { |
1378 | 5.97k | return _impl->write_data(); |
1379 | 5.97k | } |
1380 | 5.97k | Status VariantColumnWriter::write_ordinal_index() { |
1381 | 5.97k | return _impl->write_ordinal_index(); |
1382 | 5.97k | } |
1383 | | |
1384 | 5.97k | Status VariantColumnWriter::write_zone_map() { |
1385 | 5.97k | return _impl->write_zone_map(); |
1386 | 5.97k | } |
1387 | | |
1388 | 5.95k | Status VariantColumnWriter::write_inverted_index() { |
1389 | 5.95k | return _impl->write_inverted_index(); |
1390 | 5.95k | } |
1391 | 5.95k | Status VariantColumnWriter::write_bloom_filter_index() { |
1392 | 5.95k | return _impl->write_bloom_filter_index(); |
1393 | 5.95k | } |
1394 | | |
1395 | | Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr, |
1396 | 5.51k | size_t num_rows) { |
1397 | 5.51k | return _impl->append_nullable(null_map, ptr, num_rows); |
1398 | 5.51k | } |
1399 | | |
1400 | | } // namespace doris::segment_v2 |