be/src/storage/segment/binary_dict_page.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/binary_dict_page.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <gen_cpp/segment_v2.pb.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <ostream> |
25 | | #include <utility> |
26 | | |
27 | | #include "common/compiler_util.h" // IWYU pragma: keep |
28 | | #include "common/config.h" |
29 | | #include "common/logging.h" |
30 | | #include "common/status.h" |
31 | | #include "core/column/column.h" |
32 | | #include "core/column/column_string.h" |
33 | | #include "storage/segment/binary_plain_page_v2.h" |
34 | | #include "storage/segment/bitshuffle_page.h" |
35 | | #include "storage/segment/encoding_info.h" |
36 | | #include "util/coding.h" |
37 | | #include "util/slice.h" // for Slice |
38 | | |
39 | | namespace doris { |
40 | | struct StringRef; |
41 | | |
42 | | namespace segment_v2 { |
43 | | |
44 | | BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) |
45 | 3.89k | : _options(options), |
46 | 3.89k | _finished(false), |
47 | 3.89k | _data_page_builder(nullptr), |
48 | 3.89k | _dict_builder(nullptr), |
49 | 3.89k | _encoding_type(DICT_ENCODING), |
50 | | _dict_word_page_encoding_type( |
51 | 3.89k | options.encoding_preference.binary_plain_encoding_default_impl == |
52 | 3.89k | BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
53 | 3.89k | ? PLAIN_ENCODING_V2 |
54 | 3.89k | : PLAIN_ENCODING), |
55 | | _fallback_binary_encoding_type( |
56 | 3.89k | options.encoding_preference.binary_plain_encoding_default_impl == |
57 | 3.89k | BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
58 | 3.89k | ? PLAIN_ENCODING_V2 |
59 | 3.89k | : PLAIN_ENCODING) {} |
60 | | |
61 | 3.89k | Status BinaryDictPageBuilder::init() { |
62 | | // initially use DICT_ENCODING |
63 | | // TODO: the data page builder type can be created by Factory according to user config |
64 | 3.89k | PageBuilder* data_page_builder_ptr = nullptr; |
65 | 3.89k | RETURN_IF_ERROR(BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>::create( |
66 | 3.89k | &data_page_builder_ptr, _options)); |
67 | 3.89k | _data_page_builder.reset(data_page_builder_ptr); |
68 | 3.89k | PageBuilderOptions dict_builder_options; |
69 | | // here the binary plain page is used to store the dictionary items so |
70 | | // the data page size is set to the same as the dict page size |
71 | 3.89k | dict_builder_options.data_page_size = _options.dict_page_size; |
72 | 3.89k | dict_builder_options.dict_page_size = _options.dict_page_size; |
73 | 3.89k | dict_builder_options.is_dict_page = true; |
74 | | |
75 | 3.89k | const EncodingInfo* encoding_info; |
76 | 3.89k | RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
77 | 3.89k | _dict_word_page_encoding_type, {}, &encoding_info)); |
78 | 3.89k | RETURN_IF_ERROR(encoding_info->create_page_builder(dict_builder_options, _dict_builder)); |
79 | 3.89k | return reset(); |
80 | 3.89k | } |
81 | | |
82 | 1.06M | bool BinaryDictPageBuilder::is_page_full() { |
83 | 1.06M | if (_data_page_builder->is_page_full()) { |
84 | 2.00k | return true; |
85 | 2.00k | } |
86 | 1.05M | if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
87 | 3.13k | return true; |
88 | 3.13k | } |
89 | 1.05M | return false; |
90 | 1.05M | } |
91 | | |
92 | 219k | Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { |
93 | 219k | if (_encoding_type == DICT_ENCODING) { |
94 | 16.4k | DCHECK(!_finished); |
95 | 16.4k | DCHECK_GT(*count, 0); |
96 | 16.4k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
97 | 16.4k | size_t num_added = 0; |
98 | 16.4k | uint32_t value_code = -1; |
99 | 16.4k | auto* actual_builder = dynamic_cast<BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>*>( |
100 | 16.4k | _data_page_builder.get()); |
101 | | |
102 | 856k | for (int i = 0; i < *count; ++i, ++src) { |
103 | 841k | if (is_page_full()) { |
104 | 1.56k | break; |
105 | 1.56k | } |
106 | | |
107 | 840k | if (src->empty() && _has_empty) { |
108 | 87.8k | value_code = _empty_code; |
109 | 752k | } else if (auto iter = _dictionary.find(*src); iter != _dictionary.end()) { |
110 | 625k | value_code = iter->second; |
111 | 625k | } else { |
112 | 126k | Slice dict_item(src->data, src->size); |
113 | 126k | if (src->size > 0) { |
114 | 126k | char* item_mem = _arena.alloc(src->size); |
115 | 126k | if (item_mem == nullptr) { |
116 | 0 | return Status::MemoryAllocFailed("memory allocate failed, size:{}", |
117 | 0 | src->size); |
118 | 0 | } |
119 | 126k | dict_item.relocate(item_mem); |
120 | 126k | } |
121 | 126k | value_code = cast_set<uint32_t>(_dictionary.size()); |
122 | 126k | size_t add_count = 1; |
123 | 126k | RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), |
124 | 126k | &add_count)); |
125 | 126k | if (add_count == 0) { |
126 | | // current dict page is full, stop processing remaining inputs |
127 | 0 | break; |
128 | 0 | } |
129 | 126k | _dictionary.emplace(dict_item, value_code); |
130 | 126k | if (src->empty()) { |
131 | 325 | _has_empty = true; |
132 | 325 | _empty_code = value_code; |
133 | 325 | } |
134 | 126k | } |
135 | 840k | size_t add_count = 1; |
136 | 840k | RETURN_IF_ERROR(actual_builder->single_add( |
137 | 840k | reinterpret_cast<const uint8_t*>(&value_code), &add_count)); |
138 | 840k | if (add_count == 0) { |
139 | | // current data page is full, stop processing remaining inputs |
140 | 0 | break; |
141 | 0 | } |
142 | | // Track raw data size: the original string size |
143 | 840k | _raw_data_size += src->size; |
144 | 840k | num_added += 1; |
145 | 840k | } |
146 | 16.4k | *count = num_added; |
147 | 16.4k | return Status::OK(); |
148 | 203k | } else { |
149 | 203k | DCHECK(_encoding_type == PLAIN_ENCODING || _encoding_type == PLAIN_ENCODING_V2); |
150 | 203k | RETURN_IF_ERROR(_data_page_builder->add(vals, count)); |
151 | | // For plain encoding, track raw data size from the input |
152 | 203k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
153 | 443k | for (size_t i = 0; i < *count; ++i) { |
154 | 240k | _raw_data_size += src[i].size; |
155 | 240k | } |
156 | 203k | return Status::OK(); |
157 | 203k | } |
158 | 219k | } |
159 | | |
160 | 7.38k | Status BinaryDictPageBuilder::finish(OwnedSlice* slice) { |
161 | 7.38k | if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) { |
162 | 0 | VLOG_DEBUG << "dict page size:" << _dict_builder->size(); |
163 | 0 | } |
164 | | |
165 | 7.38k | DCHECK(!_finished); |
166 | 7.38k | _finished = true; |
167 | | |
168 | 7.38k | OwnedSlice data_slice; |
169 | 7.38k | RETURN_IF_ERROR(_data_page_builder->finish(&data_slice)); |
170 | | // TODO(gaodayue) separate page header and content to avoid this copy |
171 | 7.38k | RETURN_IF_CATCH_EXCEPTION( |
172 | 7.38k | { _buffer.append(data_slice.slice().data, data_slice.slice().size); }); |
173 | 7.38k | encode_fixed32_le(&_buffer[0], _encoding_type); |
174 | 7.38k | *slice = _buffer.build(); |
175 | 7.38k | return Status::OK(); |
176 | 7.38k | } |
177 | | |
178 | 11.2k | Status BinaryDictPageBuilder::reset() { |
179 | 11.2k | RETURN_IF_CATCH_EXCEPTION({ |
180 | 11.2k | _finished = false; |
181 | 11.2k | _raw_data_size = 0; |
182 | 11.2k | _buffer.reserve(_options.data_page_size + BINARY_DICT_PAGE_HEADER_SIZE); |
183 | 11.2k | _buffer.resize(BINARY_DICT_PAGE_HEADER_SIZE); |
184 | | |
185 | 11.2k | if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
186 | 11.2k | DCHECK(_fallback_binary_encoding_type == PLAIN_ENCODING || |
187 | 11.2k | _fallback_binary_encoding_type == PLAIN_ENCODING_V2); |
188 | 11.2k | const EncodingInfo* encoding_info; |
189 | 11.2k | RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
190 | 11.2k | _fallback_binary_encoding_type, {}, &encoding_info)); |
191 | 11.2k | RETURN_IF_ERROR(encoding_info->create_page_builder(_options, _data_page_builder)); |
192 | 11.2k | _encoding_type = _fallback_binary_encoding_type; |
193 | 11.2k | } else { |
194 | 11.2k | RETURN_IF_ERROR(_data_page_builder->reset()); |
195 | 11.2k | } |
196 | 11.2k | }); |
197 | 11.2k | return Status::OK(); |
198 | 11.2k | } |
199 | | |
200 | 3 | size_t BinaryDictPageBuilder::count() const { |
201 | 3 | return _data_page_builder->count(); |
202 | 3 | } |
203 | | |
204 | 3.48k | uint64_t BinaryDictPageBuilder::size() const { |
205 | 3.48k | return _arena.used_size() + _data_page_builder->size(); |
206 | 3.48k | } |
207 | | |
208 | 3.86k | Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { |
209 | 3.86k | return _dict_builder->finish(dictionary_page); |
210 | 3.86k | } |
211 | | |
212 | 3.86k | Status BinaryDictPageBuilder::get_dictionary_page_encoding(EncodingTypePB* encoding) const { |
213 | 3.86k | *encoding = _dict_word_page_encoding_type; |
214 | 3.86k | return Status::OK(); |
215 | 3.86k | } |
216 | | |
217 | 7.27k | uint64_t BinaryDictPageBuilder::get_raw_data_size() const { |
218 | 7.27k | return _raw_data_size; |
219 | 7.27k | } |
220 | | |
221 | | BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOptions& options) |
222 | 4.70k | : _data(data), |
223 | 4.70k | _options(options), |
224 | 4.70k | _data_page_decoder(nullptr), |
225 | 4.70k | _parsed(false), |
226 | 4.70k | _encoding_type(UNKNOWN_ENCODING) {} |
227 | | |
228 | 4.70k | Status BinaryDictPageDecoder::init() { |
229 | 4.70k | CHECK(!_parsed); |
230 | 4.70k | if (_data.size < BINARY_DICT_PAGE_HEADER_SIZE) { |
231 | 0 | return Status::Corruption("invalid data size:{}, header size:{}", _data.size, |
232 | 0 | BINARY_DICT_PAGE_HEADER_SIZE); |
233 | 0 | } |
234 | 4.70k | size_t type = decode_fixed32_le((const uint8_t*)&_data.data[0]); |
235 | 4.70k | _encoding_type = static_cast<EncodingTypePB>(type); |
236 | 4.70k | _data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE); |
237 | 4.70k | if (_encoding_type == DICT_ENCODING) { |
238 | 2.77k | _data_page_decoder.reset( |
239 | 2.77k | _bit_shuffle_ptr = |
240 | 2.77k | new BitShufflePageDecoder<FieldType::OLAP_FIELD_TYPE_INT>(_data, _options)); |
241 | 2.77k | } else if (_encoding_type == PLAIN_ENCODING) { |
242 | 1.78k | _data_page_decoder.reset( |
243 | 1.78k | new BinaryPlainPageDecoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
244 | 1.78k | } else if (_encoding_type == PLAIN_ENCODING_V2) { |
245 | 141 | _data_page_decoder.reset( |
246 | 141 | new BinaryPlainPageV2Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
247 | 141 | } else { |
248 | 0 | LOG(WARNING) << "invalid encoding type:" << _encoding_type; |
249 | 0 | return Status::Corruption("invalid encoding type:{}", _encoding_type); |
250 | 0 | } |
251 | | |
252 | 4.70k | RETURN_IF_ERROR(_data_page_decoder->init()); |
253 | 4.70k | _parsed = true; |
254 | 4.70k | return Status::OK(); |
255 | 4.70k | } |
256 | | |
257 | 4.70k | BinaryDictPageDecoder::~BinaryDictPageDecoder() {} |
258 | | |
259 | 850 | Status BinaryDictPageDecoder::seek_to_position_in_page(size_t pos) { |
260 | 850 | return _data_page_decoder->seek_to_position_in_page(pos); |
261 | 850 | } |
262 | | |
263 | 18.6k | bool BinaryDictPageDecoder::is_dict_encoding() const { |
264 | 18.6k | return _encoding_type == DICT_ENCODING; |
265 | 18.6k | } |
266 | | |
267 | 2.77k | void BinaryDictPageDecoder::set_dict_decoder(uint32_t num_dict_items, StringRef* dict_word_info) { |
268 | 2.77k | _num_dict_items = num_dict_items; |
269 | 2.77k | _dict_word_info = dict_word_info; |
270 | 2.77k | }; |
271 | | |
272 | 8.52k | Status BinaryDictPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) { |
273 | 8.52k | if (!is_dict_encoding()) { |
274 | 1.42k | dst = dst->convert_to_predicate_column_if_dictionary(); |
275 | 1.42k | return _data_page_decoder->next_batch(n, dst); |
276 | 1.42k | } |
277 | | // dictionary encoding |
278 | 8.52k | DCHECK(_parsed); |
279 | 7.10k | DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
280 | | |
281 | 7.10k | if (*n == 0 || _bit_shuffle_ptr->_cur_index >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
282 | 0 | *n = 0; |
283 | 0 | return Status::OK(); |
284 | 0 | } |
285 | | |
286 | 7.10k | size_t max_fetch = std::min(*n, static_cast<size_t>(_bit_shuffle_ptr->_num_elements - |
287 | 7.10k | _bit_shuffle_ptr->_cur_index)); |
288 | 7.10k | *n = max_fetch; |
289 | | |
290 | 7.10k | if (_options.only_read_offsets) { |
291 | | // OFFSET_ONLY mode: resolve dict codes to get real string lengths |
292 | | // without copying actual char data. This allows length() to work. |
293 | 0 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
294 | 0 | size_t start_index = _bit_shuffle_ptr->_cur_index; |
295 | | // Reuse _buffer (int32_t vector) to store uint32_t lengths. |
296 | | // int32_t and uint32_t have the same size/alignment, and string |
297 | | // lengths are always non-negative, so the bit patterns are identical. |
298 | 0 | _buffer.resize(max_fetch); |
299 | 0 | for (size_t i = 0; i < max_fetch; ++i) { |
300 | 0 | int32_t codeword = data_array[start_index + i]; |
301 | 0 | _buffer[i] = static_cast<int32_t>(_dict_word_info[codeword].size); |
302 | 0 | } |
303 | 0 | dst->insert_offsets_from_lengths(reinterpret_cast<const uint32_t*>(_buffer.data()), |
304 | 0 | max_fetch); |
305 | 7.10k | } else { |
306 | 7.10k | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
307 | 7.10k | size_t start_index = _bit_shuffle_ptr->_cur_index; |
308 | | |
309 | 7.10k | dst->insert_many_dict_data(data_array, start_index, _dict_word_info, max_fetch, |
310 | 7.10k | _num_dict_items); |
311 | 7.10k | } |
312 | | |
313 | 7.10k | _bit_shuffle_ptr->_cur_index += max_fetch; |
314 | | |
315 | 7.10k | return Status::OK(); |
316 | 7.10k | } |
317 | | |
318 | | Status BinaryDictPageDecoder::read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, |
319 | 1.04k | size_t* n, MutableColumnPtr& dst) { |
320 | 1.04k | if (!is_dict_encoding()) { |
321 | 507 | dst = dst->convert_to_predicate_column_if_dictionary(); |
322 | 507 | return _data_page_decoder->read_by_rowids(rowids, page_first_ordinal, n, dst); |
323 | 507 | } |
324 | 1.04k | DCHECK(_parsed); |
325 | 534 | DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
326 | | |
327 | 534 | if (*n == 0) [[unlikely]] { |
328 | 0 | *n = 0; |
329 | 0 | return Status::OK(); |
330 | 0 | } |
331 | | |
332 | 534 | auto total = *n; |
333 | | |
334 | 534 | if (_options.only_read_offsets) { |
335 | | // OFFSET_ONLY mode: resolve dict codes to get real string lengths |
336 | | // without copying actual char data. This allows length() to work correctly. |
337 | 0 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
338 | 0 | size_t read_count = 0; |
339 | 0 | _buffer.resize(total); |
340 | 0 | for (size_t i = 0; i < total; ++i) { |
341 | 0 | ordinal_t ord = rowids[i] - page_first_ordinal; |
342 | 0 | if (ord >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
343 | 0 | break; |
344 | 0 | } |
345 | 0 | int32_t codeword = data_array[ord]; |
346 | 0 | _buffer[read_count] = static_cast<int32_t>(_dict_word_info[codeword].size); |
347 | 0 | read_count++; |
348 | 0 | } |
349 | 0 | if (read_count > 0) { |
350 | 0 | dst->insert_offsets_from_lengths(reinterpret_cast<const uint32_t*>(_buffer.data()), |
351 | 0 | read_count); |
352 | 0 | } |
353 | 0 | *n = read_count; |
354 | 0 | return Status::OK(); |
355 | 0 | } |
356 | | |
357 | 534 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
358 | 534 | size_t read_count = 0; |
359 | 534 | _buffer.resize(total); |
360 | 24.6k | for (size_t i = 0; i < total; ++i) { |
361 | 24.1k | ordinal_t ord = rowids[i] - page_first_ordinal; |
362 | 24.1k | if (ord >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
363 | 0 | break; |
364 | 0 | } |
365 | | |
366 | 24.1k | _buffer[read_count++] = data_array[ord]; |
367 | 24.1k | } |
368 | | |
369 | 534 | if (LIKELY(read_count > 0)) { |
370 | 534 | dst->insert_many_dict_data(_buffer.data(), 0, _dict_word_info, read_count, _num_dict_items); |
371 | 534 | } |
372 | 534 | *n = read_count; |
373 | 534 | return Status::OK(); |
374 | 534 | } |
375 | | |
376 | | } // namespace segment_v2 |
377 | | } // namespace doris |