be/src/storage/segment/binary_dict_page.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/binary_dict_page.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <gen_cpp/segment_v2.pb.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <ostream> |
25 | | #include <utility> |
26 | | |
27 | | #include "common/compiler_util.h" // IWYU pragma: keep |
28 | | #include "common/config.h" |
29 | | #include "common/logging.h" |
30 | | #include "common/status.h" |
31 | | #include "core/column/column.h" |
32 | | #include "core/column/column_string.h" |
33 | | #include "storage/segment/binary_plain_page_v2.h" |
34 | | #include "storage/segment/bitshuffle_page.h" |
35 | | #include "storage/segment/encoding_info.h" |
36 | | #include "util/coding.h" |
37 | | #include "util/slice.h" // for Slice |
38 | | |
39 | | namespace doris { |
40 | | #include "common/compile_check_begin.h" |
41 | | struct StringRef; |
42 | | |
43 | | namespace segment_v2 { |
44 | | |
45 | | BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) |
46 | 3.86k | : _options(options), |
47 | 3.86k | _finished(false), |
48 | 3.86k | _data_page_builder(nullptr), |
49 | 3.86k | _dict_builder(nullptr), |
50 | 3.86k | _encoding_type(DICT_ENCODING), |
51 | | _dict_word_page_encoding_type( |
52 | 3.86k | options.encoding_preference.binary_plain_encoding_default_impl == |
53 | 3.86k | BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
54 | 3.86k | ? PLAIN_ENCODING_V2 |
55 | 3.86k | : PLAIN_ENCODING), |
56 | | _fallback_binary_encoding_type( |
57 | 3.86k | options.encoding_preference.binary_plain_encoding_default_impl == |
58 | 3.86k | BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
59 | 3.86k | ? PLAIN_ENCODING_V2 |
60 | 3.86k | : PLAIN_ENCODING) {} |
61 | | |
62 | 3.86k | Status BinaryDictPageBuilder::init() { |
63 | | // initially use DICT_ENCODING |
64 | | // TODO: the data page builder type can be created by Factory according to user config |
65 | 3.86k | PageBuilder* data_page_builder_ptr = nullptr; |
66 | 3.86k | RETURN_IF_ERROR(BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>::create( |
67 | 3.86k | &data_page_builder_ptr, _options)); |
68 | 3.86k | _data_page_builder.reset(data_page_builder_ptr); |
69 | 3.86k | PageBuilderOptions dict_builder_options; |
70 | | // here the binary plain page is used to store the dictionary items so |
71 | | // the data page size is set to the same as the dict page size |
72 | 3.86k | dict_builder_options.data_page_size = _options.dict_page_size; |
73 | 3.86k | dict_builder_options.dict_page_size = _options.dict_page_size; |
74 | 3.86k | dict_builder_options.is_dict_page = true; |
75 | | |
76 | 3.86k | const EncodingInfo* encoding_info; |
77 | 3.86k | RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
78 | 3.86k | _dict_word_page_encoding_type, {}, &encoding_info)); |
79 | 3.86k | RETURN_IF_ERROR(encoding_info->create_page_builder(dict_builder_options, _dict_builder)); |
80 | 3.86k | return reset(); |
81 | 3.86k | } |
82 | | |
83 | 1.06M | bool BinaryDictPageBuilder::is_page_full() { |
84 | 1.06M | if (_data_page_builder->is_page_full()) { |
85 | 2.00k | return true; |
86 | 2.00k | } |
87 | 1.05M | if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
88 | 3.13k | return true; |
89 | 3.13k | } |
90 | 1.05M | return false; |
91 | 1.05M | } |
92 | | |
93 | 219k | Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { |
94 | 219k | if (_encoding_type == DICT_ENCODING) { |
95 | 16.4k | DCHECK(!_finished); |
96 | 16.4k | DCHECK_GT(*count, 0); |
97 | 16.4k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
98 | 16.4k | size_t num_added = 0; |
99 | 16.4k | uint32_t value_code = -1; |
100 | 16.4k | auto* actual_builder = dynamic_cast<BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>*>( |
101 | 16.4k | _data_page_builder.get()); |
102 | | |
103 | 16.4k | if (_data_page_builder->count() == 0) { |
104 | 3.83k | _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()), |
105 | 3.83k | src->get_size()); |
106 | 3.83k | } |
107 | | |
108 | 856k | for (int i = 0; i < *count; ++i, ++src) { |
109 | 841k | if (is_page_full()) { |
110 | 1.56k | break; |
111 | 1.56k | } |
112 | | |
113 | 840k | if (src->empty() && _has_empty) { |
114 | 87.8k | value_code = _empty_code; |
115 | 752k | } else if (auto iter = _dictionary.find(*src); iter != _dictionary.end()) { |
116 | 625k | value_code = iter->second; |
117 | 625k | } else { |
118 | 126k | Slice dict_item(src->data, src->size); |
119 | 126k | if (src->size > 0) { |
120 | 126k | char* item_mem = _arena.alloc(src->size); |
121 | 126k | if (item_mem == nullptr) { |
122 | 0 | return Status::MemoryAllocFailed("memory allocate failed, size:{}", |
123 | 0 | src->size); |
124 | 0 | } |
125 | 126k | dict_item.relocate(item_mem); |
126 | 126k | } |
127 | 126k | value_code = cast_set<uint32_t>(_dictionary.size()); |
128 | 126k | size_t add_count = 1; |
129 | 126k | RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), |
130 | 126k | &add_count)); |
131 | 126k | if (add_count == 0) { |
132 | | // current dict page is full, stop processing remaining inputs |
133 | 0 | break; |
134 | 0 | } |
135 | 126k | _dictionary.emplace(dict_item, value_code); |
136 | 126k | if (src->empty()) { |
137 | 321 | _has_empty = true; |
138 | 321 | _empty_code = value_code; |
139 | 321 | } |
140 | 126k | } |
141 | 840k | size_t add_count = 1; |
142 | 840k | RETURN_IF_ERROR(actual_builder->single_add( |
143 | 840k | reinterpret_cast<const uint8_t*>(&value_code), &add_count)); |
144 | 840k | if (add_count == 0) { |
145 | | // current data page is full, stop processing remaining inputs |
146 | 0 | break; |
147 | 0 | } |
148 | | // Track raw data size: the original string size |
149 | 840k | _raw_data_size += src->size; |
150 | 840k | num_added += 1; |
151 | 840k | } |
152 | 16.4k | *count = num_added; |
153 | 16.4k | return Status::OK(); |
154 | 203k | } else { |
155 | 203k | DCHECK(_encoding_type == PLAIN_ENCODING || _encoding_type == PLAIN_ENCODING_V2); |
156 | 203k | RETURN_IF_ERROR(_data_page_builder->add(vals, count)); |
157 | | // For plain encoding, track raw data size from the input |
158 | 203k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
159 | 443k | for (size_t i = 0; i < *count; ++i) { |
160 | 240k | _raw_data_size += src[i].size; |
161 | 240k | } |
162 | 203k | return Status::OK(); |
163 | 203k | } |
164 | 219k | } |
165 | | |
166 | 7.36k | Status BinaryDictPageBuilder::finish(OwnedSlice* slice) { |
167 | 7.36k | if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) { |
168 | 0 | VLOG_DEBUG << "dict page size:" << _dict_builder->size(); |
169 | 0 | } |
170 | | |
171 | 7.36k | DCHECK(!_finished); |
172 | 7.36k | _finished = true; |
173 | | |
174 | 7.36k | OwnedSlice data_slice; |
175 | 7.36k | RETURN_IF_ERROR(_data_page_builder->finish(&data_slice)); |
176 | | // TODO(gaodayue) separate page header and content to avoid this copy |
177 | 7.36k | RETURN_IF_CATCH_EXCEPTION( |
178 | 7.36k | { _buffer.append(data_slice.slice().data, data_slice.slice().size); }); |
179 | 7.36k | encode_fixed32_le(&_buffer[0], _encoding_type); |
180 | 7.36k | *slice = _buffer.build(); |
181 | 7.36k | return Status::OK(); |
182 | 7.36k | } |
183 | | |
// Prepares the builder for the next data page.
// - Clears the finished flag and the raw-data-size counter.
// - Re-reserves _buffer and sizes it to BINARY_DICT_PAGE_HEADER_SIZE, leaving room for the
//   encoding header that finish() writes at offset 0.
// - If the builder is still dictionary encoding but the dictionary word page is full, it
//   switches to _fallback_binary_encoding_type by replacing _data_page_builder with a
//   plain builder. After this _encoding_type is no longer DICT_ENCODING, so this branch
//   is not taken again. Otherwise the existing data page builder is just reset.
// Exceptions thrown inside the block are converted to a Status by RETURN_IF_CATCH_EXCEPTION.
Status BinaryDictPageBuilder::reset() {
    RETURN_IF_CATCH_EXCEPTION({
        _finished = false;
        _raw_data_size = 0;
        _buffer.reserve(_options.data_page_size + BINARY_DICT_PAGE_HEADER_SIZE);
        // Placeholder bytes for the encoding-type header filled in by finish().
        _buffer.resize(BINARY_DICT_PAGE_HEADER_SIZE);

        if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) {
            // Dictionary is full: later values are written with the plain fallback encoding.
            DCHECK(_fallback_binary_encoding_type == PLAIN_ENCODING ||
                   _fallback_binary_encoding_type == PLAIN_ENCODING_V2);
            const EncodingInfo* encoding_info;
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
                                              _fallback_binary_encoding_type, {}, &encoding_info));
            RETURN_IF_ERROR(encoding_info->create_page_builder(_options, _data_page_builder));
            _encoding_type = _fallback_binary_encoding_type;
        } else {
            RETURN_IF_ERROR(_data_page_builder->reset());
        }
    });
    return Status::OK();
}
205 | | |
206 | 3 | size_t BinaryDictPageBuilder::count() const { |
207 | 3 | return _data_page_builder->count(); |
208 | 3 | } |
209 | | |
210 | 3.48k | uint64_t BinaryDictPageBuilder::size() const { |
211 | 3.48k | return _arena.used_size() + _data_page_builder->size(); |
212 | 3.48k | } |
213 | | |
214 | 3.82k | Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { |
215 | 3.82k | return _dict_builder->finish(dictionary_page); |
216 | 3.82k | } |
217 | | |
218 | 3.82k | Status BinaryDictPageBuilder::get_dictionary_page_encoding(EncodingTypePB* encoding) const { |
219 | 3.82k | *encoding = _dict_word_page_encoding_type; |
220 | 3.82k | return Status::OK(); |
221 | 3.82k | } |
222 | | |
223 | 3 | Status BinaryDictPageBuilder::get_first_value(void* value) const { |
224 | 3 | DCHECK(_finished); |
225 | 3 | if (_data_page_builder->count() == 0) { |
226 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
227 | 0 | } |
228 | 3 | if (_encoding_type != DICT_ENCODING) { |
229 | 0 | return _data_page_builder->get_first_value(value); |
230 | 0 | } |
231 | 3 | *reinterpret_cast<Slice*>(value) = Slice(_first_value); |
232 | 3 | return Status::OK(); |
233 | 3 | } |
234 | | |
235 | 3 | Status BinaryDictPageBuilder::get_last_value(void* value) const { |
236 | 3 | DCHECK(_finished); |
237 | 3 | if (_data_page_builder->count() == 0) { |
238 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
239 | 0 | } |
240 | 3 | if (_encoding_type != DICT_ENCODING) { |
241 | 0 | return _data_page_builder->get_last_value(value); |
242 | 0 | } |
243 | 3 | uint32_t value_code; |
244 | 3 | RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code)); |
245 | 3 | RETURN_IF_ERROR(_dict_builder->get_dict_word(value_code, reinterpret_cast<Slice*>(value))); |
246 | 3 | return Status::OK(); |
247 | 3 | } |
248 | | |
249 | 7.26k | uint64_t BinaryDictPageBuilder::get_raw_data_size() const { |
250 | 7.26k | return _raw_data_size; |
251 | 7.26k | } |
252 | | |
253 | | BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOptions& options) |
254 | 4.68k | : _data(data), |
255 | 4.68k | _options(options), |
256 | 4.68k | _data_page_decoder(nullptr), |
257 | 4.68k | _parsed(false), |
258 | 4.68k | _encoding_type(UNKNOWN_ENCODING) {} |
259 | | |
260 | 4.68k | Status BinaryDictPageDecoder::init() { |
261 | 4.68k | CHECK(!_parsed); |
262 | 4.68k | if (_data.size < BINARY_DICT_PAGE_HEADER_SIZE) { |
263 | 0 | return Status::Corruption("invalid data size:{}, header size:{}", _data.size, |
264 | 0 | BINARY_DICT_PAGE_HEADER_SIZE); |
265 | 0 | } |
266 | 4.68k | size_t type = decode_fixed32_le((const uint8_t*)&_data.data[0]); |
267 | 4.68k | _encoding_type = static_cast<EncodingTypePB>(type); |
268 | 4.68k | _data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE); |
269 | 4.68k | if (_encoding_type == DICT_ENCODING) { |
270 | 2.75k | _data_page_decoder.reset( |
271 | 2.75k | _bit_shuffle_ptr = |
272 | 2.75k | new BitShufflePageDecoder<FieldType::OLAP_FIELD_TYPE_INT>(_data, _options)); |
273 | 2.75k | } else if (_encoding_type == PLAIN_ENCODING) { |
274 | 1.78k | _data_page_decoder.reset( |
275 | 1.78k | new BinaryPlainPageDecoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
276 | 1.78k | } else if (_encoding_type == PLAIN_ENCODING_V2) { |
277 | 141 | _data_page_decoder.reset( |
278 | 141 | new BinaryPlainPageV2Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
279 | 141 | } else { |
280 | 0 | LOG(WARNING) << "invalid encoding type:" << _encoding_type; |
281 | 0 | return Status::Corruption("invalid encoding type:{}", _encoding_type); |
282 | 0 | } |
283 | | |
284 | 4.68k | RETURN_IF_ERROR(_data_page_decoder->init()); |
285 | 4.68k | _parsed = true; |
286 | 4.68k | return Status::OK(); |
287 | 4.68k | } |
288 | | |
289 | 4.68k | BinaryDictPageDecoder::~BinaryDictPageDecoder() {} |
290 | | |
291 | 651 | Status BinaryDictPageDecoder::seek_to_position_in_page(size_t pos) { |
292 | 651 | return _data_page_decoder->seek_to_position_in_page(pos); |
293 | 651 | } |
294 | | |
295 | 18.5k | bool BinaryDictPageDecoder::is_dict_encoding() const { |
296 | 18.5k | return _encoding_type == DICT_ENCODING; |
297 | 18.5k | } |
298 | | |
299 | 2.75k | void BinaryDictPageDecoder::set_dict_decoder(uint32_t num_dict_items, StringRef* dict_word_info) { |
300 | 2.75k | _num_dict_items = num_dict_items; |
301 | 2.75k | _dict_word_info = dict_word_info; |
302 | 2.75k | }; |
303 | | |
304 | 8.30k | Status BinaryDictPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) { |
305 | 8.30k | if (!is_dict_encoding()) { |
306 | 1.42k | dst = dst->convert_to_predicate_column_if_dictionary(); |
307 | 1.42k | return _data_page_decoder->next_batch(n, dst); |
308 | 1.42k | } |
309 | | // dictionary encoding |
310 | 8.30k | DCHECK(_parsed); |
311 | 6.88k | DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
312 | | |
313 | 6.88k | if (*n == 0 || _bit_shuffle_ptr->_cur_index >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
314 | 0 | *n = 0; |
315 | 0 | return Status::OK(); |
316 | 0 | } |
317 | | |
318 | 6.88k | size_t max_fetch = std::min(*n, static_cast<size_t>(_bit_shuffle_ptr->_num_elements - |
319 | 6.88k | _bit_shuffle_ptr->_cur_index)); |
320 | 6.88k | *n = max_fetch; |
321 | | |
322 | 6.88k | if (_options.only_read_offsets) { |
323 | | // OFFSET_ONLY mode: resolve dict codes to get real string lengths |
324 | | // without copying actual char data. This allows length() to work. |
325 | 0 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
326 | 0 | size_t start_index = _bit_shuffle_ptr->_cur_index; |
327 | | // Reuse _buffer (int32_t vector) to store uint32_t lengths. |
328 | | // int32_t and uint32_t have the same size/alignment, and string |
329 | | // lengths are always non-negative, so the bit patterns are identical. |
330 | 0 | _buffer.resize(max_fetch); |
331 | 0 | for (size_t i = 0; i < max_fetch; ++i) { |
332 | 0 | int32_t codeword = data_array[start_index + i]; |
333 | 0 | _buffer[i] = static_cast<int32_t>(_dict_word_info[codeword].size); |
334 | 0 | } |
335 | 0 | dst->insert_offsets_from_lengths(reinterpret_cast<const uint32_t*>(_buffer.data()), |
336 | 0 | max_fetch); |
337 | 6.88k | } else { |
338 | 6.88k | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
339 | 6.88k | size_t start_index = _bit_shuffle_ptr->_cur_index; |
340 | | |
341 | 6.88k | dst->insert_many_dict_data(data_array, start_index, _dict_word_info, max_fetch, |
342 | 6.88k | _num_dict_items); |
343 | 6.88k | } |
344 | | |
345 | 6.88k | _bit_shuffle_ptr->_cur_index += max_fetch; |
346 | | |
347 | 6.88k | return Status::OK(); |
348 | 6.88k | } |
349 | | |
350 | | Status BinaryDictPageDecoder::read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, |
351 | 1.13k | size_t* n, MutableColumnPtr& dst) { |
352 | 1.13k | if (!is_dict_encoding()) { |
353 | 507 | dst = dst->convert_to_predicate_column_if_dictionary(); |
354 | 507 | return _data_page_decoder->read_by_rowids(rowids, page_first_ordinal, n, dst); |
355 | 507 | } |
356 | 1.13k | DCHECK(_parsed); |
357 | 626 | DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
358 | | |
359 | 626 | if (*n == 0) [[unlikely]] { |
360 | 0 | *n = 0; |
361 | 0 | return Status::OK(); |
362 | 0 | } |
363 | | |
364 | 626 | auto total = *n; |
365 | | |
366 | 626 | if (_options.only_read_offsets) { |
367 | | // OFFSET_ONLY mode: resolve dict codes to get real string lengths |
368 | | // without copying actual char data. This allows length() to work correctly. |
369 | 0 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
370 | 0 | size_t read_count = 0; |
371 | 0 | _buffer.resize(total); |
372 | 0 | for (size_t i = 0; i < total; ++i) { |
373 | 0 | ordinal_t ord = rowids[i] - page_first_ordinal; |
374 | 0 | if (ord >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
375 | 0 | break; |
376 | 0 | } |
377 | 0 | int32_t codeword = data_array[ord]; |
378 | 0 | _buffer[read_count] = static_cast<int32_t>(_dict_word_info[codeword].size); |
379 | 0 | read_count++; |
380 | 0 | } |
381 | 0 | if (read_count > 0) { |
382 | 0 | dst->insert_offsets_from_lengths(reinterpret_cast<const uint32_t*>(_buffer.data()), |
383 | 0 | read_count); |
384 | 0 | } |
385 | 0 | *n = read_count; |
386 | 0 | return Status::OK(); |
387 | 0 | } |
388 | | |
389 | 626 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
390 | 626 | size_t read_count = 0; |
391 | 626 | _buffer.resize(total); |
392 | 25.6k | for (size_t i = 0; i < total; ++i) { |
393 | 25.0k | ordinal_t ord = rowids[i] - page_first_ordinal; |
394 | 25.0k | if (ord >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
395 | 0 | break; |
396 | 0 | } |
397 | | |
398 | 25.0k | _buffer[read_count++] = data_array[ord]; |
399 | 25.0k | } |
400 | | |
401 | 626 | if (LIKELY(read_count > 0)) { |
402 | 626 | dst->insert_many_dict_data(_buffer.data(), 0, _dict_word_info, read_count, _num_dict_items); |
403 | 626 | } |
404 | 626 | *n = read_count; |
405 | 626 | return Status::OK(); |
406 | 626 | } |
407 | | |
408 | | #include "common/compile_check_end.h" |
409 | | } // namespace segment_v2 |
410 | | } // namespace doris |