be/src/storage/segment/rle_page.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include "common/cast_set.h" |
21 | | #include "storage/segment/options.h" // for PageBuilderOptions/PageDecoderOptions |
22 | | #include "storage/segment/page_builder.h" // for PageBuilder |
23 | | #include "storage/segment/page_decoder.h" // for PageDecoder |
24 | | #include "util/coding.h" // for encode_fixed32_le/decode_fixed32_le |
25 | | #include "util/rle_encoding.h" // for RleEncoder/RleDecoder |
26 | | #include "util/slice.h" // for OwnedSlice |
27 | | |
28 | | namespace doris { |
29 | | #include "common/compile_check_begin.h" |
30 | | namespace segment_v2 { |
31 | | |
// Size in bytes of the fixed header at the start of every RLE page. The header
// holds the 32-bit little-endian element count; the builder reserves this many
// bytes ahead of the encoded data and the decoder skips them before decoding.
enum { RLE_PAGE_HEADER_SIZE = 4 };
33 | | |
34 | | // RLE builder for generic integer and bool types. What is missing is some way |
35 | | // to enforce that this can only be instantiated for INT and BOOL types. |
36 | | // |
37 | | // The page format is as follows: |
38 | | // |
39 | | // 1. Header: (4 bytes total) |
40 | | // |
41 | | // <num_elements> [32-bit] |
42 | | // The number of elements encoded in the page. |
43 | | // |
44 | | // NOTE: all on-disk ints are encoded little-endian |
45 | | // |
46 | | // 2. Element data |
47 | | // |
48 | | // The header is followed by the rle-encoded element data. |
49 | | // |
50 | | // This Rle encoding algorithm is only effective for repeated INT type and bool type, |
51 | | // It is not good for sequence number or random number. BitshufflePage is recommended |
52 | | // for these case. |
53 | | // |
54 | | // TODO(hkp): optimize rle algorithm |
55 | | template <FieldType Type> |
56 | | class RlePageBuilder : public PageBuilderHelper<RlePageBuilder<Type> > { |
57 | | public: |
58 | | using Self = RlePageBuilder<Type>; |
59 | | friend class PageBuilderHelper<Self>; |
60 | | |
61 | 0 | Status init() override { |
62 | 0 | switch (Type) { |
63 | 0 | case FieldType::OLAP_FIELD_TYPE_BOOL: { |
64 | 0 | _bit_width = 1; |
65 | 0 | break; |
66 | 0 | } |
67 | 0 | default: { |
68 | 0 | _bit_width = SIZE_OF_TYPE * 8; |
69 | 0 | break; |
70 | 0 | } |
71 | 0 | } |
72 | 0 | _rle_encoder = new RleEncoder<CppType>(&_buf, _bit_width); |
73 | 0 | return reset(); |
74 | 0 | } |
75 | | |
76 | 0 | ~RlePageBuilder() { delete _rle_encoder; } |
77 | | |
78 | 0 | bool is_page_full() override { return _rle_encoder->len() >= _options.data_page_size; } |
79 | | |
80 | 0 | Status add(const uint8_t* vals, size_t* count) override { |
81 | 0 | DCHECK(!_finished); |
82 | 0 | auto new_vals = reinterpret_cast<const CppType*>(vals); |
83 | 0 | for (int i = 0; i < *count; ++i) { |
84 | | // note: vals is not guaranteed to be aligned for now, thus memcpy here |
85 | 0 | CppType value; |
86 | 0 | memcpy(&value, &new_vals[i], SIZE_OF_TYPE); |
87 | 0 | _rle_encoder->Put(value); |
88 | 0 | } |
89 | |
|
90 | 0 | if (_count == 0) { |
91 | 0 | memcpy(&_first_value, new_vals, SIZE_OF_TYPE); |
92 | 0 | } |
93 | 0 | memcpy(&_last_value, &new_vals[*count - 1], SIZE_OF_TYPE); |
94 | |
|
95 | 0 | _count += *count; |
96 | 0 | _raw_data_size += *count * SIZE_OF_TYPE; |
97 | 0 | return Status::OK(); |
98 | 0 | } |
99 | | |
100 | 0 | Status finish(OwnedSlice* slice) override { |
101 | 0 | DCHECK(!_finished); |
102 | 0 | _finished = true; |
103 | | // here should Flush first and then encode the count header |
104 | | // or it will lead to a bug if the header is less than 8 byte and the data is small |
105 | 0 | _rle_encoder->Flush(); |
106 | 0 | encode_fixed32_le(&_buf[0], cast_set<uint32_t>(_count)); |
107 | 0 | *slice = _buf.build(); |
108 | 0 | return Status::OK(); |
109 | 0 | } |
110 | | |
111 | 0 | Status reset() override { |
112 | 0 | RETURN_IF_CATCH_EXCEPTION({ |
113 | 0 | _count = 0; |
114 | 0 | _finished = false; |
115 | 0 | _raw_data_size = 0; |
116 | 0 | _rle_encoder->Clear(); |
117 | 0 | _rle_encoder->Reserve(RLE_PAGE_HEADER_SIZE, 0); |
118 | 0 | }); |
119 | 0 | return Status::OK(); |
120 | 0 | } |
121 | | |
122 | 0 | size_t count() const override { return _count; } |
123 | | |
124 | 0 | uint64_t size() const override { return _rle_encoder->len(); } |
125 | | |
126 | 0 | uint64_t get_raw_data_size() const override { return _raw_data_size; } |
127 | | |
128 | 0 | Status get_first_value(void* value) const override { |
129 | 0 | DCHECK(_finished); |
130 | 0 | if (_count == 0) { |
131 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
132 | 0 | } |
133 | 0 | memcpy(value, &_first_value, SIZE_OF_TYPE); |
134 | 0 | return Status::OK(); |
135 | 0 | } |
136 | | |
137 | 0 | Status get_last_value(void* value) const override { |
138 | 0 | DCHECK(_finished); |
139 | 0 | if (_count == 0) { |
140 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("page is empty"); |
141 | 0 | } |
142 | 0 | memcpy(value, &_last_value, SIZE_OF_TYPE); |
143 | 0 | return Status::OK(); |
144 | 0 | } |
145 | | |
146 | | private: |
147 | | RlePageBuilder(const PageBuilderOptions& options) |
148 | 0 | : _options(options), |
149 | 0 | _count(0), |
150 | 0 | _finished(false), |
151 | 0 | _bit_width(0), |
152 | 0 | _rle_encoder(nullptr) {} |
153 | | |
154 | | typedef typename TypeTraits<Type>::CppType CppType; |
155 | | enum { SIZE_OF_TYPE = TypeTraits<Type>::size }; |
156 | | |
157 | | PageBuilderOptions _options; |
158 | | size_t _count; |
159 | | bool _finished; |
160 | | int _bit_width; |
161 | | RleEncoder<CppType>* _rle_encoder = nullptr; |
162 | | faststring _buf; |
163 | | CppType _first_value; |
164 | | CppType _last_value; |
165 | | uint64_t _raw_data_size = 0; |
166 | | }; |
167 | | |
168 | | template <FieldType Type> |
169 | | class RlePageDecoder : public PageDecoder { |
170 | | public: |
171 | | RlePageDecoder(Slice slice, const PageDecoderOptions& options) |
172 | 0 | : _data(slice), |
173 | 0 | _options(options), |
174 | 0 | _parsed(false), |
175 | 0 | _num_elements(0), |
176 | 0 | _cur_index(0), |
177 | 0 | _bit_width(0) {} |
178 | | |
179 | 0 | Status init() override { |
180 | 0 | CHECK(!_parsed); |
181 | |
|
182 | 0 | if (_data.size < RLE_PAGE_HEADER_SIZE) { |
183 | 0 | return Status::Corruption("not enough bytes for header in RleBitMapBlockDecoder"); |
184 | 0 | } |
185 | 0 | _num_elements = decode_fixed32_le((const uint8_t*)&_data[0]); |
186 | |
|
187 | 0 | _parsed = true; |
188 | |
|
189 | 0 | switch (Type) { |
190 | 0 | case FieldType::OLAP_FIELD_TYPE_BOOL: { |
191 | 0 | _bit_width = 1; |
192 | 0 | break; |
193 | 0 | } |
194 | 0 | default: { |
195 | 0 | _bit_width = SIZE_OF_TYPE * 8; |
196 | 0 | break; |
197 | 0 | } |
198 | 0 | } |
199 | | |
200 | 0 | _rle_decoder = |
201 | 0 | RleDecoder<CppType>((uint8_t*)_data.data + RLE_PAGE_HEADER_SIZE, |
202 | 0 | cast_set<int>(_data.size - RLE_PAGE_HEADER_SIZE), _bit_width); |
203 | |
|
204 | 0 | RETURN_IF_ERROR(seek_to_position_in_page(0)); |
205 | 0 | return Status::OK(); |
206 | 0 | } |
207 | | |
208 | 0 | Status seek_to_position_in_page(size_t pos) override { |
209 | 0 | DCHECK(_parsed) << "Must call init()"; |
210 | 0 | DCHECK_LE(pos, _num_elements) |
211 | 0 | << "Tried to seek to " << pos << " which is > number of elements (" << _num_elements |
212 | 0 | << ") in the block!"; |
213 | | // If the block is empty (e.g. the column is filled with nulls), there is no data to seek. |
214 | 0 | if (_num_elements == 0) [[unlikely]] { |
215 | 0 | if (pos != 0) { |
216 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
217 | 0 | "seek pos {} is larger than total elements {}", pos, _num_elements); |
218 | 0 | } else { |
219 | 0 | return Status::OK(); |
220 | 0 | } |
221 | 0 | } |
222 | 0 | if (_cur_index == pos) { |
223 | | // No need to seek. |
224 | 0 | return Status::OK(); |
225 | 0 | } else if (_cur_index < pos) { |
226 | 0 | size_t nskip = pos - _cur_index; |
227 | 0 | _rle_decoder.Skip(nskip); |
228 | 0 | } else { |
229 | 0 | _rle_decoder = RleDecoder<CppType>((uint8_t*)_data.data + RLE_PAGE_HEADER_SIZE, |
230 | 0 | cast_set<int>(_data.size - RLE_PAGE_HEADER_SIZE), |
231 | 0 | _bit_width); |
232 | 0 | _rle_decoder.Skip(pos); |
233 | 0 | } |
234 | 0 | _cur_index = pos; |
235 | 0 | return Status::OK(); |
236 | 0 | } |
237 | | |
238 | 0 | Status next_batch(size_t* n, MutableColumnPtr& dst) override { |
239 | 0 | DCHECK(_parsed); |
240 | 0 | if (*n == 0 || _cur_index >= _num_elements) [[unlikely]] { |
241 | 0 | *n = 0; |
242 | 0 | return Status::OK(); |
243 | 0 | } |
244 | | |
245 | 0 | size_t to_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index)); |
246 | 0 | size_t remaining = to_fetch; |
247 | 0 | bool result = false; |
248 | 0 | CppType value; |
249 | 0 | while (remaining > 0) { |
250 | 0 | result = _rle_decoder.Get(&value); |
251 | 0 | DCHECK(result); |
252 | 0 | dst->insert_data((char*)(&value), SIZE_OF_TYPE); |
253 | 0 | remaining--; |
254 | 0 | } |
255 | |
|
256 | 0 | _cur_index += to_fetch; |
257 | 0 | *n = to_fetch; |
258 | 0 | return Status::OK(); |
259 | 0 | } |
260 | | |
261 | | Status read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, size_t* n, |
262 | 0 | MutableColumnPtr& dst) override { |
263 | 0 | DCHECK(_parsed); |
264 | 0 | if (*n == 0 || _cur_index >= _num_elements) [[unlikely]] { |
265 | 0 | *n = 0; |
266 | 0 | return Status::OK(); |
267 | 0 | } |
268 | | |
269 | 0 | auto total = *n; |
270 | 0 | bool result = false; |
271 | 0 | size_t read_count = 0; |
272 | 0 | CppType value; |
273 | 0 | for (size_t i = 0; i < total; ++i) { |
274 | 0 | ordinal_t ord = rowids[i] - page_first_ordinal; |
275 | 0 | if (UNLIKELY(ord >= _num_elements)) { |
276 | 0 | *n = read_count; |
277 | 0 | return Status::OK(); |
278 | 0 | } |
279 | | |
280 | 0 | _rle_decoder.Skip(ord - _cur_index); |
281 | 0 | _cur_index = ord; |
282 | |
|
283 | 0 | result = _rle_decoder.Get(&value); |
284 | 0 | _cur_index++; |
285 | 0 | DCHECK(result); |
286 | 0 | dst->insert_data((char*)(&value), SIZE_OF_TYPE); |
287 | 0 | read_count++; |
288 | 0 | } |
289 | 0 | *n = read_count; |
290 | 0 | return Status::OK(); |
291 | 0 | } |
292 | | |
293 | 0 | size_t count() const override { return _num_elements; } |
294 | | |
295 | 0 | size_t current_index() const override { return _cur_index; } |
296 | | |
297 | | private: |
298 | | typedef typename TypeTraits<Type>::CppType CppType; |
299 | | enum { SIZE_OF_TYPE = TypeTraits<Type>::size }; |
300 | | |
301 | | Slice _data; |
302 | | PageDecoderOptions _options; |
303 | | bool _parsed; |
304 | | uint32_t _num_elements; |
305 | | size_t _cur_index; |
306 | | int _bit_width; |
307 | | RleDecoder<CppType> _rle_decoder; |
308 | | }; |
309 | | |
310 | | } // namespace segment_v2 |
311 | | #include "common/compile_check_end.h" |
312 | | } // namespace doris |