be/src/util/rle_encoding.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #pragma once |
18 | | |
19 | | #include <glog/logging.h> |
20 | | |
21 | | #include <limits> // IWYU pragma: keep |
22 | | |
23 | | #include "common/cast_set.h" |
24 | | #include "util/bit_stream_utils.inline.h" |
25 | | #include "util/bit_util.h" |
26 | | |
27 | | namespace doris { |
28 | | #include "common/compile_check_begin.h" |
29 | | |
30 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
31 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
32 | | // (literal encoding). |
33 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
34 | | // of the run and the type of the run. |
35 | | // This encoding has the benefit that when there aren't any long enough runs, values |
36 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
37 | | // the run length are byte aligned. This allows for very efficient decoding |
38 | | // implementations. |
39 | | // The encoding is: |
40 | | // encoded-block := run* |
41 | | // run := literal-run | repeated-run |
42 | | // literal-run := literal-indicator < literal bytes > |
43 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
44 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
45 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
46 | | // |
47 | | // Each run is preceded by a varint. The varint's least significant bit is |
48 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
49 | | // of the varint is used to determine the length of the run (eg how many times the |
50 | | // value repeats). |
51 | | // |
52 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
53 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
54 | | // on a byte boundary without padding. |
55 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
56 | | // the actual number of encoded ints. (This means that the total number of encoded values |
57 | | // can not be determined from the encoded data, since the number of values in the last |
58 | | // group may not be a multiple of 8). |
59 | | // There is a break-even point when it is more storage efficient to do run length |
60 | | // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
61 | | // for either the repeated encoding or the literal encoding. This value can always |
62 | | // be computed based on the bit-width. |
63 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
64 | | // |
65 | | // Examples with bit-width 1 (eg encoding booleans): |
66 | | // ---------------------------------------- |
67 | | // 100 1s followed by 100 0s: |
68 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
69 | | // - (total 4 bytes) |
70 | | // |
71 | | // alternating 1s and 0s (200 total): |
72 | | // 200 ints = 25 groups of 8 |
73 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
74 | | // (total 26 bytes, 1 byte overhead) |
75 | | // |
76 | | |
77 | | // Decoder class for RLE encoded data. |
78 | | // |
79 | | // NOTE: the encoded format does not have any length prefix or any other way of |
80 | | // indicating that the encoded sequence ends at a certain point, so the Decoder |
81 | | // methods may return some extra bits at the end before the read methods start |
82 | | // to return 0/false. |
83 | | template <typename T> |
84 | | class RleDecoder { |
85 | | public: |
86 | | // Create a decoder object. buffer/buffer_len is the decoded data. |
87 | | // bit_width is the width of each value (before encoding). |
88 | | RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) |
89 | 495k | : bit_reader_(buffer, buffer_len), |
90 | 495k | bit_width_(bit_width), |
91 | 495k | current_value_(0), |
92 | 495k | repeat_count_(0), |
93 | 495k | literal_count_(0), |
94 | 495k | rewind_state_(CANT_REWIND) { |
95 | 495k | DCHECK_GE(bit_width_, 1); |
96 | 495k | DCHECK_LE(bit_width_, 64); |
97 | 495k | } _ZN5doris10RleDecoderIbEC2EPKhii Line | Count | Source | 89 | 472k | : bit_reader_(buffer, buffer_len), | 90 | 472k | bit_width_(bit_width), | 91 | 472k | current_value_(0), | 92 | 472k | repeat_count_(0), | 93 | 472k | literal_count_(0), | 94 | 472k | rewind_state_(CANT_REWIND) { | 95 | 472k | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 472k | } |
_ZN5doris10RleDecoderIhEC2EPKhii Line | Count | Source | 89 | 21.4k | : bit_reader_(buffer, buffer_len), | 90 | 21.4k | bit_width_(bit_width), | 91 | 21.4k | current_value_(0), | 92 | 21.4k | repeat_count_(0), | 93 | 21.4k | literal_count_(0), | 94 | 21.4k | rewind_state_(CANT_REWIND) { | 95 | 21.4k | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 21.4k | } |
_ZN5doris10RleDecoderIsEC2EPKhii Line | Count | Source | 89 | 1.51k | : bit_reader_(buffer, buffer_len), | 90 | 1.51k | bit_width_(bit_width), | 91 | 1.51k | current_value_(0), | 92 | 1.51k | repeat_count_(0), | 93 | 1.51k | literal_count_(0), | 94 | 1.51k | rewind_state_(CANT_REWIND) { | 95 | 1.51k | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 1.51k | } |
|
98 | | |
99 | 29.3M | RleDecoder() {}_ZN5doris10RleDecoderIbEC2Ev Line | Count | Source | 99 | 29.3M | RleDecoder() {} |
_ZN5doris10RleDecoderIhEC2Ev Line | Count | Source | 99 | 19.3k | RleDecoder() {} |
_ZN5doris10RleDecoderIsEC2Ev Line | Count | Source | 99 | 2.34k | RleDecoder() {} |
|
100 | | |
101 | | // Skip n values, and returns the number of non-zero entries skipped. |
102 | | size_t Skip(size_t to_skip); |
103 | | |
104 | | // Gets the next value. Returns false if there are no more. |
105 | | bool Get(T* val); |
106 | | |
107 | | // Seek to the previous value. |
108 | | void RewindOne(); |
109 | | |
110 | | // Gets the next run of the same 'val'. Returns 0 if there is no |
111 | | // more data to be decoded. Will return a run of at most 'max_run' |
112 | | // values. If there are more values than this, the next call to |
113 | | // GetNextRun will return more from the same run. |
114 | | size_t GetNextRun(T* val, size_t max_run); |
115 | | |
116 | | size_t get_values(T* values, size_t num_values); |
117 | | |
118 | | // Get the count of current repeated value |
119 | | size_t repeated_count(); |
120 | | |
121 | | // Get current repeated value, make sure that count equals repeated_count() |
122 | | T get_repeated_value(size_t count); |
123 | | |
124 | 0 | const BitReader& bit_reader() const { return bit_reader_; } |
125 | | |
126 | | private: |
127 | | bool ReadHeader(); |
128 | | |
129 | | enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND }; |
130 | | |
131 | | BitReader bit_reader_; |
132 | | int bit_width_; |
133 | | uint64_t current_value_; |
134 | | uint32_t repeat_count_; |
135 | | uint32_t literal_count_; |
136 | | RewindState rewind_state_; |
137 | | }; |
138 | | |
139 | | // Class to incrementally build the rle data. |
140 | | // The encoding has two modes: encoding repeated runs and literal runs. |
141 | | // If the run is sufficiently short, it is more efficient to encode as a literal run. |
142 | | // This class does so by buffering 8 values at a time. If they are not all the same |
143 | | // they are added to the literal run. If they are the same, they are added to the |
144 | | // repeated run. When we switch modes, the previous run is flushed out. |
145 | | template <typename T> |
146 | | class RleEncoder { |
147 | | public: |
148 | | // buffer: buffer to write bits to. |
149 | | // bit_width: max number of bits for value. |
150 | | // TODO: consider adding a min_repeated_run_length so the caller can control |
151 | | // when values should be encoded as repeated runs. Currently this is derived |
152 | | // based on the bit_width, which can determine a storage optimal choice. |
153 | | explicit RleEncoder(faststring* buffer, int bit_width) |
154 | 523k | : bit_width_(bit_width), bit_writer_(buffer) { |
155 | 523k | DCHECK_GE(bit_width_, 1); |
156 | 523k | DCHECK_LE(bit_width_, 64); |
157 | 523k | Clear(); |
158 | 523k | } _ZN5doris10RleEncoderIhEC2EPNS_10faststringEi Line | Count | Source | 154 | 14.0k | : bit_width_(bit_width), bit_writer_(buffer) { | 155 | 14.0k | DCHECK_GE(bit_width_, 1); | 156 | | DCHECK_LE(bit_width_, 64); | 157 | 14.0k | Clear(); | 158 | 14.0k | } |
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi Line | Count | Source | 154 | 509k | : bit_width_(bit_width), bit_writer_(buffer) { | 155 | 509k | DCHECK_GE(bit_width_, 1); | 156 | | DCHECK_LE(bit_width_, 64); | 157 | 509k | Clear(); | 158 | 509k | } |
|
159 | | |
160 | | // Reserve 'num_bytes' bytes for a plain encoded header, set each |
161 | | // byte with 'val': this is used for the RLE-encoded data blocks in |
162 | | // order to be able to able to store the initial ordinal position |
163 | | // and number of elements. This is a part of RleEncoder in order to |
164 | | // maintain the correct offset in 'buffer'. |
165 | | void Reserve(int num_bytes, uint8_t val); |
166 | | |
167 | | // Encode value. This value must be representable with bit_width_ bits. |
168 | | void Put(T value, size_t run_length = 1); |
169 | | |
170 | | // Flushes any pending values to the underlying buffer. |
171 | | // Returns the total number of bytes written |
172 | | int Flush(); |
173 | | |
174 | | // Resets all the state in the encoder. |
175 | | void Clear(); |
176 | | |
177 | 147k | int32_t len() const { return bit_writer_.bytes_written(); } |
178 | | |
179 | | private: |
180 | | // Flushes any buffered values. If this is part of a repeated run, this is largely |
181 | | // a no-op. |
182 | | // If it is part of a literal run, this will call FlushLiteralRun, which writes |
183 | | // out the buffered literal values. |
184 | | // If 'done' is true, the current run would be written even if it would normally |
185 | | // have been buffered more. This should only be called at the end, when the |
186 | | // encoder has received all values even if it would normally continue to be |
187 | | // buffered. |
188 | | void FlushBufferedValues(bool done); |
189 | | |
190 | | // Flushes literal values to the underlying buffer. If update_indicator_byte, |
191 | | // then the current literal run is complete and the indicator byte is updated. |
192 | | void FlushLiteralRun(bool update_indicator_byte); |
193 | | |
194 | | // Flushes a repeated run to the underlying buffer. |
195 | | void FlushRepeatedRun(); |
196 | | |
197 | | // Number of bits needed to encode the value. |
198 | | const int bit_width_; |
199 | | |
200 | | // Underlying buffer. |
201 | | BitWriter bit_writer_; |
202 | | |
203 | | // We need to buffer at most 8 values for literals. This happens when the |
204 | | // bit_width is 1 (so 8 values fit in one byte). |
205 | | // TODO: generalize this to other bit widths |
206 | | uint64_t buffered_values_[8]; |
207 | | |
208 | | // Number of values in buffered_values_ |
209 | | int num_buffered_values_; |
210 | | |
211 | | // The current (also last) value that was written and the count of how |
212 | | // many times in a row that value has been seen. This is maintained even |
213 | | // if we are in a literal run. If the repeat_count_ get high enough, we switch |
214 | | // to encoding repeated runs. |
215 | | uint64_t current_value_; |
216 | | int repeat_count_; |
217 | | |
218 | | // Number of literals in the current run. This does not include the literals |
219 | | // that might be in buffered_values_. Only after we've got a group big enough |
220 | | // can we decide if they should part of the literal_count_ or repeat_count_ |
221 | | int literal_count_; |
222 | | |
223 | | // Index of a byte in the underlying buffer that stores the indicator byte. |
224 | | // This is reserved as soon as we need a literal run but the value is written |
225 | | // when the literal run is complete. We maintain an index rather than a pointer |
226 | | // into the underlying buffer because the pointer value may become invalid if |
227 | | // the underlying buffer is resized. |
228 | | int literal_indicator_byte_idx_; |
229 | | }; |
230 | | |
231 | | template <typename T> |
232 | 18.0M | bool RleDecoder<T>::ReadHeader() { |
233 | 18.0M | DCHECK(bit_reader_.is_initialized()); |
234 | 18.0M | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { |
235 | | // Read the next run's indicator int, it could be a literal or repeated run |
236 | | // The int is encoded as a vlq-encoded value. |
237 | 1.08M | uint32_t indicator_value = 0; |
238 | 1.08M | bool result = bit_reader_.GetVlqInt(&indicator_value); |
239 | 1.08M | if (!result) [[unlikely]] { |
240 | 11.8k | return false; |
241 | 11.8k | } |
242 | | |
243 | | // lsb indicates if it is a literal run or repeated run |
244 | 1.06M | bool is_literal = indicator_value & 1; |
245 | 1.06M | if (is_literal) { |
246 | 603k | literal_count_ = (indicator_value >> 1) * 8; |
247 | 603k | DCHECK_GT(literal_count_, 0); |
248 | 603k | } else { |
249 | 465k | repeat_count_ = indicator_value >> 1; |
250 | 465k | DCHECK_GT(repeat_count_, 0); |
251 | 465k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), |
252 | 465k | reinterpret_cast<T*>(¤t_value_)); |
253 | 465k | DCHECK(result1); |
254 | 465k | } |
255 | 1.06M | } |
256 | 18.0M | return true; |
257 | 18.0M | } _ZN5doris10RleDecoderIsE10ReadHeaderEv Line | Count | Source | 232 | 8.43M | bool RleDecoder<T>::ReadHeader() { | 233 | 8.43M | DCHECK(bit_reader_.is_initialized()); | 234 | 8.43M | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 172k | uint32_t indicator_value = 0; | 238 | 172k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 172k | if (!result) [[unlikely]] { | 240 | 8 | return false; | 241 | 8 | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 172k | bool is_literal = indicator_value & 1; | 245 | 172k | if (is_literal) { | 246 | 85.9k | literal_count_ = (indicator_value >> 1) * 8; | 247 | 85.9k | DCHECK_GT(literal_count_, 0); | 248 | 86.6k | } else { | 249 | 86.6k | repeat_count_ = indicator_value >> 1; | 250 | 86.6k | DCHECK_GT(repeat_count_, 0); | 251 | 86.6k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 86.6k | reinterpret_cast<T*>(¤t_value_)); | 253 | 86.6k | DCHECK(result1); | 254 | 86.6k | } | 255 | 172k | } | 256 | 8.43M | return true; | 257 | 8.43M | } |
_ZN5doris10RleDecoderIbE10ReadHeaderEv Line | Count | Source | 232 | 2.03M | bool RleDecoder<T>::ReadHeader() { | 233 | 2.03M | DCHECK(bit_reader_.is_initialized()); | 234 | 2.03M | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 781k | uint32_t indicator_value = 0; | 238 | 781k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 781k | if (!result) [[unlikely]] { | 240 | 11.8k | return false; | 241 | 11.8k | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 769k | bool is_literal = indicator_value & 1; | 245 | 769k | if (is_literal) { | 246 | 447k | literal_count_ = (indicator_value >> 1) * 8; | 247 | 447k | DCHECK_GT(literal_count_, 0); | 248 | 447k | } else { | 249 | 321k | repeat_count_ = indicator_value >> 1; | 250 | 321k | DCHECK_GT(repeat_count_, 0); | 251 | 321k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 321k | reinterpret_cast<T*>(¤t_value_)); | 253 | 321k | DCHECK(result1); | 254 | 321k | } | 255 | 769k | } | 256 | 2.02M | return true; | 257 | 2.03M | } |
_ZN5doris10RleDecoderIhE10ReadHeaderEv Line | Count | Source | 232 | 7.62M | bool RleDecoder<T>::ReadHeader() { | 233 | 7.62M | DCHECK(bit_reader_.is_initialized()); | 234 | 7.62M | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 127k | uint32_t indicator_value = 0; | 238 | 127k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 127k | if (!result) [[unlikely]] { | 240 | 0 | return false; | 241 | 0 | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 127k | bool is_literal = indicator_value & 1; | 245 | 127k | if (is_literal) { | 246 | 70.2k | literal_count_ = (indicator_value >> 1) * 8; | 247 | 70.2k | DCHECK_GT(literal_count_, 0); | 248 | 70.2k | } else { | 249 | 56.8k | repeat_count_ = indicator_value >> 1; | 250 | 56.8k | DCHECK_GT(repeat_count_, 0); | 251 | 56.8k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 56.8k | reinterpret_cast<T*>(¤t_value_)); | 253 | 56.8k | DCHECK(result1); | 254 | 56.8k | } | 255 | 127k | } | 256 | 7.62M | return true; | 257 | 7.62M | } |
|
258 | | |
259 | | template <typename T> |
260 | 15.8M | bool RleDecoder<T>::Get(T* val) { |
261 | 15.8M | DCHECK(bit_reader_.is_initialized()); |
262 | 15.8M | if (!ReadHeader()) [[unlikely]] { |
263 | 0 | return false; |
264 | 0 | } |
265 | | |
266 | 15.8M | if (repeat_count_ > 0) [[likely]] { |
267 | 7.81M | *val = cast_set<T>(current_value_); |
268 | 7.81M | --repeat_count_; |
269 | 7.81M | rewind_state_ = REWIND_RUN; |
270 | 8.07M | } else { |
271 | 8.07M | DCHECK(literal_count_ > 0); |
272 | 8.07M | bool result = bit_reader_.GetValue(bit_width_, val); |
273 | 8.07M | DCHECK(result); |
274 | 8.07M | --literal_count_; |
275 | 8.07M | rewind_state_ = REWIND_LITERAL; |
276 | 8.07M | } |
277 | | |
278 | 15.8M | return true; |
279 | 15.8M | } _ZN5doris10RleDecoderIsE3GetEPs Line | Count | Source | 260 | 8.42M | bool RleDecoder<T>::Get(T* val) { | 261 | 8.42M | DCHECK(bit_reader_.is_initialized()); | 262 | 8.42M | if (!ReadHeader()) [[unlikely]] { | 263 | 0 | return false; | 264 | 0 | } | 265 | | | 266 | 8.42M | if (repeat_count_ > 0) [[likely]] { | 267 | 7.61M | *val = cast_set<T>(current_value_); | 268 | 7.61M | --repeat_count_; | 269 | 7.61M | rewind_state_ = REWIND_RUN; | 270 | 7.61M | } else { | 271 | 814k | DCHECK(literal_count_ > 0); | 272 | 814k | bool result = bit_reader_.GetValue(bit_width_, val); | 273 | 814k | DCHECK(result); | 274 | 814k | --literal_count_; | 275 | 814k | rewind_state_ = REWIND_LITERAL; | 276 | 814k | } | 277 | | | 278 | 8.42M | return true; | 279 | 8.42M | } |
_ZN5doris10RleDecoderIhE3GetEPh Line | Count | Source | 260 | 7.46M | bool RleDecoder<T>::Get(T* val) { | 261 | 7.46M | DCHECK(bit_reader_.is_initialized()); | 262 | 7.46M | if (!ReadHeader()) [[unlikely]] { | 263 | 0 | return false; | 264 | 0 | } | 265 | | | 266 | 7.46M | if (repeat_count_ > 0) [[likely]] { | 267 | 198k | *val = cast_set<T>(current_value_); | 268 | 198k | --repeat_count_; | 269 | 198k | rewind_state_ = REWIND_RUN; | 270 | 7.26M | } else { | 271 | 7.26M | DCHECK(literal_count_ > 0); | 272 | 7.26M | bool result = bit_reader_.GetValue(bit_width_, val); | 273 | 7.26M | DCHECK(result); | 274 | 7.26M | --literal_count_; | 275 | 7.26M | rewind_state_ = REWIND_LITERAL; | 276 | 7.26M | } | 277 | | | 278 | 7.46M | return true; | 279 | 7.46M | } |
|
280 | | |
281 | | template <typename T> |
282 | 625 | void RleDecoder<T>::RewindOne() { |
283 | 625 | DCHECK(bit_reader_.is_initialized()); |
284 | | |
285 | 625 | switch (rewind_state_) { |
286 | 0 | case CANT_REWIND: |
287 | 0 | throw Exception(Status::FatalError("Can't rewind more than once after each read!")); |
288 | 0 | break; |
289 | 2 | case REWIND_RUN: |
290 | 2 | ++repeat_count_; |
291 | 2 | break; |
292 | 623 | case REWIND_LITERAL: { |
293 | 623 | bit_reader_.Rewind(bit_width_); |
294 | 623 | ++literal_count_; |
295 | 623 | break; |
296 | 0 | } |
297 | 625 | } |
298 | | |
299 | 625 | rewind_state_ = CANT_REWIND; |
300 | 625 | } |
301 | | |
302 | | template <typename T> |
303 | 1.40M | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { |
304 | 1.40M | DCHECK(bit_reader_.is_initialized()); |
305 | 1.40M | DCHECK_GT(max_run, 0); |
306 | 1.40M | size_t ret = 0; |
307 | 1.40M | size_t rem = max_run; |
308 | 1.57M | while (ReadHeader()) { |
309 | 1.56M | if (repeat_count_ > 0) [[likely]] { |
310 | 494k | if (ret > 0 && *val != current_value_) [[unlikely]] { |
311 | 26.3k | return ret; |
312 | 26.3k | } |
313 | 467k | *val = cast_set<T>(current_value_); |
314 | 467k | if (repeat_count_ >= rem) { |
315 | | // The next run is longer than the amount of remaining data |
316 | | // that the caller wants to read. Only consume it partially. |
317 | 374k | repeat_count_ -= rem; |
318 | 374k | ret += rem; |
319 | 374k | return ret; |
320 | 374k | } |
321 | 93.6k | ret += repeat_count_; |
322 | 93.6k | rem -= repeat_count_; |
323 | 93.6k | repeat_count_ = 0; |
324 | 1.07M | } else { |
325 | 1.07M | DCHECK(literal_count_ > 0); |
326 | 1.07M | if (ret == 0) { |
327 | 1.00M | bool has_more = bit_reader_.GetValue(bit_width_, val); |
328 | 1.00M | DCHECK(has_more); |
329 | 1.00M | literal_count_--; |
330 | 1.00M | ret++; |
331 | 1.00M | rem--; |
332 | 1.00M | } |
333 | | |
334 | 2.37M | while (literal_count_ > 0) { |
335 | 2.29M | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); |
336 | 2.29M | DCHECK(result); |
337 | 2.29M | if (current_value_ != *val || rem == 0) { |
338 | 993k | bit_reader_.Rewind(bit_width_); |
339 | 993k | return ret; |
340 | 993k | } |
341 | 1.29M | ret++; |
342 | 1.29M | rem--; |
343 | 1.29M | literal_count_--; |
344 | 1.29M | } |
345 | 1.07M | } |
346 | 1.56M | } |
347 | 7.21k | return ret; |
348 | 1.40M | } _ZN5doris10RleDecoderIsE10GetNextRunEPsm Line | Count | Source | 303 | 1.11k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 304 | 1.11k | DCHECK(bit_reader_.is_initialized()); | 305 | 1.11k | DCHECK_GT(max_run, 0); | 306 | 1.11k | size_t ret = 0; | 307 | 1.11k | size_t rem = max_run; | 308 | 1.21k | while (ReadHeader()) { | 309 | 1.20k | if (repeat_count_ > 0) [[likely]] { | 310 | 984 | if (ret > 0 && *val != current_value_) [[unlikely]] { | 311 | 41 | return ret; | 312 | 41 | } | 313 | 943 | *val = cast_set<T>(current_value_); | 314 | 943 | if (repeat_count_ >= rem) { | 315 | | // The next run is longer than the amount of remaining data | 316 | | // that the caller wants to read. Only consume it partially. | 317 | 878 | repeat_count_ -= rem; | 318 | 878 | ret += rem; | 319 | 878 | return ret; | 320 | 878 | } | 321 | 65 | ret += repeat_count_; | 322 | 65 | rem -= repeat_count_; | 323 | 65 | repeat_count_ = 0; | 324 | 223 | } else { | 325 | 223 | DCHECK(literal_count_ > 0); | 326 | 223 | if (ret == 0) { | 327 | 199 | bool has_more = bit_reader_.GetValue(bit_width_, val); | 328 | 199 | DCHECK(has_more); | 329 | 199 | literal_count_--; | 330 | 199 | ret++; | 331 | 199 | rem--; | 332 | 199 | } | 333 | | | 334 | 597 | while (literal_count_ > 0) { | 335 | 565 | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 336 | 565 | DCHECK(result); | 337 | 565 | if (current_value_ != *val || rem == 0) { | 338 | 191 | bit_reader_.Rewind(bit_width_); | 339 | 191 | return ret; | 340 | 191 | } | 341 | 374 | ret++; | 342 | 374 | rem--; | 343 | 374 | literal_count_--; | 344 | 374 | } | 345 | 223 | } | 346 | 1.20k | } | 347 | 8 | return ret; | 348 | 1.11k | } |
_ZN5doris10RleDecoderIbE10GetNextRunEPbm Line | Count | Source | 303 | 1.40M | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 304 | 1.40M | DCHECK(bit_reader_.is_initialized()); | 305 | 1.40M | DCHECK_GT(max_run, 0); | 306 | 1.40M | size_t ret = 0; | 307 | 1.40M | size_t rem = max_run; | 308 | 1.57M | while (ReadHeader()) { | 309 | 1.56M | if (repeat_count_ > 0) [[likely]] { | 310 | 493k | if (ret > 0 && *val != current_value_) [[unlikely]] { | 311 | 26.3k | return ret; | 312 | 26.3k | } | 313 | 466k | *val = cast_set<T>(current_value_); | 314 | 466k | if (repeat_count_ >= rem) { | 315 | | // The next run is longer than the amount of remaining data | 316 | | // that the caller wants to read. Only consume it partially. | 317 | 373k | repeat_count_ -= rem; | 318 | 373k | ret += rem; | 319 | 373k | return ret; | 320 | 373k | } | 321 | 93.5k | ret += repeat_count_; | 322 | 93.5k | rem -= repeat_count_; | 323 | 93.5k | repeat_count_ = 0; | 324 | 1.07M | } else { | 325 | 1.07M | DCHECK(literal_count_ > 0); | 326 | 1.07M | if (ret == 0) { | 327 | 1.00M | bool has_more = bit_reader_.GetValue(bit_width_, val); | 328 | 1.00M | DCHECK(has_more); | 329 | 1.00M | literal_count_--; | 330 | 1.00M | ret++; | 331 | 1.00M | rem--; | 332 | 1.00M | } | 333 | | | 334 | 2.37M | while (literal_count_ > 0) { | 335 | 2.29M | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 336 | 2.29M | DCHECK(result); | 337 | 2.29M | if (current_value_ != *val || rem == 0) { | 338 | 993k | bit_reader_.Rewind(bit_width_); | 339 | 993k | return ret; | 340 | 993k | } | 341 | 1.29M | ret++; | 342 | 1.29M | rem--; | 343 | 1.29M | literal_count_--; | 344 | 1.29M | } | 345 | 1.07M | } | 346 | 1.56M | } | 347 | 7.20k | return ret; | 348 | 1.40M | } |
|
349 | | |
350 | | template <typename T> |
351 | 919 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { |
352 | 919 | size_t read_num = 0; |
353 | 5.95k | while (read_num < num_values) { |
354 | 5.04k | size_t read_this_time = num_values - read_num; |
355 | | |
356 | 5.04k | if (LIKELY(repeat_count_ > 0)) { |
357 | 1.53k | read_this_time = std::min((size_t)repeat_count_, read_this_time); |
358 | 1.53k | std::fill(values, values + read_this_time, current_value_); |
359 | 1.53k | values += read_this_time; |
360 | 1.53k | repeat_count_ -= read_this_time; |
361 | 1.53k | read_num += read_this_time; |
362 | 3.51k | } else if (literal_count_ > 0) { |
363 | 1.05k | read_this_time = std::min((size_t)literal_count_, read_this_time); |
364 | 11.1k | for (int i = 0; i < read_this_time; ++i) { |
365 | 10.0k | bool result = bit_reader_.GetValue(bit_width_, values); |
366 | 10.0k | DCHECK(result); |
367 | 10.0k | values++; |
368 | 10.0k | } |
369 | 1.05k | literal_count_ -= read_this_time; |
370 | 1.05k | read_num += read_this_time; |
371 | 2.45k | } else { |
372 | 2.45k | if (!ReadHeader()) { |
373 | 0 | return read_num; |
374 | 0 | } |
375 | 2.45k | } |
376 | 5.04k | } |
377 | 919 | return read_num; |
378 | 919 | } _ZN5doris10RleDecoderIsE10get_valuesEPsm Line | Count | Source | 351 | 914 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 352 | 914 | size_t read_num = 0; | 353 | 5.94k | while (read_num < num_values) { | 354 | 5.03k | size_t read_this_time = num_values - read_num; | 355 | | | 356 | 5.03k | if (LIKELY(repeat_count_ > 0)) { | 357 | 1.53k | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 358 | 1.53k | std::fill(values, values + read_this_time, current_value_); | 359 | 1.53k | values += read_this_time; | 360 | 1.53k | repeat_count_ -= read_this_time; | 361 | 1.53k | read_num += read_this_time; | 362 | 3.50k | } else if (literal_count_ > 0) { | 363 | 1.05k | read_this_time = std::min((size_t)literal_count_, read_this_time); | 364 | 11.1k | for (int i = 0; i < read_this_time; ++i) { | 365 | 10.0k | bool result = bit_reader_.GetValue(bit_width_, values); | 366 | 10.0k | DCHECK(result); | 367 | 10.0k | values++; | 368 | 10.0k | } | 369 | 1.05k | literal_count_ -= read_this_time; | 370 | 1.05k | read_num += read_this_time; | 371 | 2.44k | } else { | 372 | 2.44k | if (!ReadHeader()) { | 373 | 0 | return read_num; | 374 | 0 | } | 375 | 2.44k | } | 376 | 5.03k | } | 377 | 914 | return read_num; | 378 | 914 | } |
_ZN5doris10RleDecoderIhE10get_valuesEPhm Line | Count | Source | 351 | 5 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 352 | 5 | size_t read_num = 0; | 353 | 14 | while (read_num < num_values) { | 354 | 9 | size_t read_this_time = num_values - read_num; | 355 | | | 356 | 9 | if (LIKELY(repeat_count_ > 0)) { | 357 | 0 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 358 | 0 | std::fill(values, values + read_this_time, current_value_); | 359 | 0 | values += read_this_time; | 360 | 0 | repeat_count_ -= read_this_time; | 361 | 0 | read_num += read_this_time; | 362 | 9 | } else if (literal_count_ > 0) { | 363 | 5 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 364 | 40 | for (int i = 0; i < read_this_time; ++i) { | 365 | 35 | bool result = bit_reader_.GetValue(bit_width_, values); | 366 | 35 | DCHECK(result); | 367 | 35 | values++; | 368 | 35 | } | 369 | 5 | literal_count_ -= read_this_time; | 370 | 5 | read_num += read_this_time; | 371 | 5 | } else { | 372 | 4 | if (!ReadHeader()) { | 373 | 0 | return read_num; | 374 | 0 | } | 375 | 4 | } | 376 | 9 | } | 377 | 5 | return read_num; | 378 | 5 | } |
|
379 | | |
380 | | template <typename T> |
381 | | size_t RleDecoder<T>::repeated_count() { |
382 | | if (repeat_count_ > 0) { |
383 | | return repeat_count_; |
384 | | } |
385 | | if (literal_count_ == 0) { |
386 | | ReadHeader(); |
387 | | } |
388 | | return repeat_count_; |
389 | | } |
390 | | |
391 | | template <typename T> |
392 | | T RleDecoder<T>::get_repeated_value(size_t count) { |
393 | | DCHECK_GE(repeat_count_, count); |
394 | | repeat_count_ -= count; |
395 | | return current_value_; |
396 | | } |
397 | | |
398 | | template <typename T> |
399 | 1.49M | size_t RleDecoder<T>::Skip(size_t to_skip) { |
400 | 1.49M | DCHECK(bit_reader_.is_initialized()); |
401 | | |
402 | 1.49M | size_t set_count = 0; |
403 | 2.04M | while (to_skip > 0) { |
404 | 549k | bool result = ReadHeader(); |
405 | 549k | DCHECK(result); |
406 | | |
407 | 549k | if (repeat_count_ > 0) [[likely]] { |
408 | 177k | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; |
409 | 177k | repeat_count_ -= nskip; |
410 | 177k | to_skip -= nskip; |
411 | 177k | if (current_value_ != 0) { |
412 | 139k | set_count += nskip; |
413 | 139k | } |
414 | 372k | } else { |
415 | 372k | DCHECK(literal_count_ > 0); |
416 | 372k | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; |
417 | 372k | literal_count_ -= nskip; |
418 | 372k | to_skip -= nskip; |
419 | 22.5M | for (; nskip > 0; nskip--) { |
420 | 22.1M | T value = 0; |
421 | 22.1M | bool result1 = bit_reader_.GetValue(bit_width_, &value); |
422 | 22.1M | DCHECK(result1); |
423 | 22.1M | if (value != 0) { |
424 | 13.1M | set_count++; |
425 | 13.1M | } |
426 | 22.1M | } |
427 | 372k | } |
428 | 549k | } |
429 | 1.49M | return set_count; |
430 | 1.49M | } _ZN5doris10RleDecoderIbE4SkipEm Line | Count | Source | 399 | 327k | size_t RleDecoder<T>::Skip(size_t to_skip) { | 400 | 327k | DCHECK(bit_reader_.is_initialized()); | 401 | | | 402 | 327k | size_t set_count = 0; | 403 | 772k | while (to_skip > 0) { | 404 | 444k | bool result = ReadHeader(); | 405 | 444k | DCHECK(result); | 406 | | | 407 | 444k | if (repeat_count_ > 0) [[likely]] { | 408 | 136k | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 409 | 136k | repeat_count_ -= nskip; | 410 | 136k | to_skip -= nskip; | 411 | 136k | if (current_value_ != 0) { | 412 | 119k | set_count += nskip; | 413 | 119k | } | 414 | 308k | } else { | 415 | 308k | DCHECK(literal_count_ > 0); | 416 | 308k | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 417 | 308k | literal_count_ -= nskip; | 418 | 308k | to_skip -= nskip; | 419 | 6.41M | for (; nskip > 0; nskip--) { | 420 | 6.10M | T value = 0; | 421 | 6.10M | bool result1 = bit_reader_.GetValue(bit_width_, &value); | 422 | 6.10M | DCHECK(result1); | 423 | 6.10M | if (value != 0) { | 424 | 4.85M | set_count++; | 425 | 4.85M | } | 426 | 6.10M | } | 427 | 308k | } | 428 | 444k | } | 429 | 327k | return set_count; | 430 | 327k | } |
_ZN5doris10RleDecoderIhE4SkipEm Line | Count | Source | 399 | 1.16M | size_t RleDecoder<T>::Skip(size_t to_skip) { | 400 | 1.16M | DCHECK(bit_reader_.is_initialized()); | 401 | | | 402 | 1.16M | size_t set_count = 0; | 403 | 1.27M | while (to_skip > 0) { | 404 | 104k | bool result = ReadHeader(); | 405 | 104k | DCHECK(result); | 406 | | | 407 | 104k | if (repeat_count_ > 0) [[likely]] { | 408 | 40.3k | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 409 | 40.3k | repeat_count_ -= nskip; | 410 | 40.3k | to_skip -= nskip; | 411 | 40.3k | if (current_value_ != 0) { | 412 | 19.9k | set_count += nskip; | 413 | 19.9k | } | 414 | 63.8k | } else { | 415 | 63.8k | DCHECK(literal_count_ > 0); | 416 | 63.8k | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 417 | 63.8k | literal_count_ -= nskip; | 418 | 63.8k | to_skip -= nskip; | 419 | 16.1M | for (; nskip > 0; nskip--) { | 420 | 16.0M | T value = 0; | 421 | 16.0M | bool result1 = bit_reader_.GetValue(bit_width_, &value); | 422 | 16.0M | DCHECK(result1); | 423 | 16.0M | if (value != 0) { | 424 | 8.34M | set_count++; | 425 | 8.34M | } | 426 | 16.0M | } | 427 | 63.8k | } | 428 | 104k | } | 429 | 1.16M | return set_count; | 430 | 1.16M | } |
|
431 | | |
432 | | // This function buffers input values 8 at a time. After seeing all 8 values, |
433 | | // it decides whether they should be encoded as a literal or repeated run. |
434 | | template <typename T> |
435 | 10.3M | void RleEncoder<T>::Put(T value, size_t run_length) { |
436 | 10.3M | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); |
437 | | |
438 | | // Fast path: if this is a continuation of the current repeated run and |
439 | | // we've already buffered enough values, just increment repeat_count_ |
440 | 10.3M | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { |
441 | 1.06M | repeat_count_ += run_length; |
442 | 1.06M | return; |
443 | 1.06M | } |
444 | | |
445 | | // Handle run_length > 1 more efficiently |
446 | 21.5M | while (run_length > 0) { |
447 | 13.3M | if (current_value_ == value) [[likely]] { |
448 | | // Need to buffer values until we reach 8 |
449 | 6.30M | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); |
450 | 28.4M | for (size_t i = 0; i < to_buffer; ++i) { |
451 | 22.1M | buffered_values_[num_buffered_values_++] = value; |
452 | 22.1M | ++repeat_count_; |
453 | 22.1M | } |
454 | 6.30M | run_length -= to_buffer; |
455 | 6.30M | if (num_buffered_values_ == 8) { |
456 | 2.92M | DCHECK_EQ(literal_count_ % 8, 0); |
457 | 2.92M | FlushBufferedValues(false); |
458 | | // After flushing, if we still have a repeated run and more values, |
459 | | // we can add them directly to repeat_count_ |
460 | 2.92M | if (repeat_count_ >= 8 && run_length > 0) { |
461 | 1.11M | repeat_count_ += run_length; |
462 | 1.11M | return; |
463 | 1.11M | } |
464 | 2.92M | } |
465 | 7.07M | } else { |
466 | | // Value changed |
467 | 7.07M | if (repeat_count_ >= 8) { |
468 | | // We had a run that was long enough but it has ended. Flush the |
469 | | // current repeated run. |
470 | 1.05M | DCHECK_EQ(literal_count_, 0); |
471 | 1.05M | FlushRepeatedRun(); |
472 | 1.05M | } |
473 | 7.07M | repeat_count_ = 1; |
474 | 7.07M | current_value_ = value; |
475 | | |
476 | 7.07M | buffered_values_[num_buffered_values_++] = value; |
477 | 7.07M | --run_length; |
478 | 7.07M | if (num_buffered_values_ == 8) { |
479 | 625k | DCHECK_EQ(literal_count_ % 8, 0); |
480 | 625k | FlushBufferedValues(false); |
481 | 625k | } |
482 | 7.07M | } |
483 | 13.3M | } |
484 | 9.27M | } _ZN5doris10RleEncoderIhE3PutEhm Line | Count | Source | 435 | 4.03M | void RleEncoder<T>::Put(T value, size_t run_length) { | 436 | 4.03M | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 437 | | | 438 | | // Fast path: if this is a continuation of the current repeated run and | 439 | | // we've already buffered enough values, just increment repeat_count_ | 440 | 4.03M | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { | 441 | 417k | repeat_count_ += run_length; | 442 | 417k | return; | 443 | 417k | } | 444 | | | 445 | | // Handle run_length > 1 more efficiently | 446 | 7.23M | while (run_length > 0) { | 447 | 3.61M | if (current_value_ == value) [[likely]] { | 448 | | // Need to buffer values until we reach 8 | 449 | 1.80M | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); | 450 | 3.61M | for (size_t i = 0; i < to_buffer; ++i) { | 451 | 1.80M | buffered_values_[num_buffered_values_++] = value; | 452 | 1.80M | ++repeat_count_; | 453 | 1.80M | } | 454 | 1.80M | run_length -= to_buffer; | 455 | 1.80M | if (num_buffered_values_ == 8) { | 456 | 226k | DCHECK_EQ(literal_count_ % 8, 0); | 457 | 226k | FlushBufferedValues(false); | 458 | | // After flushing, if we still have a repeated run and more values, | 459 | | // we can add them directly to repeat_count_ | 460 | 226k | if (repeat_count_ >= 8 && run_length > 0) { | 461 | 3 | repeat_count_ += run_length; | 462 | 3 | return; | 463 | 3 | } | 464 | 226k | } | 465 | 1.80M | } else { | 466 | | // Value changed | 467 | 1.80M | if (repeat_count_ >= 8) { | 468 | | // We had a run that was long enough but it has ended. Flush the | 469 | | // current repeated run. 
| 470 | 5.42k | DCHECK_EQ(literal_count_, 0); | 471 | 5.42k | FlushRepeatedRun(); | 472 | 5.42k | } | 473 | 1.80M | repeat_count_ = 1; | 474 | 1.80M | current_value_ = value; | 475 | | | 476 | 1.80M | buffered_values_[num_buffered_values_++] = value; | 477 | 1.80M | --run_length; | 478 | 1.80M | if (num_buffered_values_ == 8) { | 479 | | DCHECK_EQ(literal_count_ % 8, 0); | 480 | 222k | FlushBufferedValues(false); | 481 | 222k | } | 482 | 1.80M | } | 483 | 3.61M | } | 484 | 3.61M | } |
_ZN5doris10RleEncoderIbE3PutEbm Line | Count | Source | 435 | 6.30M | void RleEncoder<T>::Put(T value, size_t run_length) { | 436 | 6.30M | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 437 | | | 438 | | // Fast path: if this is a continuation of the current repeated run and | 439 | | // we've already buffered enough values, just increment repeat_count_ | 440 | 6.30M | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { | 441 | 651k | repeat_count_ += run_length; | 442 | 651k | return; | 443 | 651k | } | 444 | | | 445 | | // Handle run_length > 1 more efficiently | 446 | 14.2M | while (run_length > 0) { | 447 | 9.75M | if (current_value_ == value) [[likely]] { | 448 | | // Need to buffer values until we reach 8 | 449 | 4.49M | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); | 450 | 24.8M | for (size_t i = 0; i < to_buffer; ++i) { | 451 | 20.3M | buffered_values_[num_buffered_values_++] = value; | 452 | 20.3M | ++repeat_count_; | 453 | 20.3M | } | 454 | 4.49M | run_length -= to_buffer; | 455 | 4.49M | if (num_buffered_values_ == 8) { | 456 | 2.69M | DCHECK_EQ(literal_count_ % 8, 0); | 457 | 2.69M | FlushBufferedValues(false); | 458 | | // After flushing, if we still have a repeated run and more values, | 459 | | // we can add them directly to repeat_count_ | 460 | 2.69M | if (repeat_count_ >= 8 && run_length > 0) { | 461 | 1.11M | repeat_count_ += run_length; | 462 | 1.11M | return; | 463 | 1.11M | } | 464 | 2.69M | } | 465 | 5.26M | } else { | 466 | | // Value changed | 467 | 5.26M | if (repeat_count_ >= 8) { | 468 | | // We had a run that was long enough but it has ended. Flush the | 469 | | // current repeated run. 
| 470 | 1.04M | DCHECK_EQ(literal_count_, 0); | 471 | 1.04M | FlushRepeatedRun(); | 472 | 1.04M | } | 473 | 5.26M | repeat_count_ = 1; | 474 | 5.26M | current_value_ = value; | 475 | | | 476 | 5.26M | buffered_values_[num_buffered_values_++] = value; | 477 | 5.26M | --run_length; | 478 | 5.26M | if (num_buffered_values_ == 8) { | 479 | | DCHECK_EQ(literal_count_ % 8, 0); | 480 | 403k | FlushBufferedValues(false); | 481 | 403k | } | 482 | 5.26M | } | 483 | 9.75M | } | 484 | 5.65M | } |
|
485 | | |
486 | | template <typename T> |
487 | 3.42M | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { |
488 | 3.42M | if (literal_indicator_byte_idx_ < 0) { |
489 | | // The literal indicator byte has not been reserved yet, get one now. |
490 | 1.08M | literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1)); |
491 | 1.08M | DCHECK_GE(literal_indicator_byte_idx_, 0); |
492 | 1.08M | } |
493 | | |
494 | | // Write all the buffered values as bit packed literals |
495 | 22.3M | for (int i = 0; i < num_buffered_values_; ++i) { |
496 | 18.9M | bit_writer_.PutValue(buffered_values_[i], bit_width_); |
497 | 18.9M | } |
498 | 3.42M | num_buffered_values_ = 0; |
499 | | |
500 | 3.42M | if (update_indicator_byte) { |
501 | | // At this point we need to write the indicator byte for the literal run. |
502 | | // We only reserve one byte, to allow for streaming writes of literal values. |
503 | | // The logic makes sure we flush literal runs often enough to not overrun |
504 | | // the 1 byte. |
505 | 1.08M | int num_groups = BitUtil::Ceil(literal_count_, 8); |
506 | 1.08M | int32_t indicator_value = (num_groups << 1) | 1; |
507 | 1.08M | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); |
508 | 1.08M | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = |
509 | 1.08M | cast_set<uint8_t>(indicator_value); |
510 | 1.08M | literal_indicator_byte_idx_ = -1; |
511 | 1.08M | literal_count_ = 0; |
512 | 1.08M | } |
513 | 3.42M | } _ZN5doris10RleEncoderIhE15FlushLiteralRunEb Line | Count | Source | 487 | 449k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 488 | 449k | if (literal_indicator_byte_idx_ < 0) { | 489 | | // The literal indicator byte has not been reserved yet, get one now. | 490 | 12.3k | literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1)); | 491 | 12.3k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 492 | 12.3k | } | 493 | | | 494 | | // Write all the buffered values as bit packed literals | 495 | 3.99M | for (int i = 0; i < num_buffered_values_; ++i) { | 496 | 3.54M | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 497 | 3.54M | } | 498 | 449k | num_buffered_values_ = 0; | 499 | | | 500 | 449k | if (update_indicator_byte) { | 501 | | // At this point we need to write the indicator byte for the literal run. | 502 | | // We only reserve one byte, to allow for streaming writes of literal values. | 503 | | // The logic makes sure we flush literal runs often enough to not overrun | 504 | | // the 1 byte. | 505 | 12.3k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 506 | 12.3k | int32_t indicator_value = (num_groups << 1) | 1; | 507 | | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 508 | 12.3k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = | 509 | 12.3k | cast_set<uint8_t>(indicator_value); | 510 | 12.3k | literal_indicator_byte_idx_ = -1; | 511 | 12.3k | literal_count_ = 0; | 512 | 12.3k | } | 513 | 449k | } |
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb Line | Count | Source | 487 | 2.97M | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 488 | 2.97M | if (literal_indicator_byte_idx_ < 0) { | 489 | | // The literal indicator byte has not been reserved yet, get one now. | 490 | 1.07M | literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1)); | 491 | 1.07M | DCHECK_GE(literal_indicator_byte_idx_, 0); | 492 | 1.07M | } | 493 | | | 494 | | // Write all the buffered values as bit packed literals | 495 | 18.4M | for (int i = 0; i < num_buffered_values_; ++i) { | 496 | 15.4M | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 497 | 15.4M | } | 498 | 2.97M | num_buffered_values_ = 0; | 499 | | | 500 | 2.97M | if (update_indicator_byte) { | 501 | | // At this point we need to write the indicator byte for the literal run. | 502 | | // We only reserve one byte, to allow for streaming writes of literal values. | 503 | | // The logic makes sure we flush literal runs often enough to not overrun | 504 | | // the 1 byte. | 505 | 1.07M | int num_groups = BitUtil::Ceil(literal_count_, 8); | 506 | 1.07M | int32_t indicator_value = (num_groups << 1) | 1; | 507 | | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 508 | 1.07M | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = | 509 | 1.07M | cast_set<uint8_t>(indicator_value); | 510 | 1.07M | literal_indicator_byte_idx_ = -1; | 511 | 1.07M | literal_count_ = 0; | 512 | 1.07M | } | 513 | 2.97M | } |
|
514 | | |
515 | | template <typename T> |
516 | 1.17M | void RleEncoder<T>::FlushRepeatedRun() { |
517 | 1.17M | DCHECK_GT(repeat_count_, 0); |
518 | | // The lsb of 0 indicates this is a repeated run |
519 | 1.17M | int32_t indicator_value = repeat_count_ << 1 | 0; |
520 | 1.17M | bit_writer_.PutVlqInt(indicator_value); |
521 | 1.17M | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); |
522 | 1.17M | num_buffered_values_ = 0; |
523 | 1.17M | repeat_count_ = 0; |
524 | 1.17M | } _ZN5doris10RleEncoderIhE16FlushRepeatedRunEv Line | Count | Source | 516 | 14.9k | void RleEncoder<T>::FlushRepeatedRun() { | 517 | 14.9k | DCHECK_GT(repeat_count_, 0); | 518 | | // The lsb of 0 indicates this is a repeated run | 519 | 14.9k | int32_t indicator_value = repeat_count_ << 1 | 0; | 520 | 14.9k | bit_writer_.PutVlqInt(indicator_value); | 521 | 14.9k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 522 | 14.9k | num_buffered_values_ = 0; | 523 | 14.9k | repeat_count_ = 0; | 524 | 14.9k | } |
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv Line | Count | Source | 516 | 1.15M | void RleEncoder<T>::FlushRepeatedRun() { | 517 | 1.15M | DCHECK_GT(repeat_count_, 0); | 518 | | // The lsb of 0 indicates this is a repeated run | 519 | 1.15M | int32_t indicator_value = repeat_count_ << 1 | 0; | 520 | 1.15M | bit_writer_.PutVlqInt(indicator_value); | 521 | 1.15M | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 522 | 1.15M | num_buffered_values_ = 0; | 523 | 1.15M | repeat_count_ = 0; | 524 | 1.15M | } |
|
525 | | |
526 | | // Flush the values that have been buffered. At this point we decide whether |
527 | | // we need to switch between the run types or continue the current one. |
528 | | template <typename T> |
529 | 3.54M | void RleEncoder<T>::FlushBufferedValues(bool done) { |
530 | 3.54M | if (repeat_count_ >= 8) { |
531 | | // Clear the buffered values. They are part of the repeated run now and we |
532 | | // don't want to flush them out as literals. |
533 | 1.19M | num_buffered_values_ = 0; |
534 | 1.19M | if (literal_count_ != 0) { |
535 | | // There was a current literal run. All the values in it have been flushed |
536 | | // but we still need to update the indicator byte. |
537 | 1.02M | DCHECK_EQ(literal_count_ % 8, 0); |
538 | 1.02M | DCHECK_EQ(repeat_count_, 8); |
539 | 1.02M | FlushLiteralRun(true); |
540 | 1.02M | } |
541 | 1.19M | DCHECK_EQ(literal_count_, 0); |
542 | 1.19M | return; |
543 | 1.19M | } |
544 | | |
545 | 2.34M | literal_count_ += num_buffered_values_; |
546 | 2.34M | int num_groups = BitUtil::Ceil(literal_count_, 8); |
547 | 2.34M | if (num_groups + 1 >= (1 << 6)) { |
548 | | // We need to start a new literal run because the indicator byte we've reserved |
549 | | // cannot store more values. |
550 | 7.30k | DCHECK_GE(literal_indicator_byte_idx_, 0); |
551 | 7.30k | FlushLiteralRun(true); |
552 | 2.34M | } else { |
553 | 2.34M | FlushLiteralRun(done); |
554 | 2.34M | } |
555 | 2.34M | repeat_count_ = 0; |
556 | 2.34M | } _ZN5doris10RleEncoderIhE19FlushBufferedValuesEb Line | Count | Source | 529 | 449k | void RleEncoder<T>::FlushBufferedValues(bool done) { | 530 | 449k | if (repeat_count_ >= 8) { | 531 | | // Clear the buffered values. They are part of the repeated run now and we | 532 | | // don't want to flush them out as literals. | 533 | 7.01k | num_buffered_values_ = 0; | 534 | 7.01k | if (literal_count_ != 0) { | 535 | | // There was a current literal run. All the values in it have been flushed | 536 | | // but we still need to update the indicator byte. | 537 | 4.21k | DCHECK_EQ(literal_count_ % 8, 0); | 538 | 4.21k | DCHECK_EQ(repeat_count_, 8); | 539 | 4.21k | FlushLiteralRun(true); | 540 | 4.21k | } | 541 | 7.01k | DCHECK_EQ(literal_count_, 0); | 542 | 7.01k | return; | 543 | 7.01k | } | 544 | | | 545 | 442k | literal_count_ += num_buffered_values_; | 546 | 442k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 547 | 442k | if (num_groups + 1 >= (1 << 6)) { | 548 | | // We need to start a new literal run because the indicator byte we've reserved | 549 | | // cannot store more values. | 550 | 5.24k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 551 | 5.24k | FlushLiteralRun(true); | 552 | 437k | } else { | 553 | 437k | FlushLiteralRun(done); | 554 | 437k | } | 555 | 442k | repeat_count_ = 0; | 556 | 442k | } |
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb Line | Count | Source | 529 | 3.09M | void RleEncoder<T>::FlushBufferedValues(bool done) { | 530 | 3.09M | if (repeat_count_ >= 8) { | 531 | | // Clear the buffered values. They are part of the repeated run now and we | 532 | | // don't want to flush them out as literals. | 533 | 1.18M | num_buffered_values_ = 0; | 534 | 1.18M | if (literal_count_ != 0) { | 535 | | // There was a current literal run. All the values in it have been flushed | 536 | | // but we still need to update the indicator byte. | 537 | 1.02M | DCHECK_EQ(literal_count_ % 8, 0); | 538 | 1.02M | DCHECK_EQ(repeat_count_, 8); | 539 | 1.02M | FlushLiteralRun(true); | 540 | 1.02M | } | 541 | 1.18M | DCHECK_EQ(literal_count_, 0); | 542 | 1.18M | return; | 543 | 1.18M | } | 544 | | | 545 | 1.90M | literal_count_ += num_buffered_values_; | 546 | 1.90M | int num_groups = BitUtil::Ceil(literal_count_, 8); | 547 | 1.90M | if (num_groups + 1 >= (1 << 6)) { | 548 | | // We need to start a new literal run because the indicator byte we've reserved | 549 | | // cannot store more values. | 550 | 2.06k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 551 | 2.06k | FlushLiteralRun(true); | 552 | 1.90M | } else { | 553 | 1.90M | FlushLiteralRun(done); | 554 | 1.90M | } | 555 | 1.90M | repeat_count_ = 0; | 556 | 1.90M | } |
|
557 | | |
558 | | template <typename T> |
559 | 27.0k | void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) { |
560 | 135k | for (int i = 0; i < num_bytes; ++i) { |
561 | 108k | bit_writer_.PutValue(val, 8); |
562 | 108k | } |
563 | 27.0k | } |
564 | | |
565 | | template <typename T> |
566 | 173k | int RleEncoder<T>::Flush() { |
567 | 173k | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { |
568 | 173k | bool all_repeat = literal_count_ == 0 && |
569 | 173k | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); |
570 | | // There is something pending, figure out if it's a repeated or literal run |
571 | 173k | if (repeat_count_ > 0 && all_repeat) { |
572 | 120k | FlushRepeatedRun(); |
573 | 120k | } else { |
574 | 52.9k | literal_count_ += num_buffered_values_; |
575 | 52.9k | FlushLiteralRun(true); |
576 | 52.9k | repeat_count_ = 0; |
577 | 52.9k | } |
578 | 173k | } |
579 | 173k | bit_writer_.Flush(); |
580 | 173k | DCHECK_EQ(num_buffered_values_, 0); |
581 | 173k | DCHECK_EQ(literal_count_, 0); |
582 | 173k | DCHECK_EQ(repeat_count_, 0); |
583 | 173k | return bit_writer_.bytes_written(); |
584 | 173k | } _ZN5doris10RleEncoderIhE5FlushEv Line | Count | Source | 566 | 13.0k | int RleEncoder<T>::Flush() { | 567 | 13.0k | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 568 | 12.4k | bool all_repeat = literal_count_ == 0 && | 569 | 12.4k | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 570 | | // There is something pending, figure out if it's a repeated or literal run | 571 | 12.4k | if (repeat_count_ > 0 && all_repeat) { | 572 | 9.52k | FlushRepeatedRun(); | 573 | 9.52k | } else { | 574 | 2.93k | literal_count_ += num_buffered_values_; | 575 | 2.93k | FlushLiteralRun(true); | 576 | 2.93k | repeat_count_ = 0; | 577 | 2.93k | } | 578 | 12.4k | } | 579 | 13.0k | bit_writer_.Flush(); | 580 | 13.0k | DCHECK_EQ(num_buffered_values_, 0); | 581 | 13.0k | DCHECK_EQ(literal_count_, 0); | 582 | | DCHECK_EQ(repeat_count_, 0); | 583 | 13.0k | return bit_writer_.bytes_written(); | 584 | 13.0k | } |
_ZN5doris10RleEncoderIbE5FlushEv Line | Count | Source | 566 | 160k | int RleEncoder<T>::Flush() { | 567 | 160k | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 568 | 160k | bool all_repeat = literal_count_ == 0 && | 569 | 160k | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 570 | | // There is something pending, figure out if it's a repeated or literal run | 571 | 160k | if (repeat_count_ > 0 && all_repeat) { | 572 | 110k | FlushRepeatedRun(); | 573 | 110k | } else { | 574 | 50.0k | literal_count_ += num_buffered_values_; | 575 | 50.0k | FlushLiteralRun(true); | 576 | 50.0k | repeat_count_ = 0; | 577 | 50.0k | } | 578 | 160k | } | 579 | 160k | bit_writer_.Flush(); | 580 | 160k | DCHECK_EQ(num_buffered_values_, 0); | 581 | 160k | DCHECK_EQ(literal_count_, 0); | 582 | | DCHECK_EQ(repeat_count_, 0); | 583 | 160k | return bit_writer_.bytes_written(); | 584 | 160k | } |
|
585 | | |
586 | | template <typename T> |
587 | 1.05M | void RleEncoder<T>::Clear() { |
588 | 1.05M | current_value_ = 0; |
589 | 1.05M | repeat_count_ = 0; |
590 | 1.05M | num_buffered_values_ = 0; |
591 | 1.05M | literal_count_ = 0; |
592 | 1.05M | literal_indicator_byte_idx_ = -1; |
593 | 1.05M | bit_writer_.Clear(); |
594 | 1.05M | } _ZN5doris10RleEncoderIhE5ClearEv Line | Count | Source | 587 | 41.1k | void RleEncoder<T>::Clear() { | 588 | 41.1k | current_value_ = 0; | 589 | 41.1k | repeat_count_ = 0; | 590 | 41.1k | num_buffered_values_ = 0; | 591 | 41.1k | literal_count_ = 0; | 592 | 41.1k | literal_indicator_byte_idx_ = -1; | 593 | 41.1k | bit_writer_.Clear(); | 594 | 41.1k | } |
_ZN5doris10RleEncoderIbE5ClearEv Line | Count | Source | 587 | 1.01M | void RleEncoder<T>::Clear() { | 588 | 1.01M | current_value_ = 0; | 589 | 1.01M | repeat_count_ = 0; | 590 | 1.01M | num_buffered_values_ = 0; | 591 | 1.01M | literal_count_ = 0; | 592 | 1.01M | literal_indicator_byte_idx_ = -1; | 593 | 1.01M | bit_writer_.Clear(); | 594 | 1.01M | } |
|
595 | | |
596 | | // Copied from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h |
597 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
598 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
599 | | // (literal encoding). |
600 | | // |
601 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
602 | | // of the run and the type of the run. |
603 | | // |
604 | | // This encoding has the benefit that when there aren't any long enough runs, values |
605 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
606 | | // the run length are byte aligned. This allows for very efficient decoding |
607 | | // implementations. |
608 | | // The encoding is: |
609 | | // encoded-block := run* |
610 | | // run := literal-run | repeated-run |
611 | | // literal-run := literal-indicator < literal bytes > |
612 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
613 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
614 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
615 | | // |
616 | | // Each run is preceded by a varint. The varint's least significant bit is |
617 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
618 | | // of the varint is used to determine the length of the run (eg how many times the |
619 | | // value repeats). |
620 | | // |
621 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
622 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
623 | | // on a byte boundary without padding. |
624 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
625 | | // the actual number of encoded ints. (This means that the total number of encoded values |
626 | | // can not be determined from the encoded data, since the number of values in the last |
627 | | // group may not be a multiple of 8). For the last group of literal runs, we pad |
628 | | // the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
629 | | // without the need for additional checks. |
630 | | // |
631 | | // There is a break-even point when it is more storage efficient to do run length |
632 | | // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
633 | | // for both the repeated encoding or the literal encoding. This value can always |
634 | | // be computed based on the bit-width. |
635 | | // TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more |
636 | | // investigation is needed to do this efficiently, see the reverted IMPALA-6658. |
637 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
638 | | // |
639 | | // Examples with bit-width 1 (eg encoding booleans): |
640 | | // ---------------------------------------- |
641 | | // 100 1s followed by 100 0s: |
642 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
643 | | // - (total 4 bytes) |
644 | | // |
645 | | // alternating 1s and 0s (200 total): |
646 | | // 200 ints = 25 groups of 8 |
647 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
648 | | // (total 26 bytes, 1 byte overhead) |
649 | | |
650 | | // RLE decoder with a batch-oriented interface that enables fast decoding. |
651 | | // Users of this class must first initialize the class to point to a buffer of |
652 | | // RLE-encoded data, passed into the constructor or Reset(). The provided |
653 | | // bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH). |
654 | | // Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to |
655 | | // see if the next run is a repeated or literal run, then calling |
656 | | // GetRepeatedValue() or GetLiteralValues() respectively to read the values. |
657 | | // |
658 | | // End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. |
659 | | // Other decoding errors are signalled by functions returning false. If an |
660 | | // error is encountered then it is not valid to read any more data until |
661 | | // Reset() is called. |
662 | | |
663 | | // Both bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1]. |
664 | | // This means that a Parquet implementation can always store the run length in a signed 32-bit integer. |
665 | | template <typename T> |
666 | | class RleBatchDecoder { |
667 | | public: |
668 | 974 | RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { |
669 | 974 | Reset(buffer, buffer_len, bit_width); |
670 | 974 | } |
671 | | |
672 | | RleBatchDecoder() = default; |
673 | | |
674 | | // Reset the decoder to read from a new buffer. |
675 | | void Reset(uint8_t* buffer, int buffer_len, int bit_width); |
676 | | |
677 | | // Return the size of the current repeated run. Returns zero if the current run is |
678 | | // a literal run or if no more runs can be read from the input. |
679 | | int32_t NextNumRepeats(); |
680 | | |
681 | | // Get the value of the current repeated run and consume the given number of repeats. |
682 | | // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot |
683 | | // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume' |
684 | | // can be set to 0 to peek at the value without consuming repeats. |
685 | | T GetRepeatedValue(int32_t num_repeats_to_consume); |
686 | | |
687 | | // Return the size of the current literal run. Returns zero if the current run is |
688 | | // a repeated run or if no more runs can be read from the input. |
689 | | int32_t NextNumLiterals(); |
690 | | |
691 | | // Consume 'num_literals_to_consume' literals from the current literal run, |
692 | | // copying the values to 'values'. 'num_literals_to_consume' must be <= |
693 | | // NextNumLiterals(). Returns true if the requested number of literals were |
694 | | // successfully read or false if an error was encountered, e.g. the input was |
695 | | // truncated. |
696 | | bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; |
697 | | |
698 | | // Consume 'num_values_to_consume' values and copy them to 'values'. |
699 | | // Returns the number of consumed values or 0 if an error occurred. |
700 | | uint32_t GetBatch(T* values, uint32_t batch_num); |
701 | | |
702 | | private: |
703 | | // Called when both 'literal_count_' and 'repeat_count_' have been exhausted. |
704 | | // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal |
705 | | // or repeated run, or leaves both at 0 if no more values can be read (either because |
706 | | // the end of the input was reached or an error was encountered decoding). |
707 | | void NextCounts(); |
708 | | |
709 | | /// Fill the literal buffer. Invalid to call if there are already buffered literals. |
710 | | /// Return false if the input was truncated. This does not advance 'literal_count_'. |
711 | | bool FillLiteralBuffer() WARN_UNUSED_RESULT; |
712 | | |
713 | 11.6k | bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; } |
714 | | |
715 | | /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing |
716 | | /// 'literal_count_'. Returns the number of literals outputted. |
717 | | int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); |
718 | | |
719 | | BatchedBitReader bit_reader_; |
720 | | |
721 | | // Number of bits needed to encode the value. Must be between 0 and 64 after |
722 | | // the decoder is initialized with a buffer. -1 indicates the decoder was not |
723 | | // initialized. |
724 | | int bit_width_ = -1; |
725 | | |
726 | | // If a repeated run, the number of repeats remaining in the current run to be read. |
727 | | // If the current run is a literal run, this is 0. |
728 | | int32_t repeat_count_ = 0; |
729 | | |
730 | | // If a literal run, the number of literals remaining in the current run to be read. |
731 | | // If the current run is a repeated run, this is 0. |
732 | | int32_t literal_count_ = 0; |
733 | | |
734 | | // If a repeated run, the current repeated value. |
735 | | T repeated_value_; |
736 | | |
737 | | // Size of buffer for literal values. Large enough to decode a full batch of 32 |
738 | | // literals. The buffer is needed to allow clients to read in batches that are not |
739 | | // multiples of 32. |
740 | | static constexpr int LITERAL_BUFFER_LEN = 32; |
741 | | |
742 | | // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the |
743 | | // position of the next literal to be read from the buffer. |
744 | | T literal_buffer_[LITERAL_BUFFER_LEN]; |
745 | | int num_buffered_literals_ = 0; |
746 | | int literal_buffer_pos_ = 0; |
747 | | }; |
748 | | |
749 | | template <typename T> |
750 | 11.7k | int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) { |
751 | 11.7k | int32_t num_to_output = |
752 | 11.7k | std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); |
753 | 11.7k | memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); |
754 | 11.7k | literal_buffer_pos_ += num_to_output; |
755 | 11.7k | literal_count_ -= num_to_output; |
756 | 11.7k | return num_to_output; |
757 | 11.7k | } |
758 | | |
759 | | template <typename T> |
760 | 974 | void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) { |
761 | 974 | bit_reader_.Reset(buffer, buffer_len); |
762 | 974 | bit_width_ = bit_width; |
763 | 974 | repeat_count_ = 0; |
764 | 974 | literal_count_ = 0; |
765 | 974 | num_buffered_literals_ = 0; |
766 | 974 | literal_buffer_pos_ = 0; |
767 | 974 | } |
768 | | |
769 | | template <typename T> |
770 | 12.1k | int32_t RleBatchDecoder<T>::NextNumRepeats() { |
771 | 12.1k | if (repeat_count_ > 0) return repeat_count_; |
772 | 12.1k | if (literal_count_ == 0) NextCounts(); |
773 | 12.1k | return repeat_count_; |
774 | 12.1k | } |
775 | | |
776 | | template <typename T> |
777 | 11.9k | void RleBatchDecoder<T>::NextCounts() { |
778 | | // Read the next run's indicator int, it could be a literal or repeated run. |
779 | | // The int is encoded as a ULEB128-encoded value. |
780 | 11.9k | uint32_t indicator_value = 0; |
781 | 11.9k | if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) { |
782 | 0 | return; |
783 | 0 | } |
784 | | |
785 | | // lsb indicates if it is a literal run or repeated run |
786 | 11.9k | bool is_literal = indicator_value & 1; |
787 | | |
788 | | // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. |
789 | | // The Parquet standard does not allow longer runs - see PARQUET-1290. |
790 | 11.9k | uint32_t run_len = indicator_value >> 1; |
791 | 11.9k | if (is_literal) { |
792 | | // Use int64_t to avoid overflowing multiplication. |
793 | 11.4k | int64_t literal_count = static_cast<int64_t>(run_len) * 8; |
794 | 11.4k | if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; |
795 | 11.4k | literal_count_ = cast_set<int32_t>(literal_count); |
796 | 11.4k | } else { |
797 | 506 | if (UNLIKELY(run_len == 0)) return; |
798 | 506 | bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); |
799 | 506 | if (UNLIKELY(!result)) return; |
800 | 506 | repeat_count_ = run_len; |
801 | 506 | } |
802 | 11.9k | } |
803 | | |
804 | | template <typename T> |
805 | 520 | T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { |
806 | 520 | repeat_count_ -= num_repeats_to_consume; |
807 | 520 | return repeated_value_; |
808 | 520 | } |
809 | | |
810 | | template <typename T> |
811 | 11.6k | int32_t RleBatchDecoder<T>::NextNumLiterals() { |
812 | 11.6k | if (literal_count_ > 0) return literal_count_; |
813 | 0 | if (repeat_count_ == 0) NextCounts(); |
814 | 0 | return literal_count_; |
815 | 11.6k | } |
816 | | |
817 | | template <typename T> |
818 | 11.6k | bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) { |
819 | 11.6k | int32_t num_consumed = 0; |
820 | | // Copy any buffered literals left over from previous calls. |
821 | 11.6k | if (HaveBufferedLiterals()) { |
822 | 163 | num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); |
823 | 163 | } |
824 | | |
825 | 11.6k | int32_t num_remaining = num_literals_to_consume - num_consumed; |
826 | | // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. |
827 | | // Need to round to a batch of 32 if the caller is consuming only part of the current |
828 | | // run avoid ending on a non-byte boundary. |
829 | 11.6k | int32_t num_to_bypass = |
830 | 11.6k | std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
831 | 11.6k | if (num_to_bypass > 0) { |
832 | 11.0k | int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); |
833 | | // If we couldn't read the expected number, that means the input was truncated. |
834 | 11.0k | if (num_read < num_to_bypass) return false; |
835 | 11.0k | literal_count_ -= num_to_bypass; |
836 | 11.0k | num_consumed += num_to_bypass; |
837 | 11.0k | num_remaining = num_literals_to_consume - num_consumed; |
838 | 11.0k | } |
839 | | |
840 | 11.6k | if (num_remaining > 0) { |
841 | | // We weren't able to copy all the literals requested directly from the input. |
842 | | // Buffer literals and copy over the requested number. |
843 | 11.5k | if (UNLIKELY(!FillLiteralBuffer())) return false; |
844 | 11.5k | OutputBufferedLiterals(num_remaining, values + num_consumed); |
845 | 11.5k | } |
846 | 11.6k | return true; |
847 | 11.6k | } |
848 | | |
849 | | template <typename T> |
850 | 11.5k | bool RleBatchDecoder<T>::FillLiteralBuffer() { |
851 | 11.5k | int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_); |
852 | 11.5k | num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); |
853 | | // If we couldn't read the expected number, that means the input was truncated. |
854 | 11.5k | if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; |
855 | 11.5k | literal_buffer_pos_ = 0; |
856 | 11.5k | return true; |
857 | 11.5k | } |
858 | | |
859 | | template <typename T> |
860 | 1.39k | uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) { |
861 | 1.39k | uint32_t num_consumed = 0; |
862 | 13.5k | while (num_consumed < batch_num) { |
863 | | // Add RLE encoded values by repeating the current value this number of times. |
864 | 12.1k | uint32_t num_repeats = NextNumRepeats(); |
865 | 12.1k | if (num_repeats > 0) { |
866 | 520 | int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed); |
867 | 520 | T repeated_value = GetRepeatedValue(num_repeats_to_set); |
868 | 1.34M | for (int i = 0; i < num_repeats_to_set; ++i) { |
869 | 1.34M | values[num_consumed + i] = repeated_value; |
870 | 1.34M | } |
871 | 520 | num_consumed += num_repeats_to_set; |
872 | 520 | continue; |
873 | 520 | } |
874 | | |
875 | | // Add remaining literal values, if any. |
876 | 11.6k | uint32_t num_literals = NextNumLiterals(); |
877 | 11.6k | if (num_literals == 0) { |
878 | 0 | break; |
879 | 0 | } |
880 | 11.6k | uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed); |
881 | 11.6k | if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { |
882 | 0 | return 0; |
883 | 0 | } |
884 | 11.6k | num_consumed += num_literals_to_set; |
885 | 11.6k | } |
886 | 1.39k | return num_consumed; |
887 | 1.39k | } |
888 | | #include "common/compile_check_end.h" |
889 | | } // namespace doris |