/root/doris/be/src/util/rle_encoding.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #pragma once |
18 | | |
19 | | #include <glog/logging.h> |
20 | | |
21 | | #include <limits> // IWYU pragma: keep |
22 | | |
23 | | #include "util/bit_stream_utils.inline.h" |
24 | | #include "util/bit_util.h" |
25 | | |
26 | | namespace doris { |
27 | | |
28 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
29 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
30 | | // (literal encoding). |
31 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
32 | | // of the run and the type of the run. |
33 | | // This encoding has the benefit that when there aren't any long enough runs, values |
34 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
35 | | // the run length are byte aligned. This allows for very efficient decoding |
36 | | // implementations. |
37 | | // The encoding is: |
38 | | // encoded-block := run* |
39 | | // run := literal-run | repeated-run |
40 | | // literal-run := literal-indicator < literal bytes > |
41 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
42 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
43 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
44 | | // |
45 | | // Each run is preceded by a varint. The varint's least significant bit is |
46 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
47 | | // of the varint is used to determine the length of the run (eg how many times the |
48 | | // value repeats). |
49 | | // |
50 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
51 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
52 | | // on a byte boundary without padding. |
53 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
54 | | // the actual number of encoded ints. (This means that the total number of encoded values |
55 | | // can not be determined from the encoded data, since the last group may hold fewer |
56 | | // than 8 actual values). |
57 | | // There is a break-even point when it is more storage efficient to do run length |
58 | | // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
59 | | // for both the repeated encoding and the literal encoding. This value can always |
60 | | // be computed based on the bit-width. |
61 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
62 | | // |
63 | | // Examples with bit-width 1 (eg encoding booleans): |
64 | | // ---------------------------------------- |
65 | | // 100 1s followed by 100 0s: |
66 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
67 | | // - (total 4 bytes) |
68 | | // |
69 | | // alternating 1s and 0s (200 total): |
70 | | // 200 ints = 25 groups of 8 |
71 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
72 | | // (total 26 bytes, 1 byte overhead) |
73 | | // |
74 | | |
75 | | // Decoder class for RLE encoded data. |
76 | | // |
77 | | // NOTE: the encoded format does not have any length prefix or any other way of |
78 | | // indicating that the encoded sequence ends at a certain point, so the Decoder |
79 | | // methods may return some extra bits at the end before the read methods start |
80 | | // to return 0/false. |
81 | | template <typename T> |
82 | | class RleDecoder { |
83 | | public: |
84 | | // Create a decoder object. buffer/buffer_len is the decoded data. |
85 | | // bit_width is the width of each value (before encoding). |
86 | | RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) |
87 | | : bit_reader_(buffer, buffer_len), |
88 | | bit_width_(bit_width), |
89 | | current_value_(0), |
90 | | repeat_count_(0), |
91 | | literal_count_(0), |
92 | 853 | rewind_state_(CANT_REWIND) { |
93 | 853 | DCHECK_GE(bit_width_, 1); |
94 | 853 | DCHECK_LE(bit_width_, 64); |
95 | 853 | } _ZN5doris10RleDecoderImEC2EPKhii Line | Count | Source | 92 | 394 | rewind_state_(CANT_REWIND) { | 93 | 394 | DCHECK_GE(bit_width_, 1); | 94 | 394 | DCHECK_LE(bit_width_, 64); | 95 | 394 | } |
_ZN5doris10RleDecoderIbEC2EPKhii Line | Count | Source | 92 | 409 | rewind_state_(CANT_REWIND) { | 93 | 409 | DCHECK_GE(bit_width_, 1); | 94 | 409 | DCHECK_LE(bit_width_, 64); | 95 | 409 | } |
_ZN5doris10RleDecoderIhEC2EPKhii Line | Count | Source | 92 | 5 | rewind_state_(CANT_REWIND) { | 93 | 5 | DCHECK_GE(bit_width_, 1); | 94 | 5 | DCHECK_LE(bit_width_, 64); | 95 | 5 | } |
_ZN5doris10RleDecoderIsEC2EPKhii Line | Count | Source | 92 | 45 | rewind_state_(CANT_REWIND) { | 93 | 45 | DCHECK_GE(bit_width_, 1); | 94 | 45 | DCHECK_LE(bit_width_, 64); | 95 | 45 | } |
|
96 | | |
97 | 25.6k | RleDecoder() {} _ZN5doris10RleDecoderIbEC2Ev Line | Count | Source | 97 | 25.5k | RleDecoder() {} |
_ZN5doris10RleDecoderIhEC2Ev Line | Count | Source | 97 | 6 | RleDecoder() {} |
_ZN5doris10RleDecoderIsEC2Ev Line | Count | Source | 97 | 87 | RleDecoder() {} |
|
98 | | |
99 | | // Skip n values, and returns the number of non-zero entries skipped. |
100 | | size_t Skip(size_t to_skip); |
101 | | |
102 | | // Gets the next value. Returns false if there are no more. |
103 | | bool Get(T* val); |
104 | | |
105 | | // Seek to the previous value. |
106 | | void RewindOne(); |
107 | | |
108 | | // Gets the next run of the same 'val'. Returns 0 if there is no |
109 | | // more data to be decoded. Will return a run of at most 'max_run' |
110 | | // values. If there are more values than this, the next call to |
111 | | // GetNextRun will return more from the same run. |
112 | | size_t GetNextRun(T* val, size_t max_run); |
113 | | |
114 | | size_t get_values(T* values, size_t num_values); |
115 | | |
116 | | // Get the count of current repeated value |
117 | | size_t repeated_count(); |
118 | | |
119 | | // Get current repeated value, make sure that count equals repeated_count() |
120 | | T get_repeated_value(size_t count); |
121 | | |
122 | 0 | const BitReader& bit_reader() const { return bit_reader_; } |
123 | | |
124 | | private: |
125 | | bool ReadHeader(); |
126 | | |
127 | | enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND }; |
128 | | |
129 | | BitReader bit_reader_; |
130 | | int bit_width_; |
131 | | uint64_t current_value_; |
132 | | uint32_t repeat_count_; |
133 | | uint32_t literal_count_; |
134 | | RewindState rewind_state_; |
135 | | }; |
136 | | |
137 | | // Class to incrementally build the rle data. |
138 | | // The encoding has two modes: encoding repeated runs and literal runs. |
139 | | // If the run is sufficiently short, it is more efficient to encode as a literal run. |
140 | | // This class does so by buffering 8 values at a time. If they are not all the same |
141 | | // they are added to the literal run. If they are the same, they are added to the |
142 | | // repeated run. When we switch modes, the previous run is flushed out. |
143 | | template <typename T> |
144 | | class RleEncoder { |
145 | | public: |
146 | | // buffer: buffer to write bits to. |
147 | | // bit_width: max number of bits for value. |
148 | | // TODO: consider adding a min_repeated_run_length so the caller can control |
149 | | // when values should be encoded as repeated runs. Currently this is derived |
150 | | // based on the bit_width, which can determine a storage optimal choice. |
151 | | explicit RleEncoder(faststring* buffer, int bit_width) |
152 | 3.66k | : bit_width_(bit_width), bit_writer_(buffer) { |
153 | 3.66k | DCHECK_GE(bit_width_, 1); |
154 | 3.66k | DCHECK_LE(bit_width_, 64); |
155 | 3.66k | Clear(); |
156 | 3.66k | } _ZN5doris10RleEncoderImEC2EPNS_10faststringEi Line | Count | Source | 152 | 394 | : bit_width_(bit_width), bit_writer_(buffer) { | 153 | 394 | DCHECK_GE(bit_width_, 1); | 154 | 394 | DCHECK_LE(bit_width_, 64); | 155 | 394 | Clear(); | 156 | 394 | } |
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi Line | Count | Source | 152 | 3.27k | : bit_width_(bit_width), bit_writer_(buffer) { | 153 | 3.27k | DCHECK_GE(bit_width_, 1); | 154 | 3.27k | DCHECK_LE(bit_width_, 64); | 155 | 3.27k | Clear(); | 156 | 3.27k | } |
|
157 | | |
158 | | // Reserve 'num_bytes' bytes for a plain encoded header, set each |
159 | | // byte with 'val': this is used for the RLE-encoded data blocks in |
160 | | // order to be able to able to store the initial ordinal position |
161 | | // and number of elements. This is a part of RleEncoder in order to |
162 | | // maintain the correct offset in 'buffer'. |
163 | | void Reserve(int num_bytes, uint8_t val); |
164 | | |
165 | | // Encode value. This value must be representable with bit_width_ bits. |
166 | | void Put(T value, size_t run_length = 1); |
167 | | |
168 | | // Flushes any pending values to the underlying buffer. |
169 | | // Returns the total number of bytes written |
170 | | int Flush(); |
171 | | |
172 | | // Resets all the state in the encoder. |
173 | | void Clear(); |
174 | | |
175 | 302 | int32_t len() const { return bit_writer_.bytes_written(); } |
176 | | |
177 | | private: |
178 | | // Flushes any buffered values. If this is part of a repeated run, this is largely |
179 | | // a no-op. |
180 | | // If it is part of a literal run, this will call FlushLiteralRun, which writes |
181 | | // out the buffered literal values. |
182 | | // If 'done' is true, the current run would be written even if it would normally |
183 | | // have been buffered more. This should only be called at the end, when the |
184 | | // encoder has received all values even if it would normally continue to be |
185 | | // buffered. |
186 | | void FlushBufferedValues(bool done); |
187 | | |
188 | | // Flushes literal values to the underlying buffer. If update_indicator_byte, |
189 | | // then the current literal run is complete and the indicator byte is updated. |
190 | | void FlushLiteralRun(bool update_indicator_byte); |
191 | | |
192 | | // Flushes a repeated run to the underlying buffer. |
193 | | void FlushRepeatedRun(); |
194 | | |
195 | | // Number of bits needed to encode the value. |
196 | | const int bit_width_; |
197 | | |
198 | | // Underlying buffer. |
199 | | BitWriter bit_writer_; |
200 | | |
201 | | // We need to buffer at most 8 values for literals. This happens when the |
202 | | // bit_width is 1 (so 8 values fit in one byte). |
203 | | // TODO: generalize this to other bit widths |
204 | | uint64_t buffered_values_[8]; |
205 | | |
206 | | // Number of values in buffered_values_ |
207 | | int num_buffered_values_; |
208 | | |
209 | | // The current (also last) value that was written and the count of how |
210 | | // many times in a row that value has been seen. This is maintained even |
211 | | // if we are in a literal run. If the repeat_count_ get high enough, we switch |
212 | | // to encoding repeated runs. |
213 | | uint64_t current_value_; |
214 | | int repeat_count_; |
215 | | |
216 | | // Number of literals in the current run. This does not include the literals |
217 | | // that might be in buffered_values_. Only after we've got a group big enough |
218 | | // can we decide if they should part of the literal_count_ or repeat_count_ |
219 | | int literal_count_; |
220 | | |
221 | | // Index of a byte in the underlying buffer that stores the indicator byte. |
222 | | // This is reserved as soon as we need a literal run but the value is written |
223 | | // when the literal run is complete. We maintain an index rather than a pointer |
224 | | // into the underlying buffer because the pointer value may become invalid if |
225 | | // the underlying buffer is resized. |
226 | | int literal_indicator_byte_idx_; |
227 | | }; |
228 | | |
229 | | template <typename T> |
230 | 344k | bool RleDecoder<T>::ReadHeader() { |
231 | 344k | DCHECK(bit_reader_.is_initialized()); |
232 | 344k | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { |
233 | | // Read the next run's indicator int, it could be a literal or repeated run |
234 | | // The int is encoded as a vlq-encoded value. |
235 | 23.6k | uint32_t indicator_value = 0; |
236 | 23.6k | bool result = bit_reader_.GetVlqInt(&indicator_value); |
237 | 23.6k | if (PREDICT_FALSE(!result)) { |
238 | 276 | return false; |
239 | 276 | } |
240 | | |
241 | | // lsb indicates if it is a literal run or repeated run |
242 | 23.3k | bool is_literal = indicator_value & 1; |
243 | 23.3k | if (is_literal) { |
244 | 1.33k | literal_count_ = (indicator_value >> 1) * 8; |
245 | 1.33k | DCHECK_GT(literal_count_, 0); |
246 | 22.0k | } else { |
247 | 22.0k | repeat_count_ = indicator_value >> 1; |
248 | 22.0k | DCHECK_GT(repeat_count_, 0); |
249 | 22.0k | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), |
250 | 22.0k | reinterpret_cast<T*>(¤t_value_)); |
251 | 22.0k | DCHECK(result); |
252 | 22.0k | } |
253 | 23.3k | } |
254 | 344k | return true; |
255 | 344k | } _ZN5doris10RleDecoderImE10ReadHeaderEv Line | Count | Source | 230 | 244k | bool RleDecoder<T>::ReadHeader() { | 231 | 244k | DCHECK(bit_reader_.is_initialized()); | 232 | 244k | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 233 | | // Read the next run's indicator int, it could be a literal or repeated run | 234 | | // The int is encoded as a vlq-encoded value. | 235 | 3.03k | uint32_t indicator_value = 0; | 236 | 3.03k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 237 | 3.03k | if (PREDICT_FALSE(!result)) { | 238 | 0 | return false; | 239 | 0 | } | 240 | | | 241 | | // lsb indicates if it is a literal run or repeated run | 242 | 3.03k | bool is_literal = indicator_value & 1; | 243 | 3.03k | if (is_literal) { | 244 | 1.12k | literal_count_ = (indicator_value >> 1) * 8; | 245 | 1.12k | DCHECK_GT(literal_count_, 0); | 246 | 1.91k | } else { | 247 | 1.91k | repeat_count_ = indicator_value >> 1; | 248 | 1.91k | DCHECK_GT(repeat_count_, 0); | 249 | 1.91k | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 250 | 1.91k | reinterpret_cast<T*>(¤t_value_)); | 251 | 1.91k | DCHECK(result); | 252 | 1.91k | } | 253 | 3.03k | } | 254 | 244k | return true; | 255 | 244k | } |
_ZN5doris10RleDecoderIbE10ReadHeaderEv Line | Count | Source | 230 | 100k | bool RleDecoder<T>::ReadHeader() { | 231 | 100k | DCHECK(bit_reader_.is_initialized()); | 232 | 100k | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 233 | | // Read the next run's indicator int, it could be a literal or repeated run | 234 | | // The int is encoded as a vlq-encoded value. | 235 | 20.5k | uint32_t indicator_value = 0; | 236 | 20.5k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 237 | 20.5k | if (PREDICT_FALSE(!result)) { | 238 | 276 | return false; | 239 | 276 | } | 240 | | | 241 | | // lsb indicates if it is a literal run or repeated run | 242 | 20.2k | bool is_literal = indicator_value & 1; | 243 | 20.2k | if (is_literal) { | 244 | 207 | literal_count_ = (indicator_value >> 1) * 8; | 245 | 207 | DCHECK_GT(literal_count_, 0); | 246 | 20.0k | } else { | 247 | 20.0k | repeat_count_ = indicator_value >> 1; | 248 | 20.0k | DCHECK_GT(repeat_count_, 0); | 249 | 20.0k | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 250 | 20.0k | reinterpret_cast<T*>(¤t_value_)); | 251 | 20.0k | DCHECK(result); | 252 | 20.0k | } | 253 | 20.2k | } | 254 | 100k | return true; | 255 | 100k | } |
_ZN5doris10RleDecoderIsE10ReadHeaderEv Line | Count | Source | 230 | 49 | bool RleDecoder<T>::ReadHeader() { | 231 | 49 | DCHECK(bit_reader_.is_initialized()); | 232 | 49 | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 233 | | // Read the next run's indicator int, it could be a literal or repeated run | 234 | | // The int is encoded as a vlq-encoded value. | 235 | 45 | uint32_t indicator_value = 0; | 236 | 45 | bool result = bit_reader_.GetVlqInt(&indicator_value); | 237 | 45 | if (PREDICT_FALSE(!result)) { | 238 | 0 | return false; | 239 | 0 | } | 240 | | | 241 | | // lsb indicates if it is a literal run or repeated run | 242 | 45 | bool is_literal = indicator_value & 1; | 243 | 45 | if (is_literal) { | 244 | 3 | literal_count_ = (indicator_value >> 1) * 8; | 245 | 3 | DCHECK_GT(literal_count_, 0); | 246 | 42 | } else { | 247 | 42 | repeat_count_ = indicator_value >> 1; | 248 | 42 | DCHECK_GT(repeat_count_, 0); | 249 | 42 | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 250 | 42 | reinterpret_cast<T*>(¤t_value_)); | 251 | 42 | DCHECK(result); | 252 | 42 | } | 253 | 45 | } | 254 | 49 | return true; | 255 | 49 | } |
_ZN5doris10RleDecoderIhE10ReadHeaderEv Line | Count | Source | 230 | 5 | bool RleDecoder<T>::ReadHeader() { | 231 | 5 | DCHECK(bit_reader_.is_initialized()); | 232 | 5 | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 233 | | // Read the next run's indicator int, it could be a literal or repeated run | 234 | | // The int is encoded as a vlq-encoded value. | 235 | 5 | uint32_t indicator_value = 0; | 236 | 5 | bool result = bit_reader_.GetVlqInt(&indicator_value); | 237 | 5 | if (PREDICT_FALSE(!result)) { | 238 | 0 | return false; | 239 | 0 | } | 240 | | | 241 | | // lsb indicates if it is a literal run or repeated run | 242 | 5 | bool is_literal = indicator_value & 1; | 243 | 5 | if (is_literal) { | 244 | 5 | literal_count_ = (indicator_value >> 1) * 8; | 245 | 5 | DCHECK_GT(literal_count_, 0); | 246 | 5 | } else { | 247 | 0 | repeat_count_ = indicator_value >> 1; | 248 | 0 | DCHECK_GT(repeat_count_, 0); | 249 | 0 | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 250 | 0 | reinterpret_cast<T*>(¤t_value_)); | 251 | 0 | DCHECK(result); | 252 | 0 | } | 253 | 5 | } | 254 | 5 | return true; | 255 | 5 | } |
|
256 | | |
257 | | template <typename T> |
258 | 247k | bool RleDecoder<T>::Get(T* val) { |
259 | 247k | DCHECK(bit_reader_.is_initialized()); |
260 | 247k | if (PREDICT_FALSE(!ReadHeader())) { |
261 | 0 | return false; |
262 | 0 | } |
263 | | |
264 | 247k | if (PREDICT_TRUE(repeat_count_ > 0)) { |
265 | 158k | *val = current_value_; |
266 | 158k | --repeat_count_; |
267 | 158k | rewind_state_ = REWIND_RUN; |
268 | 158k | } else { |
269 | 89.0k | DCHECK(literal_count_ > 0); |
270 | 89.0k | bool result = bit_reader_.GetValue(bit_width_, val); |
271 | 89.0k | DCHECK(result); |
272 | 89.0k | --literal_count_; |
273 | 89.0k | rewind_state_ = REWIND_LITERAL; |
274 | 89.0k | } |
275 | | |
276 | 247k | return true; |
277 | 247k | } _ZN5doris10RleDecoderImE3GetEPm Line | Count | Source | 258 | 244k | bool RleDecoder<T>::Get(T* val) { | 259 | 244k | DCHECK(bit_reader_.is_initialized()); | 260 | 244k | if (PREDICT_FALSE(!ReadHeader())) { | 261 | 0 | return false; | 262 | 0 | } | 263 | | | 264 | 244k | if (PREDICT_TRUE(repeat_count_ > 0)) { | 265 | 155k | *val = current_value_; | 266 | 155k | --repeat_count_; | 267 | 155k | rewind_state_ = REWIND_RUN; | 268 | 155k | } else { | 269 | 88.9k | DCHECK(literal_count_ > 0); | 270 | 88.9k | bool result = bit_reader_.GetValue(bit_width_, val); | 271 | 88.9k | DCHECK(result); | 272 | 88.9k | --literal_count_; | 273 | 88.9k | rewind_state_ = REWIND_LITERAL; | 274 | 88.9k | } | 275 | | | 276 | 244k | return true; | 277 | 244k | } |
_ZN5doris10RleDecoderIbE3GetEPb Line | Count | Source | 258 | 3.17k | bool RleDecoder<T>::Get(T* val) { | 259 | 3.17k | DCHECK(bit_reader_.is_initialized()); | 260 | 3.17k | if (PREDICT_FALSE(!ReadHeader())) { | 261 | 0 | return false; | 262 | 0 | } | 263 | | | 264 | 3.17k | if (PREDICT_TRUE(repeat_count_ > 0)) { | 265 | 3.08k | *val = current_value_; | 266 | 3.08k | --repeat_count_; | 267 | 3.08k | rewind_state_ = REWIND_RUN; | 268 | 3.08k | } else { | 269 | 93 | DCHECK(literal_count_ > 0); | 270 | 93 | bool result = bit_reader_.GetValue(bit_width_, val); | 271 | 93 | DCHECK(result); | 272 | 93 | --literal_count_; | 273 | 93 | rewind_state_ = REWIND_LITERAL; | 274 | 93 | } | 275 | | | 276 | 3.17k | return true; | 277 | 3.17k | } |
_ZN5doris10RleDecoderIsE3GetEPs Line | Count | Source | 258 | 6 | bool RleDecoder<T>::Get(T* val) { | 259 | 6 | DCHECK(bit_reader_.is_initialized()); | 260 | 6 | if (PREDICT_FALSE(!ReadHeader())) { | 261 | 0 | return false; | 262 | 0 | } | 263 | | | 264 | 6 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 265 | 6 | *val = current_value_; | 266 | 6 | --repeat_count_; | 267 | 6 | rewind_state_ = REWIND_RUN; | 268 | 6 | } else { | 269 | 0 | DCHECK(literal_count_ > 0); | 270 | 0 | bool result = bit_reader_.GetValue(bit_width_, val); | 271 | 0 | DCHECK(result); | 272 | 0 | --literal_count_; | 273 | 0 | rewind_state_ = REWIND_LITERAL; | 274 | 0 | } | 275 | | | 276 | 6 | return true; | 277 | 6 | } |
|
278 | | |
279 | | template <typename T> |
280 | 2 | void RleDecoder<T>::RewindOne() { |
281 | 2 | DCHECK(bit_reader_.is_initialized()); |
282 | | |
283 | 2 | switch (rewind_state_) { |
284 | 0 | case CANT_REWIND: |
285 | 0 | throw Exception(Status::FatalError("Can't rewind more than once after each read!")); |
286 | 0 | break; |
287 | 2 | case REWIND_RUN: |
288 | 2 | ++repeat_count_; |
289 | 2 | break; |
290 | 0 | case REWIND_LITERAL: { |
291 | 0 | bit_reader_.Rewind(bit_width_); |
292 | 0 | ++literal_count_; |
293 | 0 | break; |
294 | 0 | } |
295 | 2 | } |
296 | | |
297 | 2 | rewind_state_ = CANT_REWIND; |
298 | 2 | } |
299 | | |
300 | | template <typename T> |
301 | 78.0k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { |
302 | 78.0k | DCHECK(bit_reader_.is_initialized()); |
303 | 78.0k | DCHECK_GT(max_run, 0); |
304 | 78.0k | size_t ret = 0; |
305 | 78.0k | size_t rem = max_run; |
306 | 97.3k | while (ReadHeader()) { |
307 | 97.1k | if (PREDICT_TRUE(repeat_count_ > 0)) { |
308 | 86.3k | if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { |
309 | 18.9k | return ret; |
310 | 18.9k | } |
311 | 67.3k | *val = current_value_; |
312 | 67.3k | if (repeat_count_ >= rem) { |
313 | | // The next run is longer than the amount of remaining data |
314 | | // that the caller wants to read. Only consume it partially. |
315 | 48.2k | repeat_count_ -= rem; |
316 | 48.2k | ret += rem; |
317 | 48.2k | return ret; |
318 | 48.2k | } |
319 | 19.1k | ret += repeat_count_; |
320 | 19.1k | rem -= repeat_count_; |
321 | 19.1k | repeat_count_ = 0; |
322 | 19.1k | } else { |
323 | 10.8k | DCHECK(literal_count_ > 0); |
324 | 10.8k | if (ret == 0) { |
325 | 10.7k | bool has_more = bit_reader_.GetValue(bit_width_, val); |
326 | 10.7k | DCHECK(has_more); |
327 | 10.7k | literal_count_--; |
328 | 10.7k | ret++; |
329 | 10.7k | rem--; |
330 | 10.7k | } |
331 | | |
332 | 41.8k | while (literal_count_ > 0) { |
333 | 41.7k | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); |
334 | 41.7k | DCHECK(result); |
335 | 41.7k | if (current_value_ != *val || rem == 0) { |
336 | 10.6k | bit_reader_.Rewind(bit_width_); |
337 | 10.6k | return ret; |
338 | 10.6k | } |
339 | 31.0k | ret++; |
340 | 31.0k | rem--; |
341 | 31.0k | literal_count_--; |
342 | 31.0k | } |
343 | 10.8k | } |
344 | 97.1k | } |
345 | 276 | return ret; |
346 | 78.0k | } _ZN5doris10RleDecoderIbE10GetNextRunEPbm Line | Count | Source | 301 | 78.0k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 302 | 78.0k | DCHECK(bit_reader_.is_initialized()); | 303 | 78.0k | DCHECK_GT(max_run, 0); | 304 | 78.0k | size_t ret = 0; | 305 | 78.0k | size_t rem = max_run; | 306 | 97.3k | while (ReadHeader()) { | 307 | 97.1k | if (PREDICT_TRUE(repeat_count_ > 0)) { | 308 | 86.2k | if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { | 309 | 18.9k | return ret; | 310 | 18.9k | } | 311 | 67.3k | *val = current_value_; | 312 | 67.3k | if (repeat_count_ >= rem) { | 313 | | // The next run is longer than the amount of remaining data | 314 | | // that the caller wants to read. Only consume it partially. | 315 | 48.2k | repeat_count_ -= rem; | 316 | 48.2k | ret += rem; | 317 | 48.2k | return ret; | 318 | 48.2k | } | 319 | 19.1k | ret += repeat_count_; | 320 | 19.1k | rem -= repeat_count_; | 321 | 19.1k | repeat_count_ = 0; | 322 | 19.1k | } else { | 323 | 10.8k | DCHECK(literal_count_ > 0); | 324 | 10.8k | if (ret == 0) { | 325 | 10.7k | bool has_more = bit_reader_.GetValue(bit_width_, val); | 326 | 10.7k | DCHECK(has_more); | 327 | 10.7k | literal_count_--; | 328 | 10.7k | ret++; | 329 | 10.7k | rem--; | 330 | 10.7k | } | 331 | | | 332 | 41.8k | while (literal_count_ > 0) { | 333 | 41.7k | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 334 | 41.7k | DCHECK(result); | 335 | 41.7k | if (current_value_ != *val || rem == 0) { | 336 | 10.6k | bit_reader_.Rewind(bit_width_); | 337 | 10.6k | return ret; | 338 | 10.6k | } | 339 | 31.0k | ret++; | 340 | 31.0k | rem--; | 341 | 31.0k | literal_count_--; | 342 | 31.0k | } | 343 | 10.8k | } | 344 | 97.1k | } | 345 | 276 | return ret; | 346 | 78.0k | } |
_ZN5doris10RleDecoderIsE10GetNextRunEPsm Line | Count | Source | 301 | 7 | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 302 | 7 | DCHECK(bit_reader_.is_initialized()); | 303 | 7 | DCHECK_GT(max_run, 0); | 304 | 7 | size_t ret = 0; | 305 | 7 | size_t rem = max_run; | 306 | 7 | while (ReadHeader()) { | 307 | 7 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 308 | 7 | if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { | 309 | 0 | return ret; | 310 | 0 | } | 311 | 7 | *val = current_value_; | 312 | 7 | if (repeat_count_ >= rem) { | 313 | | // The next run is longer than the amount of remaining data | 314 | | // that the caller wants to read. Only consume it partially. | 315 | 7 | repeat_count_ -= rem; | 316 | 7 | ret += rem; | 317 | 7 | return ret; | 318 | 7 | } | 319 | 0 | ret += repeat_count_; | 320 | 0 | rem -= repeat_count_; | 321 | 0 | repeat_count_ = 0; | 322 | 0 | } else { | 323 | 0 | DCHECK(literal_count_ > 0); | 324 | 0 | if (ret == 0) { | 325 | 0 | bool has_more = bit_reader_.GetValue(bit_width_, val); | 326 | 0 | DCHECK(has_more); | 327 | 0 | literal_count_--; | 328 | 0 | ret++; | 329 | 0 | rem--; | 330 | 0 | } | 331 | |
| 332 | 0 | while (literal_count_ > 0) { | 333 | 0 | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 334 | 0 | DCHECK(result); | 335 | 0 | if (current_value_ != *val || rem == 0) { | 336 | 0 | bit_reader_.Rewind(bit_width_); | 337 | 0 | return ret; | 338 | 0 | } | 339 | 0 | ret++; | 340 | 0 | rem--; | 341 | 0 | literal_count_--; | 342 | 0 | } | 343 | 0 | } | 344 | 7 | } | 345 | 0 | return ret; | 346 | 7 | } |
|
347 | | |
348 | | template <typename T> |
349 | 39 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { |
350 | 39 | size_t read_num = 0; |
351 | 120 | while (read_num < num_values) { |
352 | 81 | size_t read_this_time = num_values - read_num; |
353 | | |
354 | 81 | if (LIKELY(repeat_count_ > 0)) { |
355 | 33 | read_this_time = std::min((size_t)repeat_count_, read_this_time); |
356 | 33 | std::fill(values, values + read_this_time, current_value_); |
357 | 33 | values += read_this_time; |
358 | 33 | repeat_count_ -= read_this_time; |
359 | 33 | read_num += read_this_time; |
360 | 48 | } else if (literal_count_ > 0) { |
361 | 8 | read_this_time = std::min((size_t)literal_count_, read_this_time); |
362 | 59 | for (int i = 0; i < read_this_time; ++i) { |
363 | 51 | bool result = bit_reader_.GetValue(bit_width_, values); |
364 | 51 | DCHECK(result); |
365 | 51 | values++; |
366 | 51 | } |
367 | 8 | literal_count_ -= read_this_time; |
368 | 8 | read_num += read_this_time; |
369 | 40 | } else { |
370 | 40 | if (!ReadHeader()) { |
371 | 0 | return read_num; |
372 | 0 | } |
373 | 40 | } |
374 | 81 | } |
375 | 39 | return read_num; |
376 | 39 | } _ZN5doris10RleDecoderIhE10get_valuesEPhm Line | Count | Source | 349 | 5 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 350 | 5 | size_t read_num = 0; | 351 | 14 | while (read_num < num_values) { | 352 | 9 | size_t read_this_time = num_values - read_num; | 353 | | | 354 | 9 | if (LIKELY(repeat_count_ > 0)) { | 355 | 0 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 356 | 0 | std::fill(values, values + read_this_time, current_value_); | 357 | 0 | values += read_this_time; | 358 | 0 | repeat_count_ -= read_this_time; | 359 | 0 | read_num += read_this_time; | 360 | 9 | } else if (literal_count_ > 0) { | 361 | 5 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 362 | 40 | for (int i = 0; i < read_this_time; ++i) { | 363 | 35 | bool result = bit_reader_.GetValue(bit_width_, values); | 364 | 35 | DCHECK(result); | 365 | 35 | values++; | 366 | 35 | } | 367 | 5 | literal_count_ -= read_this_time; | 368 | 5 | read_num += read_this_time; | 369 | 5 | } else { | 370 | 4 | if (!ReadHeader()) { | 371 | 0 | return read_num; | 372 | 0 | } | 373 | 4 | } | 374 | 9 | } | 375 | 5 | return read_num; | 376 | 5 | } |
_ZN5doris10RleDecoderIsE10get_valuesEPsm Line | Count | Source | 349 | 34 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 350 | 34 | size_t read_num = 0; | 351 | 106 | while (read_num < num_values) { | 352 | 72 | size_t read_this_time = num_values - read_num; | 353 | | | 354 | 72 | if (LIKELY(repeat_count_ > 0)) { | 355 | 33 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 356 | 33 | std::fill(values, values + read_this_time, current_value_); | 357 | 33 | values += read_this_time; | 358 | 33 | repeat_count_ -= read_this_time; | 359 | 33 | read_num += read_this_time; | 360 | 39 | } else if (literal_count_ > 0) { | 361 | 3 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 362 | 19 | for (int i = 0; i < read_this_time; ++i) { | 363 | 16 | bool result = bit_reader_.GetValue(bit_width_, values); | 364 | 16 | DCHECK(result); | 365 | 16 | values++; | 366 | 16 | } | 367 | 3 | literal_count_ -= read_this_time; | 368 | 3 | read_num += read_this_time; | 369 | 36 | } else { | 370 | 36 | if (!ReadHeader()) { | 371 | 0 | return read_num; | 372 | 0 | } | 373 | 36 | } | 374 | 72 | } | 375 | 34 | return read_num; | 376 | 34 | } |
|
377 | | |
378 | | template <typename T> |
379 | | size_t RleDecoder<T>::repeated_count() { |
380 | | if (repeat_count_ > 0) { |
381 | | return repeat_count_; |
382 | | } |
383 | | if (literal_count_ == 0) { |
384 | | ReadHeader(); |
385 | | } |
386 | | return repeat_count_; |
387 | | } |
388 | | |
389 | | template <typename T> |
390 | | T RleDecoder<T>::get_repeated_value(size_t count) { |
391 | | DCHECK_GE(repeat_count_, count); |
392 | | repeat_count_ -= count; |
393 | | return current_value_; |
394 | | } |
395 | | |
396 | | template <typename T> |
397 | 5 | size_t RleDecoder<T>::Skip(size_t to_skip) { |
398 | 5 | DCHECK(bit_reader_.is_initialized()); |
399 | | |
400 | 5 | size_t set_count = 0; |
401 | 16 | while (to_skip > 0) { |
402 | 11 | bool result = ReadHeader(); |
403 | 11 | DCHECK(result); |
404 | | |
405 | 11 | if (PREDICT_TRUE(repeat_count_ > 0)) { |
406 | 7 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; |
407 | 7 | repeat_count_ -= nskip; |
408 | 7 | to_skip -= nskip; |
409 | 7 | if (current_value_ != 0) { |
410 | 3 | set_count += nskip; |
411 | 3 | } |
412 | 7 | } else { |
413 | 4 | DCHECK(literal_count_ > 0); |
414 | 4 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; |
415 | 4 | literal_count_ -= nskip; |
416 | 4 | to_skip -= nskip; |
417 | 60 | for (; nskip > 0; nskip--) { |
418 | 56 | T value = 0; |
419 | 56 | bool result = bit_reader_.GetValue(bit_width_, &value); |
420 | 56 | DCHECK(result); |
421 | 56 | if (value != 0) { |
422 | 28 | set_count++; |
423 | 28 | } |
424 | 56 | } |
425 | 4 | } |
426 | 11 | } |
427 | 5 | return set_count; |
428 | 5 | } _ZN5doris10RleDecoderIbE4SkipEm Line | Count | Source | 397 | 4 | size_t RleDecoder<T>::Skip(size_t to_skip) { | 398 | 4 | DCHECK(bit_reader_.is_initialized()); | 399 | | | 400 | 4 | size_t set_count = 0; | 401 | 14 | while (to_skip > 0) { | 402 | 10 | bool result = ReadHeader(); | 403 | 10 | DCHECK(result); | 404 | | | 405 | 10 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 406 | 7 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 407 | 7 | repeat_count_ -= nskip; | 408 | 7 | to_skip -= nskip; | 409 | 7 | if (current_value_ != 0) { | 410 | 3 | set_count += nskip; | 411 | 3 | } | 412 | 7 | } else { | 413 | 3 | DCHECK(literal_count_ > 0); | 414 | 3 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 415 | 3 | literal_count_ -= nskip; | 416 | 3 | to_skip -= nskip; | 417 | 56 | for (; nskip > 0; nskip--) { | 418 | 53 | T value = 0; | 419 | 53 | bool result = bit_reader_.GetValue(bit_width_, &value); | 420 | 53 | DCHECK(result); | 421 | 53 | if (value != 0) { | 422 | 26 | set_count++; | 423 | 26 | } | 424 | 53 | } | 425 | 3 | } | 426 | 10 | } | 427 | 4 | return set_count; | 428 | 4 | } |
_ZN5doris10RleDecoderIhE4SkipEm Line | Count | Source | 397 | 1 | size_t RleDecoder<T>::Skip(size_t to_skip) { | 398 | 1 | DCHECK(bit_reader_.is_initialized()); | 399 | | | 400 | 1 | size_t set_count = 0; | 401 | 2 | while (to_skip > 0) { | 402 | 1 | bool result = ReadHeader(); | 403 | 1 | DCHECK(result); | 404 | | | 405 | 1 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 406 | 0 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 407 | 0 | repeat_count_ -= nskip; | 408 | 0 | to_skip -= nskip; | 409 | 0 | if (current_value_ != 0) { | 410 | 0 | set_count += nskip; | 411 | 0 | } | 412 | 1 | } else { | 413 | 1 | DCHECK(literal_count_ > 0); | 414 | 1 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 415 | 1 | literal_count_ -= nskip; | 416 | 1 | to_skip -= nskip; | 417 | 4 | for (; nskip > 0; nskip--) { | 418 | 3 | T value = 0; | 419 | 3 | bool result = bit_reader_.GetValue(bit_width_, &value); | 420 | 3 | DCHECK(result); | 421 | 3 | if (value != 0) { | 422 | 2 | set_count++; | 423 | 2 | } | 424 | 3 | } | 425 | 1 | } | 426 | 1 | } | 427 | 1 | return set_count; | 428 | 1 | } |
|
429 | | |
430 | | // This function buffers input values 8 at a time. After seeing all 8 values, |
431 | | // it decides whether they should be encoded as a literal or repeated run. |
432 | | template <typename T> |
433 | 732k | void RleEncoder<T>::Put(T value, size_t run_length) { |
434 | 732k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); |
435 | | |
436 | | // TODO(perf): remove the loop and use the repeat_count_ |
437 | 3.35M | for (; run_length > 0; run_length--) { |
438 | 2.61M | if (PREDICT_TRUE(current_value_ == value)) { |
439 | 2.51M | ++repeat_count_; |
440 | 2.51M | if (repeat_count_ > 8) { |
441 | | // This is just a continuation of the current run, no need to buffer the |
442 | | // values. |
443 | | // Note that this is the fast path for long repeated runs. |
444 | 2.28M | continue; |
445 | 2.28M | } |
446 | 2.51M | } else { |
447 | 107k | if (repeat_count_ >= 8) { |
448 | | // We had a run that was long enough but it has ended. Flush the |
449 | | // current repeated run. |
450 | 21.4k | DCHECK_EQ(literal_count_, 0); |
451 | 21.4k | FlushRepeatedRun(); |
452 | 21.4k | } |
453 | 107k | repeat_count_ = 1; |
454 | 107k | current_value_ = value; |
455 | 107k | } |
456 | | |
457 | 329k | buffered_values_[num_buffered_values_] = value; |
458 | 329k | if (++num_buffered_values_ == 8) { |
459 | 41.0k | DCHECK_EQ(literal_count_ % 8, 0); |
460 | 41.0k | FlushBufferedValues(false); |
461 | 41.0k | } |
462 | 329k | } |
463 | 732k | } _ZN5doris10RleEncoderImE3PutEmm Line | Count | Source | 433 | 244k | void RleEncoder<T>::Put(T value, size_t run_length) { | 434 | 244k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 435 | | | 436 | | // TODO(perf): remove the loop and use the repeat_count_ | 437 | 488k | for (; run_length > 0; run_length--) { | 438 | 244k | if (PREDICT_TRUE(current_value_ == value)) { | 439 | 167k | ++repeat_count_; | 440 | 167k | if (repeat_count_ > 8) { | 441 | | // This is just a continuation of the current run, no need to buffer the | 442 | | // values. | 443 | | // Note that this is the fast path for long repeated runs. | 444 | 140k | continue; | 445 | 140k | } | 446 | 167k | } else { | 447 | 76.9k | if (repeat_count_ >= 8) { | 448 | | // We had a run that was long enough but it has ended. Flush the | 449 | | // current repeated run. | 450 | 1.65k | DCHECK_EQ(literal_count_, 0); | 451 | 1.65k | FlushRepeatedRun(); | 452 | 1.65k | } | 453 | 76.9k | repeat_count_ = 1; | 454 | 76.9k | current_value_ = value; | 455 | 76.9k | } | 456 | | | 457 | 103k | buffered_values_[num_buffered_values_] = value; | 458 | 103k | if (++num_buffered_values_ == 8) { | 459 | 12.9k | DCHECK_EQ(literal_count_ % 8, 0); | 460 | 12.9k | FlushBufferedValues(false); | 461 | 12.9k | } | 462 | 103k | } | 463 | 244k | } |
_ZN5doris10RleEncoderIbE3PutEbm Line | Count | Source | 433 | 487k | void RleEncoder<T>::Put(T value, size_t run_length) { | 434 | 487k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 435 | | | 436 | | // TODO(perf): remove the loop and use the repeat_count_ | 437 | 2.86M | for (; run_length > 0; run_length--) { | 438 | 2.37M | if (PREDICT_TRUE(current_value_ == value)) { | 439 | 2.34M | ++repeat_count_; | 440 | 2.34M | if (repeat_count_ > 8) { | 441 | | // This is just a continuation of the current run, no need to buffer the | 442 | | // values. | 443 | | // Note that this is the fast path for long repeated runs. | 444 | 2.14M | continue; | 445 | 2.14M | } | 446 | 2.34M | } else { | 447 | 30.1k | if (repeat_count_ >= 8) { | 448 | | // We had a run that was long enough but it has ended. Flush the | 449 | | // current repeated run. | 450 | 19.7k | DCHECK_EQ(literal_count_, 0); | 451 | 19.7k | FlushRepeatedRun(); | 452 | 19.7k | } | 453 | 30.1k | repeat_count_ = 1; | 454 | 30.1k | current_value_ = value; | 455 | 30.1k | } | 456 | | | 457 | 225k | buffered_values_[num_buffered_values_] = value; | 458 | 225k | if (++num_buffered_values_ == 8) { | 459 | 28.0k | DCHECK_EQ(literal_count_ % 8, 0); | 460 | 28.0k | FlushBufferedValues(false); | 461 | 28.0k | } | 462 | 225k | } | 463 | 487k | } |
|
464 | | |
465 | | template <typename T> |
466 | 17.4k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { |
467 | 17.4k | if (literal_indicator_byte_idx_ < 0) { |
468 | | // The literal indicator byte has not been reserved yet, get one now. |
469 | 1.32k | literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); |
470 | 1.32k | DCHECK_GE(literal_indicator_byte_idx_, 0); |
471 | 1.32k | } |
472 | | |
473 | | // Write all the buffered values as bit packed literals |
474 | 148k | for (int i = 0; i < num_buffered_values_; ++i) { |
475 | 130k | bit_writer_.PutValue(buffered_values_[i], bit_width_); |
476 | 130k | } |
477 | 17.4k | num_buffered_values_ = 0; |
478 | | |
479 | 17.4k | if (update_indicator_byte) { |
480 | | // At this point we need to write the indicator byte for the literal run. |
481 | | // We only reserve one byte, to allow for streaming writes of literal values. |
482 | | // The logic makes sure we flush literal runs often enough to not overrun |
483 | | // the 1 byte. |
484 | 1.32k | int num_groups = BitUtil::Ceil(literal_count_, 8); |
485 | 1.32k | int32_t indicator_value = (num_groups << 1) | 1; |
486 | 1.32k | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); |
487 | 1.32k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; |
488 | 1.32k | literal_indicator_byte_idx_ = -1; |
489 | 1.32k | literal_count_ = 0; |
490 | 1.32k | } |
491 | 17.4k | } _ZN5doris10RleEncoderImE15FlushLiteralRunEb Line | Count | Source | 466 | 12.0k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 467 | 12.0k | if (literal_indicator_byte_idx_ < 0) { | 468 | | // The literal indicator byte has not been reserved yet, get one now. | 469 | 1.12k | literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); | 470 | 1.12k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 471 | 1.12k | } | 472 | | | 473 | | // Write all the buffered values as bit packed literals | 474 | 101k | for (int i = 0; i < num_buffered_values_; ++i) { | 475 | 88.9k | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 476 | 88.9k | } | 477 | 12.0k | num_buffered_values_ = 0; | 478 | | | 479 | 12.0k | if (update_indicator_byte) { | 480 | | // At this point we need to write the indicator byte for the literal run. | 481 | | // We only reserve one byte, to allow for streaming writes of literal values. | 482 | | // The logic makes sure we flush literal runs often enough to not overrun | 483 | | // the 1 byte. | 484 | 1.12k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 485 | 1.12k | int32_t indicator_value = (num_groups << 1) | 1; | 486 | 1.12k | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 487 | 1.12k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; | 488 | 1.12k | literal_indicator_byte_idx_ = -1; | 489 | 1.12k | literal_count_ = 0; | 490 | 1.12k | } | 491 | 12.0k | } |
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb Line | Count | Source | 466 | 5.35k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 467 | 5.35k | if (literal_indicator_byte_idx_ < 0) { | 468 | | // The literal indicator byte has not been reserved yet, get one now. | 469 | 209 | literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); | 470 | 209 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 471 | 209 | } | 472 | | | 473 | | // Write all the buffered values as bit packed literals | 474 | 47.1k | for (int i = 0; i < num_buffered_values_; ++i) { | 475 | 41.7k | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 476 | 41.7k | } | 477 | 5.35k | num_buffered_values_ = 0; | 478 | | | 479 | 5.35k | if (update_indicator_byte) { | 480 | | // At this point we need to write the indicator byte for the literal run. | 481 | | // We only reserve one byte, to allow for streaming writes of literal values. | 482 | | // The logic makes sure we flush literal runs often enough to not overrun | 483 | | // the 1 byte. | 484 | 209 | int num_groups = BitUtil::Ceil(literal_count_, 8); | 485 | 209 | int32_t indicator_value = (num_groups << 1) | 1; | 486 | 209 | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 487 | 209 | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; | 488 | 209 | literal_indicator_byte_idx_ = -1; | 489 | 209 | literal_count_ = 0; | 490 | 209 | } | 491 | 5.35k | } |
|
492 | | |
493 | | template <typename T> |
494 | 22.0k | void RleEncoder<T>::FlushRepeatedRun() { |
495 | 22.0k | DCHECK_GT(repeat_count_, 0); |
496 | | // The lsb of 0 indicates this is a repeated run |
497 | 22.0k | int32_t indicator_value = repeat_count_ << 1 | 0; |
498 | 22.0k | bit_writer_.PutVlqInt(indicator_value); |
499 | 22.0k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); |
500 | 22.0k | num_buffered_values_ = 0; |
501 | 22.0k | repeat_count_ = 0; |
502 | 22.0k | } _ZN5doris10RleEncoderImE16FlushRepeatedRunEv Line | Count | Source | 494 | 1.91k | void RleEncoder<T>::FlushRepeatedRun() { | 495 | 1.91k | DCHECK_GT(repeat_count_, 0); | 496 | | // The lsb of 0 indicates this is a repeated run | 497 | 1.91k | int32_t indicator_value = repeat_count_ << 1 | 0; | 498 | 1.91k | bit_writer_.PutVlqInt(indicator_value); | 499 | 1.91k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 500 | 1.91k | num_buffered_values_ = 0; | 501 | 1.91k | repeat_count_ = 0; | 502 | 1.91k | } |
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv Line | Count | Source | 494 | 20.0k | void RleEncoder<T>::FlushRepeatedRun() { | 495 | 20.0k | DCHECK_GT(repeat_count_, 0); | 496 | | // The lsb of 0 indicates this is a repeated run | 497 | 20.0k | int32_t indicator_value = repeat_count_ << 1 | 0; | 498 | 20.0k | bit_writer_.PutVlqInt(indicator_value); | 499 | 20.0k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 500 | 20.0k | num_buffered_values_ = 0; | 501 | 20.0k | repeat_count_ = 0; | 502 | 20.0k | } |
|
503 | | |
504 | | // Flush the values that have been buffered. At this point we decide whether |
505 | | // we need to switch between the run types or continue the current one. |
506 | | template <typename T> |
507 | 41.0k | void RleEncoder<T>::FlushBufferedValues(bool done) { |
508 | 41.0k | if (repeat_count_ >= 8) { |
509 | | // Clear the buffered values. They are part of the repeated run now and we |
510 | | // don't want to flush them out as literals. |
511 | 24.7k | num_buffered_values_ = 0; |
512 | 24.7k | if (literal_count_ != 0) { |
513 | | // There was a current literal run. All the values in it have been flushed |
514 | | // but we still need to update the indicator byte. |
515 | 919 | DCHECK_EQ(literal_count_ % 8, 0); |
516 | 919 | DCHECK_EQ(repeat_count_, 8); |
517 | 919 | FlushLiteralRun(true); |
518 | 919 | } |
519 | 24.7k | DCHECK_EQ(literal_count_, 0); |
520 | 24.7k | return; |
521 | 24.7k | } |
522 | | |
523 | 16.2k | literal_count_ += num_buffered_values_; |
524 | 16.2k | int num_groups = BitUtil::Ceil(literal_count_, 8); |
525 | 16.2k | if (num_groups + 1 >= (1 << 6)) { |
526 | | // We need to start a new literal run because the indicator byte we've reserved |
527 | | // cannot store more values. |
528 | 167 | DCHECK_GE(literal_indicator_byte_idx_, 0); |
529 | 167 | FlushLiteralRun(true); |
530 | 16.0k | } else { |
531 | 16.0k | FlushLiteralRun(done); |
532 | 16.0k | } |
533 | 16.2k | repeat_count_ = 0; |
534 | 16.2k | } _ZN5doris10RleEncoderImE19FlushBufferedValuesEb Line | Count | Source | 507 | 12.9k | void RleEncoder<T>::FlushBufferedValues(bool done) { | 508 | 12.9k | if (repeat_count_ >= 8) { | 509 | | // Clear the buffered values. They are part of the repeated run now and we | 510 | | // don't want to flush them out as literals. | 511 | 1.85k | num_buffered_values_ = 0; | 512 | 1.85k | if (literal_count_ != 0) { | 513 | | // There was a current literal run. All the values in it have been flushed | 514 | | // but we still need to update the indicator byte. | 515 | 858 | DCHECK_EQ(literal_count_ % 8, 0); | 516 | 858 | DCHECK_EQ(repeat_count_, 8); | 517 | 858 | FlushLiteralRun(true); | 518 | 858 | } | 519 | 1.85k | DCHECK_EQ(literal_count_, 0); | 520 | 1.85k | return; | 521 | 1.85k | } | 522 | | | 523 | 11.0k | literal_count_ += num_buffered_values_; | 524 | 11.0k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 525 | 11.0k | if (num_groups + 1 >= (1 << 6)) { | 526 | | // We need to start a new literal run because the indicator byte we've reserved | 527 | | // cannot store more values. | 528 | 128 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 529 | 128 | FlushLiteralRun(true); | 530 | 10.9k | } else { | 531 | 10.9k | FlushLiteralRun(done); | 532 | 10.9k | } | 533 | 11.0k | repeat_count_ = 0; | 534 | 11.0k | } |
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb Line | Count | Source | 507 | 28.0k | void RleEncoder<T>::FlushBufferedValues(bool done) { | 508 | 28.0k | if (repeat_count_ >= 8) { | 509 | | // Clear the buffered values. They are part of the repeated run now and we | 510 | | // don't want to flush them out as literals. | 511 | 22.9k | num_buffered_values_ = 0; | 512 | 22.9k | if (literal_count_ != 0) { | 513 | | // There was a current literal run. All the values in it have been flushed | 514 | | // but we still need to update the indicator byte. | 515 | 61 | DCHECK_EQ(literal_count_ % 8, 0); | 516 | 61 | DCHECK_EQ(repeat_count_, 8); | 517 | 61 | FlushLiteralRun(true); | 518 | 61 | } | 519 | 22.9k | DCHECK_EQ(literal_count_, 0); | 520 | 22.9k | return; | 521 | 22.9k | } | 522 | | | 523 | 5.18k | literal_count_ += num_buffered_values_; | 524 | 5.18k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 525 | 5.18k | if (num_groups + 1 >= (1 << 6)) { | 526 | | // We need to start a new literal run because the indicator byte we've reserved | 527 | | // cannot store more values. | 528 | 39 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 529 | 39 | FlushLiteralRun(true); | 530 | 5.14k | } else { | 531 | 5.14k | FlushLiteralRun(done); | 532 | 5.14k | } | 533 | 5.18k | repeat_count_ = 0; | 534 | 5.18k | } |
|
535 | | |
536 | | template <typename T> |
537 | 0 | void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) { |
538 | 0 | for (int i = 0; i < num_bytes; ++i) { |
539 | 0 | bit_writer_.PutValue(val, 8); |
540 | 0 | } |
541 | 0 | } |
542 | | |
543 | | template <typename T> |
544 | 806 | int RleEncoder<T>::Flush() { |
545 | 806 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { |
546 | 804 | bool all_repeat = literal_count_ == 0 && |
547 | 804 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); |
548 | | // There is something pending, figure out if it's a repeated or literal run |
549 | 804 | if (repeat_count_ > 0 && all_repeat) { |
550 | 561 | FlushRepeatedRun(); |
551 | 561 | } else { |
552 | 243 | literal_count_ += num_buffered_values_; |
553 | 243 | FlushLiteralRun(true); |
554 | 243 | repeat_count_ = 0; |
555 | 243 | } |
556 | 804 | } |
557 | 806 | bit_writer_.Flush(); |
558 | 806 | DCHECK_EQ(num_buffered_values_, 0); |
559 | 806 | DCHECK_EQ(literal_count_, 0); |
560 | 806 | DCHECK_EQ(repeat_count_, 0); |
561 | 806 | return bit_writer_.bytes_written(); |
562 | 806 | } _ZN5doris10RleEncoderImE5FlushEv Line | Count | Source | 544 | 394 | int RleEncoder<T>::Flush() { | 545 | 394 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 546 | 394 | bool all_repeat = literal_count_ == 0 && | 547 | 394 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 548 | | // There is something pending, figure out if it's a repeated or literal run | 549 | 394 | if (repeat_count_ > 0 && all_repeat) { | 550 | 260 | FlushRepeatedRun(); | 551 | 260 | } else { | 552 | 134 | literal_count_ += num_buffered_values_; | 553 | 134 | FlushLiteralRun(true); | 554 | 134 | repeat_count_ = 0; | 555 | 134 | } | 556 | 394 | } | 557 | 394 | bit_writer_.Flush(); | 558 | 394 | DCHECK_EQ(num_buffered_values_, 0); | 559 | 394 | DCHECK_EQ(literal_count_, 0); | 560 | 394 | DCHECK_EQ(repeat_count_, 0); | 561 | 394 | return bit_writer_.bytes_written(); | 562 | 394 | } |
_ZN5doris10RleEncoderIbE5FlushEv Line | Count | Source | 544 | 412 | int RleEncoder<T>::Flush() { | 545 | 412 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 546 | 410 | bool all_repeat = literal_count_ == 0 && | 547 | 410 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 548 | | // There is something pending, figure out if it's a repeated or literal run | 549 | 410 | if (repeat_count_ > 0 && all_repeat) { | 550 | 301 | FlushRepeatedRun(); | 551 | 301 | } else { | 552 | 109 | literal_count_ += num_buffered_values_; | 553 | 109 | FlushLiteralRun(true); | 554 | 109 | repeat_count_ = 0; | 555 | 109 | } | 556 | 410 | } | 557 | 412 | bit_writer_.Flush(); | 558 | 412 | DCHECK_EQ(num_buffered_values_, 0); | 559 | 412 | DCHECK_EQ(literal_count_, 0); | 560 | 412 | DCHECK_EQ(repeat_count_, 0); | 561 | 412 | return bit_writer_.bytes_written(); | 562 | 412 | } |
|
563 | | |
564 | | template <typename T> |
565 | 6.54k | void RleEncoder<T>::Clear() { |
566 | 6.54k | current_value_ = 0; |
567 | 6.54k | repeat_count_ = 0; |
568 | 6.54k | num_buffered_values_ = 0; |
569 | 6.54k | literal_count_ = 0; |
570 | 6.54k | literal_indicator_byte_idx_ = -1; |
571 | 6.54k | bit_writer_.Clear(); |
572 | 6.54k | } _ZN5doris10RleEncoderImE5ClearEv Line | Count | Source | 565 | 394 | void RleEncoder<T>::Clear() { | 566 | 394 | current_value_ = 0; | 567 | 394 | repeat_count_ = 0; | 568 | 394 | num_buffered_values_ = 0; | 569 | 394 | literal_count_ = 0; | 570 | 394 | literal_indicator_byte_idx_ = -1; | 571 | 394 | bit_writer_.Clear(); | 572 | 394 | } |
_ZN5doris10RleEncoderIbE5ClearEv Line | Count | Source | 565 | 6.15k | void RleEncoder<T>::Clear() { | 566 | 6.15k | current_value_ = 0; | 567 | 6.15k | repeat_count_ = 0; | 568 | 6.15k | num_buffered_values_ = 0; | 569 | 6.15k | literal_count_ = 0; | 570 | 6.15k | literal_indicator_byte_idx_ = -1; | 571 | 6.15k | bit_writer_.Clear(); | 572 | 6.15k | } |
|
573 | | |
574 | | // Copy from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h |
575 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
576 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
577 | | // (literal encoding). |
578 | | // |
579 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
580 | | // of the run and the type of the run. |
581 | | // |
582 | | // This encoding has the benefit that when there aren't any long enough runs, values |
583 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
584 | | // the run length are byte aligned. This allows for very efficient decoding |
585 | | // implementations. |
586 | | // The encoding is: |
587 | | // encoded-block := run* |
588 | | // run := literal-run | repeated-run |
589 | | // literal-run := literal-indicator < literal bytes > |
590 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
591 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
592 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
593 | | // |
594 | | // Each run is preceded by a varint. The varint's least significant bit is |
595 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
596 | | // of the varint is used to determine the length of the run (eg how many times the |
597 | | // value repeats). |
598 | | // |
599 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
600 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
601 | | // on a byte boundary without padding. |
602 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
603 | | // the actual number of encoded ints. (This means that the total number of encoded values |
604 | | // can not be determined from the encoded data, since the number of values in the last |
605 | | // group may not be a multiple of 8). For the last group of literal runs, we pad |
606 | | // the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
607 | | // without the need for additional checks. |
608 | | // |
609 | | // There is a break-even point when it is more storage efficient to do run length |
610 | | // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
611 | | // for both the repeated encoding or the literal encoding. This value can always |
612 | | // be computed based on the bit-width. |
613 | | // TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more |
614 | | // investigation is needed to do this efficiently, see the reverted IMPALA-6658. |
615 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
616 | | // |
617 | | // Examples with bit-width 1 (eg encoding booleans): |
618 | | // ---------------------------------------- |
619 | | // 100 1s followed by 100 0s: |
620 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
621 | | // - (total 4 bytes) |
622 | | // |
623 | | // alternating 1s and 0s (200 total): |
624 | | // 200 ints = 25 groups of 8 |
625 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
626 | | // (total 26 bytes, 1 byte overhead) |
627 | | |
628 | | // RLE decoder with a batch-oriented interface that enables fast decoding. |
629 | | // Users of this class must first initialize the class to point to a buffer of |
630 | | // RLE-encoded data, passed into the constructor or Reset(). The provided |
631 | | // bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH). |
632 | | // Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to |
633 | | // see if the next run is a repeated or literal run, then calling |
634 | | // GetRepeatedValue() or GetLiteralValues() respectively to read the values. |
635 | | // |
636 | | // End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. |
637 | | // Other decoding errors are signalled by functions returning false. If an |
638 | | // error is encountered then it is not valid to read any more data until |
639 | | // Reset() is called. |
640 | | |
641 | | // bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1].
642 | | // This means that a Parquet implementation can always store the run length in a signed 32-bit integer. |
643 | | template <typename T> |
644 | | class RleBatchDecoder { |
645 | | public: |
646 | 35 | RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { |
647 | 35 | Reset(buffer, buffer_len, bit_width); |
648 | 35 | } |
649 | | |
650 | | RleBatchDecoder() = default; |
651 | | |
652 | | // Reset the decoder to read from a new buffer. |
653 | | void Reset(uint8_t* buffer, int buffer_len, int bit_width); |
654 | | |
655 | | // Return the size of the current repeated run. Returns zero if the current run is |
656 | | // a literal run or if no more runs can be read from the input. |
657 | | int32_t NextNumRepeats(); |
658 | | |
659 | | // Get the value of the current repeated run and consume the given number of repeats. |
660 | | // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot |
661 | | // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume' |
662 | | // can be set to 0 to peek at the value without consuming repeats. |
663 | | T GetRepeatedValue(int32_t num_repeats_to_consume); |
664 | | |
665 | | // Return the size of the current literal run. Returns zero if the current run is |
666 | | // a repeated run or if no more runs can be read from the input. |
667 | | int32_t NextNumLiterals(); |
668 | | |
669 | | // Consume 'num_literals_to_consume' literals from the current literal run, |
670 | | // copying the values to 'values'. 'num_literals_to_consume' must be <= |
671 | | // NextNumLiterals(). Returns true if the requested number of literals were |
672 | | // successfully read or false if an error was encountered, e.g. the input was |
673 | | // truncated. |
674 | | bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; |
675 | | |
676 | | // Consume 'num_values_to_consume' values and copy them to 'values'. |
677 | | // Returns the number of consumed values or 0 if an error occurred. |
678 | | uint32_t GetBatch(T* values, uint32_t batch_num); |
679 | | |
680 | | private: |
681 | | // Called when both 'literal_count_' and 'repeat_count_' have been exhausted. |
682 | | // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal |
683 | | // or repeated run, or leaves both at 0 if no more values can be read (either because |
684 | | // the end of the input was reached or an error was encountered decoding). |
685 | | void NextCounts(); |
686 | | |
687 | | /// Fill the literal buffer. Invalid to call if there are already buffered literals. |
688 | | /// Return false if the input was truncated. This does not advance 'literal_count_'. |
689 | | bool FillLiteralBuffer() WARN_UNUSED_RESULT; |
690 | | |
691 | 20 | bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; } |
692 | | |
693 | | /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing |
694 | | /// 'literal_count_'. Returns the number of literals outputted. |
695 | | int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); |
696 | | |
697 | | BatchedBitReader bit_reader_; |
698 | | |
699 | | // Number of bits needed to encode the value. Must be between 0 and 64 after |
700 | | // the decoder is initialized with a buffer. -1 indicates the decoder was not |
701 | | // initialized. |
702 | | int bit_width_ = -1; |
703 | | |
704 | | // If a repeated run, the number of repeats remaining in the current run to be read. |
705 | | // If the current run is a literal run, this is 0. |
706 | | int32_t repeat_count_ = 0; |
707 | | |
708 | | // If a literal run, the number of literals remaining in the current run to be read. |
709 | | // If the current run is a repeated run, this is 0. |
710 | | int32_t literal_count_ = 0; |
711 | | |
712 | | // If a repeated run, the current repeated value. |
713 | | T repeated_value_; |
714 | | |
715 | | // Size of buffer for literal values. Large enough to decode a full batch of 32 |
716 | | // literals. The buffer is needed to allow clients to read in batches that are not |
717 | | // multiples of 32. |
718 | | static constexpr int LITERAL_BUFFER_LEN = 32; |
719 | | |
720 | | // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the |
721 | | // position of the next literal to be read from the buffer. |
722 | | T literal_buffer_[LITERAL_BUFFER_LEN]; |
723 | | int num_buffered_literals_ = 0; |
724 | | int literal_buffer_pos_ = 0; |
725 | | }; |
726 | | |
727 | | template <typename T> |
728 | 20 | int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) { |
729 | 20 | int32_t num_to_output = |
730 | 20 | std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); |
731 | 20 | memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); |
732 | 20 | literal_buffer_pos_ += num_to_output; |
733 | 20 | literal_count_ -= num_to_output; |
734 | 20 | return num_to_output; |
735 | 20 | } |
736 | | |
737 | | template <typename T> |
738 | 35 | void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) { |
739 | 35 | bit_reader_.Reset(buffer, buffer_len); |
740 | 35 | bit_width_ = bit_width; |
741 | 35 | repeat_count_ = 0; |
742 | 35 | literal_count_ = 0; |
743 | 35 | num_buffered_literals_ = 0; |
744 | 35 | literal_buffer_pos_ = 0; |
745 | 35 | } |
746 | | |
747 | | template <typename T> |
748 | 56 | int32_t RleBatchDecoder<T>::NextNumRepeats() { |
749 | 56 | if (repeat_count_ > 0) return repeat_count_; |
750 | 54 | if (literal_count_ == 0) NextCounts(); |
751 | 54 | return repeat_count_; |
752 | 56 | } |
753 | | |
754 | | template <typename T> |
755 | 54 | void RleBatchDecoder<T>::NextCounts() { |
756 | | // Read the next run's indicator int, it could be a literal or repeated run. |
757 | | // The int is encoded as a ULEB128-encoded value. |
758 | 54 | uint32_t indicator_value = 0; |
759 | 54 | if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) { |
760 | 0 | return; |
761 | 0 | } |
762 | | |
763 | | // lsb indicates if it is a literal run or repeated run |
764 | 54 | bool is_literal = indicator_value & 1; |
765 | | |
766 | | // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. |
767 | | // The Parquet standard does not allow longer runs - see PARQUET-1290. |
768 | 54 | uint32_t run_len = indicator_value >> 1; |
769 | 54 | if (is_literal) { |
770 | | // Use int64_t to avoid overflowing multiplication. |
771 | 20 | int64_t literal_count = static_cast<int64_t>(run_len) * 8; |
772 | 20 | if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; |
773 | 20 | literal_count_ = literal_count; |
774 | 34 | } else { |
775 | 34 | if (UNLIKELY(run_len == 0)) return; |
776 | 34 | bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); |
777 | 34 | if (UNLIKELY(!result)) return; |
778 | 34 | repeat_count_ = run_len; |
779 | 34 | } |
780 | 54 | } |
781 | | |
782 | | template <typename T> |
783 | 36 | T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { |
784 | 36 | repeat_count_ -= num_repeats_to_consume; |
785 | 36 | return repeated_value_; |
786 | 36 | } |
787 | | |
788 | | template <typename T> |
789 | 20 | int32_t RleBatchDecoder<T>::NextNumLiterals() { |
790 | 20 | if (literal_count_ > 0) return literal_count_; |
791 | 0 | if (repeat_count_ == 0) NextCounts(); |
792 | 0 | return literal_count_; |
793 | 20 | } |
794 | | |
795 | | template <typename T> |
796 | 20 | bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) { |
797 | 20 | int32_t num_consumed = 0; |
798 | | // Copy any buffered literals left over from previous calls. |
799 | 20 | if (HaveBufferedLiterals()) { |
800 | 0 | num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); |
801 | 0 | } |
802 | | |
803 | 20 | int32_t num_remaining = num_literals_to_consume - num_consumed; |
804 | | // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. |
805 | | // Need to round to a batch of 32 if the caller is consuming only part of the current |
806 | | // run avoid ending on a non-byte boundary. |
807 | 20 | int32_t num_to_bypass = |
808 | 20 | std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
809 | 20 | if (num_to_bypass > 0) { |
810 | 0 | int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); |
811 | | // If we couldn't read the expected number, that means the input was truncated. |
812 | 0 | if (num_read < num_to_bypass) return false; |
813 | 0 | literal_count_ -= num_to_bypass; |
814 | 0 | num_consumed += num_to_bypass; |
815 | 0 | num_remaining = num_literals_to_consume - num_consumed; |
816 | 0 | } |
817 | | |
818 | 20 | if (num_remaining > 0) { |
819 | | // We weren't able to copy all the literals requested directly from the input. |
820 | | // Buffer literals and copy over the requested number. |
821 | 20 | if (UNLIKELY(!FillLiteralBuffer())) return false; |
822 | 20 | OutputBufferedLiterals(num_remaining, values + num_consumed); |
823 | 20 | } |
824 | 20 | return true; |
825 | 20 | } |
826 | | |
827 | | template <typename T> |
828 | 20 | bool RleBatchDecoder<T>::FillLiteralBuffer() { |
829 | 20 | int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_); |
830 | 20 | num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); |
831 | | // If we couldn't read the expected number, that means the input was truncated. |
832 | 20 | if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; |
833 | 20 | literal_buffer_pos_ = 0; |
834 | 20 | return true; |
835 | 20 | } |
836 | | |
837 | | template <typename T> |
838 | 37 | uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) { |
839 | 37 | uint32_t num_consumed = 0; |
840 | 93 | while (num_consumed < batch_num) { |
841 | | // Add RLE encoded values by repeating the current value this number of times. |
842 | 56 | uint32_t num_repeats = NextNumRepeats(); |
843 | 56 | if (num_repeats > 0) { |
844 | 36 | int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed); |
845 | 36 | T repeated_value = GetRepeatedValue(num_repeats_to_set); |
846 | 292 | for (int i = 0; i < num_repeats_to_set; ++i) { |
847 | 256 | values[num_consumed + i] = repeated_value; |
848 | 256 | } |
849 | 36 | num_consumed += num_repeats_to_set; |
850 | 36 | continue; |
851 | 36 | } |
852 | | |
853 | | // Add remaining literal values, if any. |
854 | 20 | uint32_t num_literals = NextNumLiterals(); |
855 | 20 | if (num_literals == 0) { |
856 | 0 | break; |
857 | 0 | } |
858 | 20 | uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed); |
859 | 20 | if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { |
860 | 0 | return 0; |
861 | 0 | } |
862 | 20 | num_consumed += num_literals_to_set; |
863 | 20 | } |
864 | 37 | return num_consumed; |
865 | 37 | } |
866 | | |
867 | | } // namespace doris |