be/src/storage/segment/binary_dict_page.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/binary_dict_page.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <gen_cpp/segment_v2.pb.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <ostream> |
25 | | #include <utility> |
26 | | |
27 | | #include "common/compiler_util.h" // IWYU pragma: keep |
28 | | #include "common/config.h" |
29 | | #include "common/logging.h" |
30 | | #include "common/status.h" |
31 | | #include "core/column/column.h" |
32 | | #include "storage/segment/binary_plain_page_v2.h" |
33 | | #include "storage/segment/bitshuffle_page.h" |
34 | | #include "storage/segment/encoding_info.h" |
35 | | #include "util/coding.h" |
36 | | #include "util/slice.h" // for Slice |
37 | | |
38 | | namespace doris { |
39 | | #include "common/compile_check_begin.h" |
40 | | struct StringRef; |
41 | | |
42 | | namespace segment_v2 { |
43 | | |
44 | | BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) |
45 | 3.85k | : _options(options), |
46 | 3.85k | _finished(false), |
47 | 3.85k | _data_page_builder(nullptr), |
48 | 3.85k | _dict_builder(nullptr), |
49 | 3.85k | _encoding_type(DICT_ENCODING), |
50 | | _dict_word_page_encoding_type( |
51 | 3.85k | options.encoding_preference.binary_plain_encoding_default_impl == |
52 | 3.85k | BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
53 | 3.85k | ? PLAIN_ENCODING_V2 |
54 | 3.85k | : PLAIN_ENCODING), |
55 | | _fallback_binary_encoding_type( |
56 | 3.85k | options.encoding_preference.binary_plain_encoding_default_impl == |
57 | 3.85k | BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2 |
58 | 3.85k | ? PLAIN_ENCODING_V2 |
59 | 3.85k | : PLAIN_ENCODING) {} |
60 | | |
61 | 3.85k | Status BinaryDictPageBuilder::init() { |
62 | | // initially use DICT_ENCODING |
63 | | // TODO: the data page builder type can be created by Factory according to user config |
64 | 3.85k | PageBuilder* data_page_builder_ptr = nullptr; |
65 | 3.85k | RETURN_IF_ERROR(BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>::create( |
66 | 3.85k | &data_page_builder_ptr, _options)); |
67 | 3.85k | _data_page_builder.reset(data_page_builder_ptr); |
68 | 3.85k | PageBuilderOptions dict_builder_options; |
69 | | // here the binary plain page is used to store the dictionary items so |
70 | | // the data page size is set to the same as the dict page size |
71 | 3.85k | dict_builder_options.data_page_size = _options.dict_page_size; |
72 | 3.85k | dict_builder_options.dict_page_size = _options.dict_page_size; |
73 | 3.85k | dict_builder_options.is_dict_page = true; |
74 | | |
75 | 3.85k | const EncodingInfo* encoding_info; |
76 | 3.85k | RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
77 | 3.85k | _dict_word_page_encoding_type, {}, &encoding_info)); |
78 | 3.85k | RETURN_IF_ERROR(encoding_info->create_page_builder(dict_builder_options, _dict_builder)); |
79 | 3.85k | return reset(); |
80 | 3.85k | } |
81 | | |
82 | 1.05M | bool BinaryDictPageBuilder::is_page_full() { |
83 | 1.05M | if (_data_page_builder->is_page_full()) { |
84 | 2.00k | return true; |
85 | 2.00k | } |
86 | 1.05M | if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
87 | 3.13k | return true; |
88 | 3.13k | } |
89 | 1.05M | return false; |
90 | 1.05M | } |
91 | | |
92 | 219k | Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) { |
93 | 219k | if (_encoding_type == DICT_ENCODING) { |
94 | 16.3k | DCHECK(!_finished); |
95 | 16.3k | DCHECK_GT(*count, 0); |
96 | 16.3k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
97 | 16.3k | size_t num_added = 0; |
98 | 16.3k | uint32_t value_code = -1; |
99 | 16.3k | auto* actual_builder = dynamic_cast<BitshufflePageBuilder<FieldType::OLAP_FIELD_TYPE_INT>*>( |
100 | 16.3k | _data_page_builder.get()); |
101 | | |
102 | 16.3k | if (_data_page_builder->count() == 0) { |
103 | 3.82k | _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()), |
104 | 3.82k | src->get_size()); |
105 | 3.82k | } |
106 | | |
107 | 855k | for (int i = 0; i < *count; ++i, ++src) { |
108 | 840k | if (is_page_full()) { |
109 | 1.56k | break; |
110 | 1.56k | } |
111 | | |
112 | 838k | if (src->empty() && _has_empty) { |
113 | 87.6k | value_code = _empty_code; |
114 | 751k | } else if (auto iter = _dictionary.find(*src); iter != _dictionary.end()) { |
115 | 624k | value_code = iter->second; |
116 | 624k | } else { |
117 | 126k | Slice dict_item(src->data, src->size); |
118 | 126k | if (src->size > 0) { |
119 | 126k | char* item_mem = _arena.alloc(src->size); |
120 | 126k | if (item_mem == nullptr) { |
121 | 0 | return Status::MemoryAllocFailed("memory allocate failed, size:{}", |
122 | 0 | src->size); |
123 | 0 | } |
124 | 126k | dict_item.relocate(item_mem); |
125 | 126k | } |
126 | 126k | value_code = cast_set<uint32_t>(_dictionary.size()); |
127 | 126k | size_t add_count = 1; |
128 | 126k | RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), |
129 | 126k | &add_count)); |
130 | 126k | if (add_count == 0) { |
131 | | // current dict page is full, stop processing remaining inputs |
132 | 0 | break; |
133 | 0 | } |
134 | 126k | _dictionary.emplace(dict_item, value_code); |
135 | 126k | if (src->empty()) { |
136 | 315 | _has_empty = true; |
137 | 315 | _empty_code = value_code; |
138 | 315 | } |
139 | 126k | } |
140 | 838k | size_t add_count = 1; |
141 | 838k | RETURN_IF_ERROR(actual_builder->single_add( |
142 | 838k | reinterpret_cast<const uint8_t*>(&value_code), &add_count)); |
143 | 838k | if (add_count == 0) { |
144 | | // current data page is full, stop processing remaining inputs |
145 | 0 | break; |
146 | 0 | } |
147 | | // Track raw data size: the original string size |
148 | 838k | _raw_data_size += src->size; |
149 | 838k | num_added += 1; |
150 | 838k | } |
151 | 16.3k | *count = num_added; |
152 | 16.3k | return Status::OK(); |
153 | 203k | } else { |
154 | 203k | DCHECK(_encoding_type == PLAIN_ENCODING || _encoding_type == PLAIN_ENCODING_V2); |
155 | 203k | RETURN_IF_ERROR(_data_page_builder->add(vals, count)); |
156 | | // For plain encoding, track raw data size from the input |
157 | 203k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
158 | 443k | for (size_t i = 0; i < *count; ++i) { |
159 | 240k | _raw_data_size += src[i].size; |
160 | 240k | } |
161 | 203k | return Status::OK(); |
162 | 203k | } |
163 | 219k | } |
164 | | |
165 | 7.35k | Status BinaryDictPageBuilder::finish(OwnedSlice* slice) { |
166 | 7.35k | if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) { |
167 | 0 | VLOG_DEBUG << "dict page size:" << _dict_builder->size(); |
168 | 0 | } |
169 | | |
170 | 7.35k | DCHECK(!_finished); |
171 | 7.35k | _finished = true; |
172 | | |
173 | 7.35k | OwnedSlice data_slice; |
174 | 7.35k | RETURN_IF_ERROR(_data_page_builder->finish(&data_slice)); |
175 | | // TODO(gaodayue) separate page header and content to avoid this copy |
176 | 7.35k | RETURN_IF_CATCH_EXCEPTION( |
177 | 7.35k | { _buffer.append(data_slice.slice().data, data_slice.slice().size); }); |
178 | 7.35k | encode_fixed32_le(&_buffer[0], _encoding_type); |
179 | 7.35k | *slice = _buffer.build(); |
180 | 7.35k | return Status::OK(); |
181 | 7.35k | } |
182 | | |
183 | 11.2k | Status BinaryDictPageBuilder::reset() { |
184 | 11.2k | RETURN_IF_CATCH_EXCEPTION({ |
185 | 11.2k | _finished = false; |
186 | 11.2k | _raw_data_size = 0; |
187 | 11.2k | _buffer.reserve(_options.data_page_size + BINARY_DICT_PAGE_HEADER_SIZE); |
188 | 11.2k | _buffer.resize(BINARY_DICT_PAGE_HEADER_SIZE); |
189 | | |
190 | 11.2k | if (_encoding_type == DICT_ENCODING && _dict_builder->is_page_full()) { |
191 | 11.2k | DCHECK(_fallback_binary_encoding_type == PLAIN_ENCODING || |
192 | 11.2k | _fallback_binary_encoding_type == PLAIN_ENCODING_V2); |
193 | 11.2k | const EncodingInfo* encoding_info; |
194 | 11.2k | RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR, |
195 | 11.2k | _fallback_binary_encoding_type, {}, &encoding_info)); |
196 | 11.2k | RETURN_IF_ERROR(encoding_info->create_page_builder(_options, _data_page_builder)); |
197 | 11.2k | _encoding_type = _fallback_binary_encoding_type; |
198 | 11.2k | } else { |
199 | 11.2k | RETURN_IF_ERROR(_data_page_builder->reset()); |
200 | 11.2k | } |
201 | 11.2k | }); |
202 | 11.2k | return Status::OK(); |
203 | 11.2k | } |
204 | | |
205 | 3 | size_t BinaryDictPageBuilder::count() const { |
206 | 3 | return _data_page_builder->count(); |
207 | 3 | } |
208 | | |
209 | 3.48k | uint64_t BinaryDictPageBuilder::size() const { |
210 | 3.48k | return _arena.used_size() + _data_page_builder->size(); |
211 | 3.48k | } |
212 | | |
213 | 3.82k | Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) { |
214 | 3.82k | return _dict_builder->finish(dictionary_page); |
215 | 3.82k | } |
216 | | |
217 | 3.82k | Status BinaryDictPageBuilder::get_dictionary_page_encoding(EncodingTypePB* encoding) const { |
218 | 3.82k | *encoding = _dict_word_page_encoding_type; |
219 | 3.82k | return Status::OK(); |
220 | 3.82k | } |
221 | | |
222 | 3 | Status BinaryDictPageBuilder::get_first_value(void* value) const { |
223 | 3 | DCHECK(_finished); |
224 | 3 | if (_data_page_builder->count() == 0) { |
225 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
226 | 0 | } |
227 | 3 | if (_encoding_type != DICT_ENCODING) { |
228 | 0 | return _data_page_builder->get_first_value(value); |
229 | 0 | } |
230 | 3 | *reinterpret_cast<Slice*>(value) = Slice(_first_value); |
231 | 3 | return Status::OK(); |
232 | 3 | } |
233 | | |
234 | 3 | Status BinaryDictPageBuilder::get_last_value(void* value) const { |
235 | 3 | DCHECK(_finished); |
236 | 3 | if (_data_page_builder->count() == 0) { |
237 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
238 | 0 | } |
239 | 3 | if (_encoding_type != DICT_ENCODING) { |
240 | 0 | return _data_page_builder->get_last_value(value); |
241 | 0 | } |
242 | 3 | uint32_t value_code; |
243 | 3 | RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code)); |
244 | 3 | RETURN_IF_ERROR(_dict_builder->get_dict_word(value_code, reinterpret_cast<Slice*>(value))); |
245 | 3 | return Status::OK(); |
246 | 3 | } |
247 | | |
248 | 7.25k | uint64_t BinaryDictPageBuilder::get_raw_data_size() const { |
249 | 7.25k | return _raw_data_size; |
250 | 7.25k | } |
251 | | |
252 | | BinaryDictPageDecoder::BinaryDictPageDecoder(Slice data, const PageDecoderOptions& options) |
253 | 4.70k | : _data(data), |
254 | 4.70k | _options(options), |
255 | 4.70k | _data_page_decoder(nullptr), |
256 | 4.70k | _parsed(false), |
257 | 4.70k | _encoding_type(UNKNOWN_ENCODING) {} |
258 | | |
259 | 4.70k | Status BinaryDictPageDecoder::init() { |
260 | 4.70k | CHECK(!_parsed); |
261 | 4.70k | if (_data.size < BINARY_DICT_PAGE_HEADER_SIZE) { |
262 | 0 | return Status::Corruption("invalid data size:{}, header size:{}", _data.size, |
263 | 0 | BINARY_DICT_PAGE_HEADER_SIZE); |
264 | 0 | } |
265 | 4.70k | size_t type = decode_fixed32_le((const uint8_t*)&_data.data[0]); |
266 | 4.70k | _encoding_type = static_cast<EncodingTypePB>(type); |
267 | 4.70k | _data.remove_prefix(BINARY_DICT_PAGE_HEADER_SIZE); |
268 | 4.70k | if (_encoding_type == DICT_ENCODING) { |
269 | 2.77k | _data_page_decoder.reset( |
270 | 2.77k | _bit_shuffle_ptr = |
271 | 2.77k | new BitShufflePageDecoder<FieldType::OLAP_FIELD_TYPE_INT>(_data, _options)); |
272 | 2.77k | } else if (_encoding_type == PLAIN_ENCODING) { |
273 | 1.78k | _data_page_decoder.reset( |
274 | 1.78k | new BinaryPlainPageDecoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
275 | 1.78k | } else if (_encoding_type == PLAIN_ENCODING_V2) { |
276 | 141 | _data_page_decoder.reset( |
277 | 141 | new BinaryPlainPageV2Decoder<FieldType::OLAP_FIELD_TYPE_VARCHAR>(_data, _options)); |
278 | 141 | } else { |
279 | 0 | LOG(WARNING) << "invalid encoding type:" << _encoding_type; |
280 | 0 | return Status::Corruption("invalid encoding type:{}", _encoding_type); |
281 | 0 | } |
282 | | |
283 | 4.70k | RETURN_IF_ERROR(_data_page_decoder->init()); |
284 | 4.70k | _parsed = true; |
285 | 4.70k | return Status::OK(); |
286 | 4.70k | } |
287 | | |
288 | 4.70k | BinaryDictPageDecoder::~BinaryDictPageDecoder() {} |
289 | | |
290 | 825 | Status BinaryDictPageDecoder::seek_to_position_in_page(size_t pos) { |
291 | 825 | return _data_page_decoder->seek_to_position_in_page(pos); |
292 | 825 | } |
293 | | |
294 | 18.7k | bool BinaryDictPageDecoder::is_dict_encoding() const { |
295 | 18.7k | return _encoding_type == DICT_ENCODING; |
296 | 18.7k | } |
297 | | |
298 | 2.77k | void BinaryDictPageDecoder::set_dict_decoder(uint32_t num_dict_items, StringRef* dict_word_info) { |
299 | 2.77k | _num_dict_items = num_dict_items; |
300 | 2.77k | _dict_word_info = dict_word_info; |
301 | 2.77k | }; |
302 | | |
303 | 8.49k | Status BinaryDictPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) { |
304 | 8.49k | if (!is_dict_encoding()) { |
305 | 1.42k | dst = dst->convert_to_predicate_column_if_dictionary(); |
306 | 1.42k | return _data_page_decoder->next_batch(n, dst); |
307 | 1.42k | } |
308 | | // dictionary encoding |
309 | 8.49k | DCHECK(_parsed); |
310 | 7.07k | DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
311 | | |
312 | 7.07k | if (*n == 0 || _bit_shuffle_ptr->_cur_index >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
313 | 0 | *n = 0; |
314 | 0 | return Status::OK(); |
315 | 0 | } |
316 | | |
317 | 7.07k | size_t max_fetch = std::min(*n, static_cast<size_t>(_bit_shuffle_ptr->_num_elements - |
318 | 7.07k | _bit_shuffle_ptr->_cur_index)); |
319 | 7.07k | *n = max_fetch; |
320 | | |
321 | 7.07k | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
322 | 7.07k | size_t start_index = _bit_shuffle_ptr->_cur_index; |
323 | | |
324 | 7.07k | dst->insert_many_dict_data(data_array, start_index, _dict_word_info, max_fetch, |
325 | 7.07k | _num_dict_items); |
326 | | |
327 | 7.07k | _bit_shuffle_ptr->_cur_index += max_fetch; |
328 | | |
329 | 7.07k | return Status::OK(); |
330 | 7.07k | } |
331 | | |
332 | | Status BinaryDictPageDecoder::read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, |
333 | 1.13k | size_t* n, MutableColumnPtr& dst) { |
334 | 1.13k | if (!is_dict_encoding()) { |
335 | 507 | dst = dst->convert_to_predicate_column_if_dictionary(); |
336 | 507 | return _data_page_decoder->read_by_rowids(rowids, page_first_ordinal, n, dst); |
337 | 507 | } |
338 | 1.13k | DCHECK(_parsed); |
339 | 626 | DCHECK(_dict_word_info != nullptr) << "_dict_word_info is nullptr"; |
340 | | |
341 | 626 | if (*n == 0) [[unlikely]] { |
342 | 0 | *n = 0; |
343 | 0 | return Status::OK(); |
344 | 0 | } |
345 | | |
346 | 626 | const auto* data_array = reinterpret_cast<const int32_t*>(_bit_shuffle_ptr->get_data(0)); |
347 | 626 | auto total = *n; |
348 | 626 | size_t read_count = 0; |
349 | 626 | _buffer.resize(total); |
350 | 25.6k | for (size_t i = 0; i < total; ++i) { |
351 | 25.0k | ordinal_t ord = rowids[i] - page_first_ordinal; |
352 | 25.0k | if (ord >= _bit_shuffle_ptr->_num_elements) [[unlikely]] { |
353 | 0 | break; |
354 | 0 | } |
355 | | |
356 | 25.0k | _buffer[read_count++] = data_array[ord]; |
357 | 25.0k | } |
358 | | |
359 | 626 | if (LIKELY(read_count > 0)) { |
360 | 626 | dst->insert_many_dict_data(_buffer.data(), 0, _dict_word_info, read_count, _num_dict_items); |
361 | 626 | } |
362 | 626 | *n = read_count; |
363 | 626 | return Status::OK(); |
364 | 626 | } |
365 | | |
366 | | #include "common/compile_check_end.h" |
367 | | } // namespace segment_v2 |
368 | | } // namespace doris |