be/src/storage/segment/rle_page.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <memory>

#include "common/cast_set.h"
#include "storage/segment/options.h"      // for PageBuilderOptions/PageDecoderOptions
#include "storage/segment/page_builder.h" // for PageBuilder
#include "storage/segment/page_decoder.h" // for PageDecoder
#include "util/coding.h"                  // for encode_fixed32_le/decode_fixed32_le
#include "util/rle_encoding.h"            // for RleEncoder/RleDecoder
#include "util/slice.h"                   // for OwnedSlice
27 | | |
28 | | namespace doris { |
29 | | namespace segment_v2 { |
30 | | |
// Size in bytes of the RLE page header: a single 32-bit element count.
enum {
    RLE_PAGE_HEADER_SIZE = sizeof(uint32_t)
};
32 | | |
33 | | // RLE builder for generic integer and bool types. What is missing is some way |
34 | | // to enforce that this can only be instantiated for INT and BOOL types. |
35 | | // |
36 | | // The page format is as follows: |
37 | | // |
38 | | // 1. Header: (4 bytes total) |
39 | | // |
40 | | // <num_elements> [32-bit] |
41 | | // The number of elements encoded in the page. |
42 | | // |
43 | | // NOTE: all on-disk ints are encoded little-endian |
44 | | // |
45 | | // 2. Element data |
46 | | // |
47 | | // The header is followed by the rle-encoded element data. |
48 | | // |
49 | | // This Rle encoding algorithm is only effective for repeated INT type and bool type, |
50 | | // It is not good for sequence number or random number. BitshufflePage is recommended |
51 | | // for these case. |
52 | | // |
53 | | // TODO(hkp): optimize rle algorithm |
54 | | template <FieldType Type> |
55 | | class RlePageBuilder : public PageBuilderHelper<RlePageBuilder<Type> > { |
56 | | public: |
57 | | using Self = RlePageBuilder<Type>; |
58 | | friend class PageBuilderHelper<Self>; |
59 | | |
60 | 14.2k | Status init() override { |
61 | 14.2k | switch (Type) { |
62 | 14.2k | case FieldType::OLAP_FIELD_TYPE_BOOL: { |
63 | 14.2k | _bit_width = 1; |
64 | 14.2k | break; |
65 | 0 | } |
66 | 0 | default: { |
67 | 0 | _bit_width = SIZE_OF_TYPE * 8; |
68 | 0 | break; |
69 | 0 | } |
70 | 14.2k | } |
71 | 14.2k | _rle_encoder = new RleEncoder<CppType>(&_buf, _bit_width); |
72 | 14.2k | return reset(); |
73 | 14.2k | } |
74 | | |
75 | 14.2k | ~RlePageBuilder() { delete _rle_encoder; } |
76 | | |
77 | 188k | bool is_page_full() override { return _rle_encoder->len() >= _options.data_page_size; } |
78 | | |
79 | 188k | Status add(const uint8_t* vals, size_t* count) override { |
80 | 188k | DCHECK(!_finished); |
81 | 188k | auto new_vals = reinterpret_cast<const CppType*>(vals); |
82 | 4.33M | for (int i = 0; i < *count; ++i) { |
83 | | // note: vals is not guaranteed to be aligned for now, thus memcpy here |
84 | 4.14M | CppType value; |
85 | 4.14M | memcpy(&value, &new_vals[i], SIZE_OF_TYPE); |
86 | 4.14M | _rle_encoder->Put(value); |
87 | 4.14M | } |
88 | | |
89 | 188k | _count += *count; |
90 | 188k | _raw_data_size += *count * SIZE_OF_TYPE; |
91 | 188k | return Status::OK(); |
92 | 188k | } |
93 | | |
94 | 13.2k | Status finish(OwnedSlice* slice) override { |
95 | 13.2k | DCHECK(!_finished); |
96 | 13.2k | _finished = true; |
97 | | // here should Flush first and then encode the count header |
98 | | // or it will lead to a bug if the header is less than 8 byte and the data is small |
99 | 13.2k | _rle_encoder->Flush(); |
100 | 13.2k | encode_fixed32_le(&_buf[0], cast_set<uint32_t>(_count)); |
101 | 13.2k | *slice = _buf.build(); |
102 | 13.2k | return Status::OK(); |
103 | 13.2k | } |
104 | | |
105 | 27.4k | Status reset() override { |
106 | 27.4k | RETURN_IF_CATCH_EXCEPTION({ |
107 | 27.4k | _count = 0; |
108 | 27.4k | _finished = false; |
109 | 27.4k | _raw_data_size = 0; |
110 | 27.4k | _rle_encoder->Clear(); |
111 | 27.4k | _rle_encoder->Reserve(RLE_PAGE_HEADER_SIZE, 0); |
112 | 27.4k | }); |
113 | 27.4k | return Status::OK(); |
114 | 27.4k | } |
115 | | |
116 | 0 | size_t count() const override { return _count; } |
117 | | |
118 | 480 | uint64_t size() const override { return _rle_encoder->len(); } |
119 | | |
120 | 13.2k | uint64_t get_raw_data_size() const override { return _raw_data_size; } |
121 | | |
122 | | private: |
123 | | RlePageBuilder(const PageBuilderOptions& options) |
124 | 14.2k | : _options(options), |
125 | 14.2k | _count(0), |
126 | 14.2k | _finished(false), |
127 | 14.2k | _bit_width(0), |
128 | 14.2k | _rle_encoder(nullptr) {} |
129 | | |
130 | | typedef typename TypeTraits<Type>::CppType CppType; |
131 | | enum { SIZE_OF_TYPE = TypeTraits<Type>::size }; |
132 | | |
133 | | PageBuilderOptions _options; |
134 | | size_t _count; |
135 | | bool _finished; |
136 | | int _bit_width; |
137 | | RleEncoder<CppType>* _rle_encoder = nullptr; |
138 | | faststring _buf; |
139 | | uint64_t _raw_data_size = 0; |
140 | | }; |
141 | | |
142 | | template <FieldType Type> |
143 | | class RlePageDecoder : public PageDecoder { |
144 | | public: |
145 | | RlePageDecoder(Slice slice, const PageDecoderOptions& options) |
146 | 20.5k | : _data(slice), |
147 | 20.5k | _options(options), |
148 | 20.5k | _parsed(false), |
149 | 20.5k | _num_elements(0), |
150 | 20.5k | _cur_index(0), |
151 | 20.5k | _bit_width(0) {} |
152 | | |
153 | 20.5k | Status init() override { |
154 | 20.5k | CHECK(!_parsed); |
155 | | |
156 | 20.5k | if (_data.size < RLE_PAGE_HEADER_SIZE) { |
157 | 0 | return Status::Corruption("not enough bytes for header in RleBitMapBlockDecoder"); |
158 | 0 | } |
159 | 20.5k | _num_elements = decode_fixed32_le((const uint8_t*)&_data[0]); |
160 | | |
161 | 20.5k | _parsed = true; |
162 | | |
163 | 20.5k | switch (Type) { |
164 | 20.5k | case FieldType::OLAP_FIELD_TYPE_BOOL: { |
165 | 20.5k | _bit_width = 1; |
166 | 20.5k | break; |
167 | 0 | } |
168 | 0 | default: { |
169 | 0 | _bit_width = SIZE_OF_TYPE * 8; |
170 | 0 | break; |
171 | 0 | } |
172 | 20.5k | } |
173 | | |
174 | 20.5k | _rle_decoder = |
175 | 20.5k | RleDecoder<CppType>((uint8_t*)_data.data + RLE_PAGE_HEADER_SIZE, |
176 | 20.5k | cast_set<int>(_data.size - RLE_PAGE_HEADER_SIZE), _bit_width); |
177 | | |
178 | 20.5k | RETURN_IF_ERROR(seek_to_position_in_page(0)); |
179 | 20.5k | return Status::OK(); |
180 | 20.5k | } |
181 | | |
182 | 41.9k | Status seek_to_position_in_page(size_t pos) override { |
183 | 41.9k | DCHECK(_parsed) << "Must call init()"; |
184 | 41.9k | DCHECK_LE(pos, _num_elements) |
185 | 0 | << "Tried to seek to " << pos << " which is > number of elements (" << _num_elements |
186 | 0 | << ") in the block!"; |
187 | | // If the block is empty (e.g. the column is filled with nulls), there is no data to seek. |
188 | 41.9k | if (_num_elements == 0) [[unlikely]] { |
189 | 1.02k | if (pos != 0) { |
190 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
191 | 0 | "seek pos {} is larger than total elements {}", pos, _num_elements); |
192 | 1.02k | } else { |
193 | 1.02k | return Status::OK(); |
194 | 1.02k | } |
195 | 1.02k | } |
196 | 40.8k | if (_cur_index == pos) { |
197 | | // No need to seek. |
198 | 21.1k | return Status::OK(); |
199 | 21.1k | } else if (_cur_index < pos) { |
200 | 17.6k | size_t nskip = pos - _cur_index; |
201 | 17.6k | _rle_decoder.Skip(nskip); |
202 | 17.6k | } else { |
203 | 2.14k | _rle_decoder = RleDecoder<CppType>((uint8_t*)_data.data + RLE_PAGE_HEADER_SIZE, |
204 | 2.14k | cast_set<int>(_data.size - RLE_PAGE_HEADER_SIZE), |
205 | 2.14k | _bit_width); |
206 | 2.14k | _rle_decoder.Skip(pos); |
207 | 2.14k | } |
208 | 19.7k | _cur_index = pos; |
209 | 19.7k | return Status::OK(); |
210 | 40.8k | } |
211 | | |
212 | 20.4k | Status next_batch(size_t* n, MutableColumnPtr& dst) override { |
213 | 20.4k | DCHECK(_parsed); |
214 | 20.4k | if (*n == 0 || _cur_index >= _num_elements) [[unlikely]] { |
215 | 0 | *n = 0; |
216 | 0 | return Status::OK(); |
217 | 0 | } |
218 | | |
219 | 20.4k | size_t to_fetch = std::min(*n, static_cast<size_t>(_num_elements - _cur_index)); |
220 | 20.4k | size_t remaining = to_fetch; |
221 | 20.4k | bool result = false; |
222 | 20.4k | CppType value; |
223 | 6.01M | while (remaining > 0) { |
224 | 5.99M | result = _rle_decoder.Get(&value); |
225 | 5.99M | DCHECK(result); |
226 | 5.99M | dst->insert_data((char*)(&value), SIZE_OF_TYPE); |
227 | 5.99M | remaining--; |
228 | 5.99M | } |
229 | | |
230 | 20.4k | _cur_index += to_fetch; |
231 | 20.4k | *n = to_fetch; |
232 | 20.4k | return Status::OK(); |
233 | 20.4k | } |
234 | | |
235 | | Status read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, size_t* n, |
236 | 8.74k | MutableColumnPtr& dst) override { |
237 | 8.74k | DCHECK(_parsed); |
238 | 8.74k | if (*n == 0 || _cur_index >= _num_elements) [[unlikely]] { |
239 | 0 | *n = 0; |
240 | 0 | return Status::OK(); |
241 | 0 | } |
242 | | |
243 | 8.74k | auto total = *n; |
244 | 8.74k | bool result = false; |
245 | 8.74k | size_t read_count = 0; |
246 | 8.74k | CppType value; |
247 | 835k | for (size_t i = 0; i < total; ++i) { |
248 | 826k | ordinal_t ord = rowids[i] - page_first_ordinal; |
249 | 826k | if (UNLIKELY(ord >= _num_elements)) { |
250 | 0 | *n = read_count; |
251 | 0 | return Status::OK(); |
252 | 0 | } |
253 | | |
254 | 826k | _rle_decoder.Skip(ord - _cur_index); |
255 | 826k | _cur_index = ord; |
256 | | |
257 | 826k | result = _rle_decoder.Get(&value); |
258 | 826k | _cur_index++; |
259 | 826k | DCHECK(result); |
260 | 826k | dst->insert_data((char*)(&value), SIZE_OF_TYPE); |
261 | 826k | read_count++; |
262 | 826k | } |
263 | 8.74k | *n = read_count; |
264 | 8.74k | return Status::OK(); |
265 | 8.74k | } |
266 | | |
267 | 0 | size_t count() const override { return _num_elements; } |
268 | | |
269 | 16.2k | size_t current_index() const override { return _cur_index; } |
270 | | |
271 | | private: |
272 | | typedef typename TypeTraits<Type>::CppType CppType; |
273 | | enum { SIZE_OF_TYPE = TypeTraits<Type>::size }; |
274 | | |
275 | | Slice _data; |
276 | | PageDecoderOptions _options; |
277 | | bool _parsed; |
278 | | uint32_t _num_elements; |
279 | | size_t _cur_index; |
280 | | int _bit_width; |
281 | | RleDecoder<CppType> _rle_decoder; |
282 | | }; |
283 | | |
284 | | } // namespace segment_v2 |
285 | | } // namespace doris |