/root/doris/be/src/util/rle_encoding.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #pragma once |
18 | | |
19 | | #include <glog/logging.h> |
20 | | |
21 | | #include <limits> // IWYU pragma: keep |
22 | | |
23 | | #include "gutil/port.h" |
24 | | #include "util/bit_stream_utils.inline.h" |
25 | | #include "util/bit_util.h" |
26 | | |
27 | | namespace doris { |
28 | | |
29 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
30 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
31 | | // (literal encoding). |
32 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
33 | | // of the run and the type of the run. |
34 | | // This encoding has the benefit that when there aren't any long enough runs, values |
35 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
36 | | // the run length are byte aligned. This allows for very efficient decoding |
37 | | // implementations. |
38 | | // The encoding is: |
39 | | // encoded-block := run* |
40 | | // run := literal-run | repeated-run |
41 | | // literal-run := literal-indicator < literal bytes > |
42 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
43 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
44 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
45 | | // |
46 | | // Each run is preceded by a varint. The varint's least significant bit is |
47 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
48 | | // of the varint is used to determine the length of the run (eg how many times the |
49 | | // value repeats). |
50 | | // |
51 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
52 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
53 | | // on a byte boundary without padding. |
54 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
55 | | // the actual number of encoded ints. (This means that the total number of encoded values |
56 | | // can not be determined from the encoded data, since the number of values in the last |
57 | | // group may not be a multiple of 8). |
58 | | // There is a break-even point when it is more storage efficient to do run length |
59 | | // encoding. For 1 bit-width values, that point is 8 values: they require 2 bytes |
60 | | // in either the repeated encoding or the literal encoding. This value can always |
61 | | // be computed based on the bit-width. |
62 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
63 | | // |
64 | | // Examples with bit-width 1 (eg encoding booleans): |
65 | | // ---------------------------------------- |
66 | | // 100 1s followed by 100 0s: |
67 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
68 | | // - (total 4 bytes) |
69 | | // |
70 | | // alternating 1s and 0s (200 total): |
71 | | // 200 ints = 25 groups of 8 |
72 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
73 | | // (total 26 bytes, 1 byte overhead) |
74 | | // |
75 | | |
76 | | // Decoder class for RLE encoded data. |
77 | | // |
78 | | // NOTE: the encoded format does not have any length prefix or any other way of |
79 | | // indicating that the encoded sequence ends at a certain point, so the Decoder |
80 | | // methods may return some extra bits at the end before the read methods start |
81 | | // to return 0/false. |
82 | | template <typename T> |
83 | | class RleDecoder { |
84 | | public: |
85 | | // Create a decoder object. buffer/buffer_len is the decoded data. |
86 | | // bit_width is the width of each value (before encoding). |
87 | | RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) |
88 | | : bit_reader_(buffer, buffer_len), |
89 | | bit_width_(bit_width), |
90 | | current_value_(0), |
91 | | repeat_count_(0), |
92 | | literal_count_(0), |
93 | 853 | rewind_state_(CANT_REWIND) { |
94 | 853 | DCHECK_GE(bit_width_, 1); |
95 | 853 | DCHECK_LE(bit_width_, 64); |
96 | 853 | } _ZN5doris10RleDecoderImEC2EPKhii Line | Count | Source | 93 | 394 | rewind_state_(CANT_REWIND) { | 94 | 394 | DCHECK_GE(bit_width_, 1); | 95 | 394 | DCHECK_LE(bit_width_, 64); | 96 | 394 | } |
_ZN5doris10RleDecoderIbEC2EPKhii Line | Count | Source | 93 | 409 | rewind_state_(CANT_REWIND) { | 94 | 409 | DCHECK_GE(bit_width_, 1); | 95 | 409 | DCHECK_LE(bit_width_, 64); | 96 | 409 | } |
_ZN5doris10RleDecoderIhEC2EPKhii Line | Count | Source | 93 | 5 | rewind_state_(CANT_REWIND) { | 94 | 5 | DCHECK_GE(bit_width_, 1); | 95 | 5 | DCHECK_LE(bit_width_, 64); | 96 | 5 | } |
_ZN5doris10RleDecoderIsEC2EPKhii Line | Count | Source | 93 | 45 | rewind_state_(CANT_REWIND) { | 94 | 45 | DCHECK_GE(bit_width_, 1); | 95 | 45 | DCHECK_LE(bit_width_, 64); | 96 | 45 | } |
|
97 | | |
98 | 25.6k | RleDecoder() {} _ZN5doris10RleDecoderIbEC2Ev Line | Count | Source | 98 | 25.5k | RleDecoder() {} |
_ZN5doris10RleDecoderIhEC2Ev Line | Count | Source | 98 | 6 | RleDecoder() {} |
_ZN5doris10RleDecoderIsEC2Ev Line | Count | Source | 98 | 87 | RleDecoder() {} |
|
99 | | |
100 | | // Skip n values, and returns the number of non-zero entries skipped. |
101 | | size_t Skip(size_t to_skip); |
102 | | |
103 | | // Gets the next value. Returns false if there are no more. |
104 | | bool Get(T* val); |
105 | | |
106 | | // Seek to the previous value. |
107 | | void RewindOne(); |
108 | | |
109 | | // Gets the next run of the same 'val'. Returns 0 if there is no |
110 | | // more data to be decoded. Will return a run of at most 'max_run' |
111 | | // values. If there are more values than this, the next call to |
112 | | // GetNextRun will return more from the same run. |
113 | | size_t GetNextRun(T* val, size_t max_run); |
114 | | |
115 | | size_t get_values(T* values, size_t num_values); |
116 | | |
117 | | // Get the count of current repeated value |
118 | | size_t repeated_count(); |
119 | | |
120 | | // Get current repeated value, make sure that count equals repeated_count() |
121 | | T get_repeated_value(size_t count); |
122 | | |
123 | 0 | const BitReader& bit_reader() const { return bit_reader_; } |
124 | | |
125 | | private: |
126 | | bool ReadHeader(); |
127 | | |
128 | | enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND }; |
129 | | |
130 | | BitReader bit_reader_; |
131 | | int bit_width_; |
132 | | uint64_t current_value_; |
133 | | uint32_t repeat_count_; |
134 | | uint32_t literal_count_; |
135 | | RewindState rewind_state_; |
136 | | }; |
137 | | |
138 | | // Class to incrementally build the rle data. |
139 | | // The encoding has two modes: encoding repeated runs and literal runs. |
140 | | // If the run is sufficiently short, it is more efficient to encode as a literal run. |
141 | | // This class does so by buffering 8 values at a time. If they are not all the same |
142 | | // they are added to the literal run. If they are the same, they are added to the |
143 | | // repeated run. When we switch modes, the previous run is flushed out. |
144 | | template <typename T> |
145 | | class RleEncoder { |
146 | | public: |
147 | | // buffer: buffer to write bits to. |
148 | | // bit_width: max number of bits for value. |
149 | | // TODO: consider adding a min_repeated_run_length so the caller can control |
150 | | // when values should be encoded as repeated runs. Currently this is derived |
151 | | // based on the bit_width, which can determine a storage optimal choice. |
152 | | explicit RleEncoder(faststring* buffer, int bit_width) |
153 | 3.66k | : bit_width_(bit_width), bit_writer_(buffer) { |
154 | 3.66k | DCHECK_GE(bit_width_, 1); |
155 | 3.66k | DCHECK_LE(bit_width_, 64); |
156 | 3.66k | Clear(); |
157 | 3.66k | } _ZN5doris10RleEncoderImEC2EPNS_10faststringEi Line | Count | Source | 153 | 394 | : bit_width_(bit_width), bit_writer_(buffer) { | 154 | 394 | DCHECK_GE(bit_width_, 1); | 155 | 394 | DCHECK_LE(bit_width_, 64); | 156 | 394 | Clear(); | 157 | 394 | } |
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi Line | Count | Source | 153 | 3.27k | : bit_width_(bit_width), bit_writer_(buffer) { | 154 | 3.27k | DCHECK_GE(bit_width_, 1); | 155 | 3.27k | DCHECK_LE(bit_width_, 64); | 156 | 3.27k | Clear(); | 157 | 3.27k | } |
|
158 | | |
159 | | // Reserve 'num_bytes' bytes for a plain encoded header, set each |
160 | | // byte with 'val': this is used for the RLE-encoded data blocks in |
161 | | // order to be able to able to store the initial ordinal position |
162 | | // and number of elements. This is a part of RleEncoder in order to |
163 | | // maintain the correct offset in 'buffer'. |
164 | | void Reserve(int num_bytes, uint8_t val); |
165 | | |
166 | | // Encode value. This value must be representable with bit_width_ bits. |
167 | | void Put(T value, size_t run_length = 1); |
168 | | |
169 | | // Flushes any pending values to the underlying buffer. |
170 | | // Returns the total number of bytes written |
171 | | int Flush(); |
172 | | |
173 | | // Resets all the state in the encoder. |
174 | | void Clear(); |
175 | | |
176 | 302 | int32_t len() const { return bit_writer_.bytes_written(); } |
177 | | |
178 | | private: |
179 | | // Flushes any buffered values. If this is part of a repeated run, this is largely |
180 | | // a no-op. |
181 | | // If it is part of a literal run, this will call FlushLiteralRun, which writes |
182 | | // out the buffered literal values. |
183 | | // If 'done' is true, the current run would be written even if it would normally |
184 | | // have been buffered more. This should only be called at the end, when the |
185 | | // encoder has received all values even if it would normally continue to be |
186 | | // buffered. |
187 | | void FlushBufferedValues(bool done); |
188 | | |
189 | | // Flushes literal values to the underlying buffer. If update_indicator_byte, |
190 | | // then the current literal run is complete and the indicator byte is updated. |
191 | | void FlushLiteralRun(bool update_indicator_byte); |
192 | | |
193 | | // Flushes a repeated run to the underlying buffer. |
194 | | void FlushRepeatedRun(); |
195 | | |
196 | | // Number of bits needed to encode the value. |
197 | | const int bit_width_; |
198 | | |
199 | | // Underlying buffer. |
200 | | BitWriter bit_writer_; |
201 | | |
202 | | // We need to buffer at most 8 values for literals. This happens when the |
203 | | // bit_width is 1 (so 8 values fit in one byte). |
204 | | // TODO: generalize this to other bit widths |
205 | | uint64_t buffered_values_[8]; |
206 | | |
207 | | // Number of values in buffered_values_ |
208 | | int num_buffered_values_; |
209 | | |
210 | | // The current (also last) value that was written and the count of how |
211 | | // many times in a row that value has been seen. This is maintained even |
212 | | // if we are in a literal run. If the repeat_count_ get high enough, we switch |
213 | | // to encoding repeated runs. |
214 | | uint64_t current_value_; |
215 | | int repeat_count_; |
216 | | |
217 | | // Number of literals in the current run. This does not include the literals |
218 | | // that might be in buffered_values_. Only after we've got a group big enough |
219 | | // can we decide if they should part of the literal_count_ or repeat_count_ |
220 | | int literal_count_; |
221 | | |
222 | | // Index of a byte in the underlying buffer that stores the indicator byte. |
223 | | // This is reserved as soon as we need a literal run but the value is written |
224 | | // when the literal run is complete. We maintain an index rather than a pointer |
225 | | // into the underlying buffer because the pointer value may become invalid if |
226 | | // the underlying buffer is resized. |
227 | | int literal_indicator_byte_idx_; |
228 | | }; |
229 | | |
230 | | template <typename T> |
231 | 301k | bool RleDecoder<T>::ReadHeader() { |
232 | 301k | DCHECK(bit_reader_.is_initialized()); |
233 | 301k | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { |
234 | | // Read the next run's indicator int, it could be a literal or repeated run |
235 | | // The int is encoded as a vlq-encoded value. |
236 | 23.6k | uint32_t indicator_value = 0; |
237 | 23.6k | bool result = bit_reader_.GetVlqInt(&indicator_value); |
238 | 23.6k | if (PREDICT_FALSE(!result)) { |
239 | 276 | return false; |
240 | 276 | } |
241 | | |
242 | | // lsb indicates if it is a literal run or repeated run |
243 | 23.3k | bool is_literal = indicator_value & 1; |
244 | 23.3k | if (is_literal) { |
245 | 1.33k | literal_count_ = (indicator_value >> 1) * 8; |
246 | 1.33k | DCHECK_GT(literal_count_, 0); |
247 | 22.0k | } else { |
248 | 22.0k | repeat_count_ = indicator_value >> 1; |
249 | 22.0k | DCHECK_GT(repeat_count_, 0); |
250 | 22.0k | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), |
251 | 22.0k | reinterpret_cast<T*>(¤t_value_)); |
252 | 22.0k | DCHECK(result); |
253 | 22.0k | } |
254 | 23.3k | } |
255 | 301k | return true; |
256 | 301k | } _ZN5doris10RleDecoderImE10ReadHeaderEv Line | Count | Source | 231 | 244k | bool RleDecoder<T>::ReadHeader() { | 232 | 244k | DCHECK(bit_reader_.is_initialized()); | 233 | 244k | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 234 | | // Read the next run's indicator int, it could be a literal or repeated run | 235 | | // The int is encoded as a vlq-encoded value. | 236 | 3.03k | uint32_t indicator_value = 0; | 237 | 3.03k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 238 | 3.03k | if (PREDICT_FALSE(!result)) { | 239 | 0 | return false; | 240 | 0 | } | 241 | | | 242 | | // lsb indicates if it is a literal run or repeated run | 243 | 3.03k | bool is_literal = indicator_value & 1; | 244 | 3.03k | if (is_literal) { | 245 | 1.12k | literal_count_ = (indicator_value >> 1) * 8; | 246 | 1.12k | DCHECK_GT(literal_count_, 0); | 247 | 1.91k | } else { | 248 | 1.91k | repeat_count_ = indicator_value >> 1; | 249 | 1.91k | DCHECK_GT(repeat_count_, 0); | 250 | 1.91k | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 251 | 1.91k | reinterpret_cast<T*>(¤t_value_)); | 252 | 1.91k | DCHECK(result); | 253 | 1.91k | } | 254 | 3.03k | } | 255 | 244k | return true; | 256 | 244k | } |
_ZN5doris10RleDecoderIbE10ReadHeaderEv Line | Count | Source | 231 | 57.5k | bool RleDecoder<T>::ReadHeader() { | 232 | 57.5k | DCHECK(bit_reader_.is_initialized()); | 233 | 57.5k | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 234 | | // Read the next run's indicator int, it could be a literal or repeated run | 235 | | // The int is encoded as a vlq-encoded value. | 236 | 20.5k | uint32_t indicator_value = 0; | 237 | 20.5k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 238 | 20.5k | if (PREDICT_FALSE(!result)) { | 239 | 276 | return false; | 240 | 276 | } | 241 | | | 242 | | // lsb indicates if it is a literal run or repeated run | 243 | 20.3k | bool is_literal = indicator_value & 1; | 244 | 20.3k | if (is_literal) { | 245 | 206 | literal_count_ = (indicator_value >> 1) * 8; | 246 | 206 | DCHECK_GT(literal_count_, 0); | 247 | 20.1k | } else { | 248 | 20.1k | repeat_count_ = indicator_value >> 1; | 249 | 20.1k | DCHECK_GT(repeat_count_, 0); | 250 | 20.1k | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 251 | 20.1k | reinterpret_cast<T*>(¤t_value_)); | 252 | 20.1k | DCHECK(result); | 253 | 20.1k | } | 254 | 20.3k | } | 255 | 57.3k | return true; | 256 | 57.5k | } |
_ZN5doris10RleDecoderIsE10ReadHeaderEv Line | Count | Source | 231 | 49 | bool RleDecoder<T>::ReadHeader() { | 232 | 49 | DCHECK(bit_reader_.is_initialized()); | 233 | 49 | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 234 | | // Read the next run's indicator int, it could be a literal or repeated run | 235 | | // The int is encoded as a vlq-encoded value. | 236 | 45 | uint32_t indicator_value = 0; | 237 | 45 | bool result = bit_reader_.GetVlqInt(&indicator_value); | 238 | 45 | if (PREDICT_FALSE(!result)) { | 239 | 0 | return false; | 240 | 0 | } | 241 | | | 242 | | // lsb indicates if it is a literal run or repeated run | 243 | 45 | bool is_literal = indicator_value & 1; | 244 | 45 | if (is_literal) { | 245 | 3 | literal_count_ = (indicator_value >> 1) * 8; | 246 | 3 | DCHECK_GT(literal_count_, 0); | 247 | 42 | } else { | 248 | 42 | repeat_count_ = indicator_value >> 1; | 249 | 42 | DCHECK_GT(repeat_count_, 0); | 250 | 42 | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 251 | 42 | reinterpret_cast<T*>(¤t_value_)); | 252 | 42 | DCHECK(result); | 253 | 42 | } | 254 | 45 | } | 255 | 49 | return true; | 256 | 49 | } |
_ZN5doris10RleDecoderIhE10ReadHeaderEv Line | Count | Source | 231 | 5 | bool RleDecoder<T>::ReadHeader() { | 232 | 5 | DCHECK(bit_reader_.is_initialized()); | 233 | 5 | if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { | 234 | | // Read the next run's indicator int, it could be a literal or repeated run | 235 | | // The int is encoded as a vlq-encoded value. | 236 | 5 | uint32_t indicator_value = 0; | 237 | 5 | bool result = bit_reader_.GetVlqInt(&indicator_value); | 238 | 5 | if (PREDICT_FALSE(!result)) { | 239 | 0 | return false; | 240 | 0 | } | 241 | | | 242 | | // lsb indicates if it is a literal run or repeated run | 243 | 5 | bool is_literal = indicator_value & 1; | 244 | 5 | if (is_literal) { | 245 | 5 | literal_count_ = (indicator_value >> 1) * 8; | 246 | 5 | DCHECK_GT(literal_count_, 0); | 247 | 5 | } else { | 248 | 0 | repeat_count_ = indicator_value >> 1; | 249 | 0 | DCHECK_GT(repeat_count_, 0); | 250 | 0 | bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 251 | 0 | reinterpret_cast<T*>(¤t_value_)); | 252 | 0 | DCHECK(result); | 253 | 0 | } | 254 | 5 | } | 255 | 5 | return true; | 256 | 5 | } |
|
257 | | |
258 | | template <typename T> |
259 | 247k | bool RleDecoder<T>::Get(T* val) { |
260 | 247k | DCHECK(bit_reader_.is_initialized()); |
261 | 247k | if (PREDICT_FALSE(!ReadHeader())) { |
262 | 0 | return false; |
263 | 0 | } |
264 | | |
265 | 247k | if (PREDICT_TRUE(repeat_count_ > 0)) { |
266 | 158k | *val = current_value_; |
267 | 158k | --repeat_count_; |
268 | 158k | rewind_state_ = REWIND_RUN; |
269 | 158k | } else { |
270 | 89.0k | DCHECK(literal_count_ > 0); |
271 | 89.0k | bool result = bit_reader_.GetValue(bit_width_, val); |
272 | 89.0k | DCHECK(result); |
273 | 89.0k | --literal_count_; |
274 | 89.0k | rewind_state_ = REWIND_LITERAL; |
275 | 89.0k | } |
276 | | |
277 | 247k | return true; |
278 | 247k | } _ZN5doris10RleDecoderImE3GetEPm Line | Count | Source | 259 | 244k | bool RleDecoder<T>::Get(T* val) { | 260 | 244k | DCHECK(bit_reader_.is_initialized()); | 261 | 244k | if (PREDICT_FALSE(!ReadHeader())) { | 262 | 0 | return false; | 263 | 0 | } | 264 | | | 265 | 244k | if (PREDICT_TRUE(repeat_count_ > 0)) { | 266 | 155k | *val = current_value_; | 267 | 155k | --repeat_count_; | 268 | 155k | rewind_state_ = REWIND_RUN; | 269 | 155k | } else { | 270 | 88.9k | DCHECK(literal_count_ > 0); | 271 | 88.9k | bool result = bit_reader_.GetValue(bit_width_, val); | 272 | 88.9k | DCHECK(result); | 273 | 88.9k | --literal_count_; | 274 | 88.9k | rewind_state_ = REWIND_LITERAL; | 275 | 88.9k | } | 276 | | | 277 | 244k | return true; | 278 | 244k | } |
_ZN5doris10RleDecoderIbE3GetEPb Line | Count | Source | 259 | 3.17k | bool RleDecoder<T>::Get(T* val) { | 260 | 3.17k | DCHECK(bit_reader_.is_initialized()); | 261 | 3.17k | if (PREDICT_FALSE(!ReadHeader())) { | 262 | 0 | return false; | 263 | 0 | } | 264 | | | 265 | 3.17k | if (PREDICT_TRUE(repeat_count_ > 0)) { | 266 | 3.08k | *val = current_value_; | 267 | 3.08k | --repeat_count_; | 268 | 3.08k | rewind_state_ = REWIND_RUN; | 269 | 3.08k | } else { | 270 | 93 | DCHECK(literal_count_ > 0); | 271 | 93 | bool result = bit_reader_.GetValue(bit_width_, val); | 272 | 93 | DCHECK(result); | 273 | 93 | --literal_count_; | 274 | 93 | rewind_state_ = REWIND_LITERAL; | 275 | 93 | } | 276 | | | 277 | 3.17k | return true; | 278 | 3.17k | } |
_ZN5doris10RleDecoderIsE3GetEPs Line | Count | Source | 259 | 6 | bool RleDecoder<T>::Get(T* val) { | 260 | 6 | DCHECK(bit_reader_.is_initialized()); | 261 | 6 | if (PREDICT_FALSE(!ReadHeader())) { | 262 | 0 | return false; | 263 | 0 | } | 264 | | | 265 | 6 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 266 | 6 | *val = current_value_; | 267 | 6 | --repeat_count_; | 268 | 6 | rewind_state_ = REWIND_RUN; | 269 | 6 | } else { | 270 | 0 | DCHECK(literal_count_ > 0); | 271 | 0 | bool result = bit_reader_.GetValue(bit_width_, val); | 272 | 0 | DCHECK(result); | 273 | 0 | --literal_count_; | 274 | 0 | rewind_state_ = REWIND_LITERAL; | 275 | 0 | } | 276 | | | 277 | 6 | return true; | 278 | 6 | } |
|
279 | | |
280 | | template <typename T> |
281 | 2 | void RleDecoder<T>::RewindOne() { |
282 | 2 | DCHECK(bit_reader_.is_initialized()); |
283 | | |
284 | 2 | switch (rewind_state_) { |
285 | 0 | case CANT_REWIND: |
286 | 0 | throw Exception(Status::FatalError("Can't rewind more than once after each read!")); |
287 | 0 | break; |
288 | 2 | case REWIND_RUN: |
289 | 2 | ++repeat_count_; |
290 | 2 | break; |
291 | 0 | case REWIND_LITERAL: { |
292 | 0 | bit_reader_.Rewind(bit_width_); |
293 | 0 | ++literal_count_; |
294 | 0 | break; |
295 | 0 | } |
296 | 2 | } |
297 | | |
298 | 2 | rewind_state_ = CANT_REWIND; |
299 | 2 | } |
300 | | |
301 | | template <typename T> |
302 | 34.3k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { |
303 | 34.3k | DCHECK(bit_reader_.is_initialized()); |
304 | 34.3k | DCHECK_GT(max_run, 0); |
305 | 34.3k | size_t ret = 0; |
306 | 34.3k | size_t rem = max_run; |
307 | 54.4k | while (ReadHeader()) { |
308 | 54.1k | if (PREDICT_TRUE(repeat_count_ > 0)) { |
309 | 43.6k | if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { |
310 | 19.6k | return ret; |
311 | 19.6k | } |
312 | 24.0k | *val = current_value_; |
313 | 24.0k | if (repeat_count_ >= rem) { |
314 | | // The next run is longer than the amount of remaining data |
315 | | // that the caller wants to read. Only consume it partially. |
316 | 4.19k | repeat_count_ -= rem; |
317 | 4.19k | ret += rem; |
318 | 4.19k | return ret; |
319 | 4.19k | } |
320 | 19.8k | ret += repeat_count_; |
321 | 19.8k | rem -= repeat_count_; |
322 | 19.8k | repeat_count_ = 0; |
323 | 19.8k | } else { |
324 | 10.4k | DCHECK(literal_count_ > 0); |
325 | 10.4k | if (ret == 0) { |
326 | 10.4k | bool has_more = bit_reader_.GetValue(bit_width_, val); |
327 | 10.4k | DCHECK(has_more); |
328 | 10.4k | literal_count_--; |
329 | 10.4k | ret++; |
330 | 10.4k | rem--; |
331 | 10.4k | } |
332 | | |
333 | 41.9k | while (literal_count_ > 0) { |
334 | 41.7k | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); |
335 | 41.7k | DCHECK(result); |
336 | 41.7k | if (current_value_ != *val || rem == 0) { |
337 | 10.3k | bit_reader_.Rewind(bit_width_); |
338 | 10.3k | return ret; |
339 | 10.3k | } |
340 | 31.4k | ret++; |
341 | 31.4k | rem--; |
342 | 31.4k | literal_count_--; |
343 | 31.4k | } |
344 | 10.4k | } |
345 | 54.1k | } |
346 | 276 | return ret; |
347 | 34.3k | } _ZN5doris10RleDecoderIbE10GetNextRunEPbm Line | Count | Source | 302 | 34.3k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 303 | 34.3k | DCHECK(bit_reader_.is_initialized()); | 304 | 34.3k | DCHECK_GT(max_run, 0); | 305 | 34.3k | size_t ret = 0; | 306 | 34.3k | size_t rem = max_run; | 307 | 54.4k | while (ReadHeader()) { | 308 | 54.1k | if (PREDICT_TRUE(repeat_count_ > 0)) { | 309 | 43.6k | if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { | 310 | 19.6k | return ret; | 311 | 19.6k | } | 312 | 24.0k | *val = current_value_; | 313 | 24.0k | if (repeat_count_ >= rem) { | 314 | | // The next run is longer than the amount of remaining data | 315 | | // that the caller wants to read. Only consume it partially. | 316 | 4.18k | repeat_count_ -= rem; | 317 | 4.18k | ret += rem; | 318 | 4.18k | return ret; | 319 | 4.18k | } | 320 | 19.8k | ret += repeat_count_; | 321 | 19.8k | rem -= repeat_count_; | 322 | 19.8k | repeat_count_ = 0; | 323 | 19.8k | } else { | 324 | 10.4k | DCHECK(literal_count_ > 0); | 325 | 10.4k | if (ret == 0) { | 326 | 10.4k | bool has_more = bit_reader_.GetValue(bit_width_, val); | 327 | 10.4k | DCHECK(has_more); | 328 | 10.4k | literal_count_--; | 329 | 10.4k | ret++; | 330 | 10.4k | rem--; | 331 | 10.4k | } | 332 | | | 333 | 41.9k | while (literal_count_ > 0) { | 334 | 41.7k | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 335 | 41.7k | DCHECK(result); | 336 | 41.7k | if (current_value_ != *val || rem == 0) { | 337 | 10.3k | bit_reader_.Rewind(bit_width_); | 338 | 10.3k | return ret; | 339 | 10.3k | } | 340 | 31.4k | ret++; | 341 | 31.4k | rem--; | 342 | 31.4k | literal_count_--; | 343 | 31.4k | } | 344 | 10.4k | } | 345 | 54.1k | } | 346 | 276 | return ret; | 347 | 34.3k | } |
_ZN5doris10RleDecoderIsE10GetNextRunEPsm Line | Count | Source | 302 | 7 | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 303 | 7 | DCHECK(bit_reader_.is_initialized()); | 304 | 7 | DCHECK_GT(max_run, 0); | 305 | 7 | size_t ret = 0; | 306 | 7 | size_t rem = max_run; | 307 | 7 | while (ReadHeader()) { | 308 | 7 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 309 | 7 | if (PREDICT_FALSE(ret > 0 && *val != current_value_)) { | 310 | 0 | return ret; | 311 | 0 | } | 312 | 7 | *val = current_value_; | 313 | 7 | if (repeat_count_ >= rem) { | 314 | | // The next run is longer than the amount of remaining data | 315 | | // that the caller wants to read. Only consume it partially. | 316 | 7 | repeat_count_ -= rem; | 317 | 7 | ret += rem; | 318 | 7 | return ret; | 319 | 7 | } | 320 | 0 | ret += repeat_count_; | 321 | 0 | rem -= repeat_count_; | 322 | 0 | repeat_count_ = 0; | 323 | 0 | } else { | 324 | 0 | DCHECK(literal_count_ > 0); | 325 | 0 | if (ret == 0) { | 326 | 0 | bool has_more = bit_reader_.GetValue(bit_width_, val); | 327 | 0 | DCHECK(has_more); | 328 | 0 | literal_count_--; | 329 | 0 | ret++; | 330 | 0 | rem--; | 331 | 0 | } | 332 | |
| 333 | 0 | while (literal_count_ > 0) { | 334 | 0 | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 335 | 0 | DCHECK(result); | 336 | 0 | if (current_value_ != *val || rem == 0) { | 337 | 0 | bit_reader_.Rewind(bit_width_); | 338 | 0 | return ret; | 339 | 0 | } | 340 | 0 | ret++; | 341 | 0 | rem--; | 342 | 0 | literal_count_--; | 343 | 0 | } | 344 | 0 | } | 345 | 7 | } | 346 | 0 | return ret; | 347 | 7 | } |
|
348 | | |
349 | | template <typename T> |
350 | 39 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { |
351 | 39 | size_t read_num = 0; |
352 | 120 | while (read_num < num_values) { |
353 | 81 | size_t read_this_time = num_values - read_num; |
354 | | |
355 | 81 | if (LIKELY(repeat_count_ > 0)) { |
356 | 33 | read_this_time = std::min((size_t)repeat_count_, read_this_time); |
357 | 33 | std::fill(values, values + read_this_time, current_value_); |
358 | 33 | values += read_this_time; |
359 | 33 | repeat_count_ -= read_this_time; |
360 | 33 | read_num += read_this_time; |
361 | 48 | } else if (literal_count_ > 0) { |
362 | 8 | read_this_time = std::min((size_t)literal_count_, read_this_time); |
363 | 59 | for (int i = 0; i < read_this_time; ++i) { |
364 | 51 | bool result = bit_reader_.GetValue(bit_width_, values); |
365 | 51 | DCHECK(result); |
366 | 51 | values++; |
367 | 51 | } |
368 | 8 | literal_count_ -= read_this_time; |
369 | 8 | read_num += read_this_time; |
370 | 40 | } else { |
371 | 40 | if (!ReadHeader()) { |
372 | 0 | return read_num; |
373 | 0 | } |
374 | 40 | } |
375 | 81 | } |
376 | 39 | return read_num; |
377 | 39 | } _ZN5doris10RleDecoderIhE10get_valuesEPhm Line | Count | Source | 350 | 5 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 351 | 5 | size_t read_num = 0; | 352 | 14 | while (read_num < num_values) { | 353 | 9 | size_t read_this_time = num_values - read_num; | 354 | | | 355 | 9 | if (LIKELY(repeat_count_ > 0)) { | 356 | 0 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 357 | 0 | std::fill(values, values + read_this_time, current_value_); | 358 | 0 | values += read_this_time; | 359 | 0 | repeat_count_ -= read_this_time; | 360 | 0 | read_num += read_this_time; | 361 | 9 | } else if (literal_count_ > 0) { | 362 | 5 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 363 | 40 | for (int i = 0; i < read_this_time; ++i) { | 364 | 35 | bool result = bit_reader_.GetValue(bit_width_, values); | 365 | 35 | DCHECK(result); | 366 | 35 | values++; | 367 | 35 | } | 368 | 5 | literal_count_ -= read_this_time; | 369 | 5 | read_num += read_this_time; | 370 | 5 | } else { | 371 | 4 | if (!ReadHeader()) { | 372 | 0 | return read_num; | 373 | 0 | } | 374 | 4 | } | 375 | 9 | } | 376 | 5 | return read_num; | 377 | 5 | } |
_ZN5doris10RleDecoderIsE10get_valuesEPsm Line | Count | Source | 350 | 34 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 351 | 34 | size_t read_num = 0; | 352 | 106 | while (read_num < num_values) { | 353 | 72 | size_t read_this_time = num_values - read_num; | 354 | | | 355 | 72 | if (LIKELY(repeat_count_ > 0)) { | 356 | 33 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 357 | 33 | std::fill(values, values + read_this_time, current_value_); | 358 | 33 | values += read_this_time; | 359 | 33 | repeat_count_ -= read_this_time; | 360 | 33 | read_num += read_this_time; | 361 | 39 | } else if (literal_count_ > 0) { | 362 | 3 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 363 | 19 | for (int i = 0; i < read_this_time; ++i) { | 364 | 16 | bool result = bit_reader_.GetValue(bit_width_, values); | 365 | 16 | DCHECK(result); | 366 | 16 | values++; | 367 | 16 | } | 368 | 3 | literal_count_ -= read_this_time; | 369 | 3 | read_num += read_this_time; | 370 | 36 | } else { | 371 | 36 | if (!ReadHeader()) { | 372 | 0 | return read_num; | 373 | 0 | } | 374 | 36 | } | 375 | 72 | } | 376 | 34 | return read_num; | 377 | 34 | } |
|
378 | | |
379 | | template <typename T> |
380 | | size_t RleDecoder<T>::repeated_count() { |
381 | | if (repeat_count_ > 0) { |
382 | | return repeat_count_; |
383 | | } |
384 | | if (literal_count_ == 0) { |
385 | | ReadHeader(); |
386 | | } |
387 | | return repeat_count_; |
388 | | } |
389 | | |
390 | | template <typename T> |
391 | | T RleDecoder<T>::get_repeated_value(size_t count) { |
392 | | DCHECK_GE(repeat_count_, count); |
393 | | repeat_count_ -= count; |
394 | | return current_value_; |
395 | | } |
396 | | |
397 | | template <typename T> |
398 | 5 | size_t RleDecoder<T>::Skip(size_t to_skip) { |
399 | 5 | DCHECK(bit_reader_.is_initialized()); |
400 | | |
401 | 5 | size_t set_count = 0; |
402 | 16 | while (to_skip > 0) { |
403 | 11 | bool result = ReadHeader(); |
404 | 11 | DCHECK(result); |
405 | | |
406 | 11 | if (PREDICT_TRUE(repeat_count_ > 0)) { |
407 | 7 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; |
408 | 7 | repeat_count_ -= nskip; |
409 | 7 | to_skip -= nskip; |
410 | 7 | if (current_value_ != 0) { |
411 | 3 | set_count += nskip; |
412 | 3 | } |
413 | 7 | } else { |
414 | 4 | DCHECK(literal_count_ > 0); |
415 | 4 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; |
416 | 4 | literal_count_ -= nskip; |
417 | 4 | to_skip -= nskip; |
418 | 60 | for (; nskip > 0; nskip--) { |
419 | 56 | T value = 0; |
420 | 56 | bool result = bit_reader_.GetValue(bit_width_, &value); |
421 | 56 | DCHECK(result); |
422 | 56 | if (value != 0) { |
423 | 28 | set_count++; |
424 | 28 | } |
425 | 56 | } |
426 | 4 | } |
427 | 11 | } |
428 | 5 | return set_count; |
429 | 5 | } _ZN5doris10RleDecoderIbE4SkipEm Line | Count | Source | 398 | 4 | size_t RleDecoder<T>::Skip(size_t to_skip) { | 399 | 4 | DCHECK(bit_reader_.is_initialized()); | 400 | | | 401 | 4 | size_t set_count = 0; | 402 | 14 | while (to_skip > 0) { | 403 | 10 | bool result = ReadHeader(); | 404 | 10 | DCHECK(result); | 405 | | | 406 | 10 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 407 | 7 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 408 | 7 | repeat_count_ -= nskip; | 409 | 7 | to_skip -= nskip; | 410 | 7 | if (current_value_ != 0) { | 411 | 3 | set_count += nskip; | 412 | 3 | } | 413 | 7 | } else { | 414 | 3 | DCHECK(literal_count_ > 0); | 415 | 3 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 416 | 3 | literal_count_ -= nskip; | 417 | 3 | to_skip -= nskip; | 418 | 56 | for (; nskip > 0; nskip--) { | 419 | 53 | T value = 0; | 420 | 53 | bool result = bit_reader_.GetValue(bit_width_, &value); | 421 | 53 | DCHECK(result); | 422 | 53 | if (value != 0) { | 423 | 26 | set_count++; | 424 | 26 | } | 425 | 53 | } | 426 | 3 | } | 427 | 10 | } | 428 | 4 | return set_count; | 429 | 4 | } |
_ZN5doris10RleDecoderIhE4SkipEm Line | Count | Source | 398 | 1 | size_t RleDecoder<T>::Skip(size_t to_skip) { | 399 | 1 | DCHECK(bit_reader_.is_initialized()); | 400 | | | 401 | 1 | size_t set_count = 0; | 402 | 2 | while (to_skip > 0) { | 403 | 1 | bool result = ReadHeader(); | 404 | 1 | DCHECK(result); | 405 | | | 406 | 1 | if (PREDICT_TRUE(repeat_count_ > 0)) { | 407 | 0 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 408 | 0 | repeat_count_ -= nskip; | 409 | 0 | to_skip -= nskip; | 410 | 0 | if (current_value_ != 0) { | 411 | 0 | set_count += nskip; | 412 | 0 | } | 413 | 1 | } else { | 414 | 1 | DCHECK(literal_count_ > 0); | 415 | 1 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 416 | 1 | literal_count_ -= nskip; | 417 | 1 | to_skip -= nskip; | 418 | 4 | for (; nskip > 0; nskip--) { | 419 | 3 | T value = 0; | 420 | 3 | bool result = bit_reader_.GetValue(bit_width_, &value); | 421 | 3 | DCHECK(result); | 422 | 3 | if (value != 0) { | 423 | 2 | set_count++; | 424 | 2 | } | 425 | 3 | } | 426 | 1 | } | 427 | 1 | } | 428 | 1 | return set_count; | 429 | 1 | } |
|
430 | | |
431 | | // This function buffers input values 8 at a time. After seeing all 8 values, |
432 | | // it decides whether they should be encoded as a literal or repeated run. |
433 | | template <typename T> |
434 | 732k | void RleEncoder<T>::Put(T value, size_t run_length) { |
435 | 732k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); |
436 | | |
437 | | // TODO(perf): remove the loop and use the repeat_count_ |
438 | 3.35M | for (; run_length > 0; run_length--) { |
439 | 2.61M | if (PREDICT_TRUE(current_value_ == value)) { |
440 | 2.51M | ++repeat_count_; |
441 | 2.51M | if (repeat_count_ > 8) { |
442 | | // This is just a continuation of the current run, no need to buffer the |
443 | | // values. |
444 | | // Note that this is the fast path for long repeated runs. |
445 | 2.29M | continue; |
446 | 2.29M | } |
447 | 2.51M | } else { |
448 | 107k | if (repeat_count_ >= 8) { |
449 | | // We had a run that was long enough but it has ended. Flush the |
450 | | // current repeated run. |
451 | 21.4k | DCHECK_EQ(literal_count_, 0); |
452 | 21.4k | FlushRepeatedRun(); |
453 | 21.4k | } |
454 | 107k | repeat_count_ = 1; |
455 | 107k | current_value_ = value; |
456 | 107k | } |
457 | | |
458 | 329k | buffered_values_[num_buffered_values_] = value; |
459 | 329k | if (++num_buffered_values_ == 8) { |
460 | 41.0k | DCHECK_EQ(literal_count_ % 8, 0); |
461 | 41.0k | FlushBufferedValues(false); |
462 | 41.0k | } |
463 | 329k | } |
464 | 732k | } _ZN5doris10RleEncoderImE3PutEmm Line | Count | Source | 434 | 244k | void RleEncoder<T>::Put(T value, size_t run_length) { | 435 | 244k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 436 | | | 437 | | // TODO(perf): remove the loop and use the repeat_count_ | 438 | 488k | for (; run_length > 0; run_length--) { | 439 | 244k | if (PREDICT_TRUE(current_value_ == value)) { | 440 | 167k | ++repeat_count_; | 441 | 167k | if (repeat_count_ > 8) { | 442 | | // This is just a continuation of the current run, no need to buffer the | 443 | | // values. | 444 | | // Note that this is the fast path for long repeated runs. | 445 | 140k | continue; | 446 | 140k | } | 447 | 167k | } else { | 448 | 76.9k | if (repeat_count_ >= 8) { | 449 | | // We had a run that was long enough but it has ended. Flush the | 450 | | // current repeated run. | 451 | 1.65k | DCHECK_EQ(literal_count_, 0); | 452 | 1.65k | FlushRepeatedRun(); | 453 | 1.65k | } | 454 | 76.9k | repeat_count_ = 1; | 455 | 76.9k | current_value_ = value; | 456 | 76.9k | } | 457 | | | 458 | 103k | buffered_values_[num_buffered_values_] = value; | 459 | 103k | if (++num_buffered_values_ == 8) { | 460 | 12.9k | DCHECK_EQ(literal_count_ % 8, 0); | 461 | 12.9k | FlushBufferedValues(false); | 462 | 12.9k | } | 463 | 103k | } | 464 | 244k | } |
_ZN5doris10RleEncoderIbE3PutEbm Line | Count | Source | 434 | 487k | void RleEncoder<T>::Put(T value, size_t run_length) { | 435 | 487k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 436 | | | 437 | | // TODO(perf): remove the loop and use the repeat_count_ | 438 | 2.86M | for (; run_length > 0; run_length--) { | 439 | 2.37M | if (PREDICT_TRUE(current_value_ == value)) { | 440 | 2.34M | ++repeat_count_; | 441 | 2.34M | if (repeat_count_ > 8) { | 442 | | // This is just a continuation of the current run, no need to buffer the | 443 | | // values. | 444 | | // Note that this is the fast path for long repeated runs. | 445 | 2.14M | continue; | 446 | 2.14M | } | 447 | 2.34M | } else { | 448 | 30.1k | if (repeat_count_ >= 8) { | 449 | | // We had a run that was long enough but it has ended. Flush the | 450 | | // current repeated run. | 451 | 19.8k | DCHECK_EQ(literal_count_, 0); | 452 | 19.8k | FlushRepeatedRun(); | 453 | 19.8k | } | 454 | 30.1k | repeat_count_ = 1; | 455 | 30.1k | current_value_ = value; | 456 | 30.1k | } | 457 | | | 458 | 225k | buffered_values_[num_buffered_values_] = value; | 459 | 225k | if (++num_buffered_values_ == 8) { | 460 | 28.1k | DCHECK_EQ(literal_count_ % 8, 0); | 461 | 28.1k | FlushBufferedValues(false); | 462 | 28.1k | } | 463 | 225k | } | 464 | 487k | } |
|
465 | | |
466 | | template <typename T> |
467 | 17.4k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { |
468 | 17.4k | if (literal_indicator_byte_idx_ < 0) { |
469 | | // The literal indicator byte has not been reserved yet, get one now. |
470 | 1.32k | literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); |
471 | 1.32k | DCHECK_GE(literal_indicator_byte_idx_, 0); |
472 | 1.32k | } |
473 | | |
474 | | // Write all the buffered values as bit packed literals |
475 | 148k | for (int i = 0; i < num_buffered_values_; ++i) { |
476 | 130k | bit_writer_.PutValue(buffered_values_[i], bit_width_); |
477 | 130k | } |
478 | 17.4k | num_buffered_values_ = 0; |
479 | | |
480 | 17.4k | if (update_indicator_byte) { |
481 | | // At this point we need to write the indicator byte for the literal run. |
482 | | // We only reserve one byte, to allow for streaming writes of literal values. |
483 | | // The logic makes sure we flush literal runs often enough to not overrun |
484 | | // the 1 byte. |
485 | 1.32k | int num_groups = BitUtil::Ceil(literal_count_, 8); |
486 | 1.32k | int32_t indicator_value = (num_groups << 1) | 1; |
487 | 1.32k | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); |
488 | 1.32k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; |
489 | 1.32k | literal_indicator_byte_idx_ = -1; |
490 | 1.32k | literal_count_ = 0; |
491 | 1.32k | } |
492 | 17.4k | } _ZN5doris10RleEncoderImE15FlushLiteralRunEb Line | Count | Source | 467 | 12.0k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 468 | 12.0k | if (literal_indicator_byte_idx_ < 0) { | 469 | | // The literal indicator byte has not been reserved yet, get one now. | 470 | 1.12k | literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); | 471 | 1.12k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 472 | 1.12k | } | 473 | | | 474 | | // Write all the buffered values as bit packed literals | 475 | 101k | for (int i = 0; i < num_buffered_values_; ++i) { | 476 | 88.9k | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 477 | 88.9k | } | 478 | 12.0k | num_buffered_values_ = 0; | 479 | | | 480 | 12.0k | if (update_indicator_byte) { | 481 | | // At this point we need to write the indicator byte for the literal run. | 482 | | // We only reserve one byte, to allow for streaming writes of literal values. | 483 | | // The logic makes sure we flush literal runs often enough to not overrun | 484 | | // the 1 byte. | 485 | 1.12k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 486 | 1.12k | int32_t indicator_value = (num_groups << 1) | 1; | 487 | 1.12k | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 488 | 1.12k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; | 489 | 1.12k | literal_indicator_byte_idx_ = -1; | 490 | 1.12k | literal_count_ = 0; | 491 | 1.12k | } | 492 | 12.0k | } |
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb Line | Count | Source | 467 | 5.35k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 468 | 5.35k | if (literal_indicator_byte_idx_ < 0) { | 469 | | // The literal indicator byte has not been reserved yet, get one now. | 470 | 206 | literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1); | 471 | 206 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 472 | 206 | } | 473 | | | 474 | | // Write all the buffered values as bit packed literals | 475 | 47.1k | for (int i = 0; i < num_buffered_values_; ++i) { | 476 | 41.7k | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 477 | 41.7k | } | 478 | 5.35k | num_buffered_values_ = 0; | 479 | | | 480 | 5.35k | if (update_indicator_byte) { | 481 | | // At this point we need to write the indicator byte for the literal run. | 482 | | // We only reserve one byte, to allow for streaming writes of literal values. | 483 | | // The logic makes sure we flush literal runs often enough to not overrun | 484 | | // the 1 byte. | 485 | 206 | int num_groups = BitUtil::Ceil(literal_count_, 8); | 486 | 206 | int32_t indicator_value = (num_groups << 1) | 1; | 487 | 206 | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 488 | 206 | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value; | 489 | 206 | literal_indicator_byte_idx_ = -1; | 490 | 206 | literal_count_ = 0; | 491 | 206 | } | 492 | 5.35k | } |
|
493 | | |
494 | | template <typename T> |
495 | 22.0k | void RleEncoder<T>::FlushRepeatedRun() { |
496 | 22.0k | DCHECK_GT(repeat_count_, 0); |
497 | | // The lsb of 0 indicates this is a repeated run |
498 | 22.0k | int32_t indicator_value = repeat_count_ << 1 | 0; |
499 | 22.0k | bit_writer_.PutVlqInt(indicator_value); |
500 | 22.0k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); |
501 | 22.0k | num_buffered_values_ = 0; |
502 | 22.0k | repeat_count_ = 0; |
503 | 22.0k | } _ZN5doris10RleEncoderImE16FlushRepeatedRunEv Line | Count | Source | 495 | 1.91k | void RleEncoder<T>::FlushRepeatedRun() { | 496 | 1.91k | DCHECK_GT(repeat_count_, 0); | 497 | | // The lsb of 0 indicates this is a repeated run | 498 | 1.91k | int32_t indicator_value = repeat_count_ << 1 | 0; | 499 | 1.91k | bit_writer_.PutVlqInt(indicator_value); | 500 | 1.91k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 501 | 1.91k | num_buffered_values_ = 0; | 502 | 1.91k | repeat_count_ = 0; | 503 | 1.91k | } |
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv Line | Count | Source | 495 | 20.1k | void RleEncoder<T>::FlushRepeatedRun() { | 496 | 20.1k | DCHECK_GT(repeat_count_, 0); | 497 | | // The lsb of 0 indicates this is a repeated run | 498 | 20.1k | int32_t indicator_value = repeat_count_ << 1 | 0; | 499 | 20.1k | bit_writer_.PutVlqInt(indicator_value); | 500 | 20.1k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 501 | 20.1k | num_buffered_values_ = 0; | 502 | 20.1k | repeat_count_ = 0; | 503 | 20.1k | } |
|
504 | | |
505 | | // Flush the values that have been buffered. At this point we decide whether |
506 | | // we need to switch between the run types or continue the current one. |
507 | | template <typename T> |
508 | 41.0k | void RleEncoder<T>::FlushBufferedValues(bool done) { |
509 | 41.0k | if (repeat_count_ >= 8) { |
510 | | // Clear the buffered values. They are part of the repeated run now and we |
511 | | // don't want to flush them out as literals. |
512 | 24.7k | num_buffered_values_ = 0; |
513 | 24.7k | if (literal_count_ != 0) { |
514 | | // There was a current literal run. All the values in it have been flushed |
515 | | // but we still need to update the indicator byte. |
516 | 918 | DCHECK_EQ(literal_count_ % 8, 0); |
517 | 918 | DCHECK_EQ(repeat_count_, 8); |
518 | 918 | FlushLiteralRun(true); |
519 | 918 | } |
520 | 24.7k | DCHECK_EQ(literal_count_, 0); |
521 | 24.7k | return; |
522 | 24.7k | } |
523 | | |
524 | 16.2k | literal_count_ += num_buffered_values_; |
525 | 16.2k | int num_groups = BitUtil::Ceil(literal_count_, 8); |
526 | 16.2k | if (num_groups + 1 >= (1 << 6)) { |
527 | | // We need to start a new literal run because the indicator byte we've reserved |
528 | | // cannot store more values. |
529 | 167 | DCHECK_GE(literal_indicator_byte_idx_, 0); |
530 | 167 | FlushLiteralRun(true); |
531 | 16.0k | } else { |
532 | 16.0k | FlushLiteralRun(done); |
533 | 16.0k | } |
534 | 16.2k | repeat_count_ = 0; |
535 | 16.2k | } _ZN5doris10RleEncoderImE19FlushBufferedValuesEb Line | Count | Source | 508 | 12.9k | void RleEncoder<T>::FlushBufferedValues(bool done) { | 509 | 12.9k | if (repeat_count_ >= 8) { | 510 | | // Clear the buffered values. They are part of the repeated run now and we | 511 | | // don't want to flush them out as literals. | 512 | 1.85k | num_buffered_values_ = 0; | 513 | 1.85k | if (literal_count_ != 0) { | 514 | | // There was a current literal run. All the values in it have been flushed | 515 | | // but we still need to update the indicator byte. | 516 | 858 | DCHECK_EQ(literal_count_ % 8, 0); | 517 | 858 | DCHECK_EQ(repeat_count_, 8); | 518 | 858 | FlushLiteralRun(true); | 519 | 858 | } | 520 | 1.85k | DCHECK_EQ(literal_count_, 0); | 521 | 1.85k | return; | 522 | 1.85k | } | 523 | | | 524 | 11.0k | literal_count_ += num_buffered_values_; | 525 | 11.0k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 526 | 11.0k | if (num_groups + 1 >= (1 << 6)) { | 527 | | // We need to start a new literal run because the indicator byte we've reserved | 528 | | // cannot store more values. | 529 | 128 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 530 | 128 | FlushLiteralRun(true); | 531 | 10.9k | } else { | 532 | 10.9k | FlushLiteralRun(done); | 533 | 10.9k | } | 534 | 11.0k | repeat_count_ = 0; | 535 | 11.0k | } |
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb Line | Count | Source | 508 | 28.1k | void RleEncoder<T>::FlushBufferedValues(bool done) { | 509 | 28.1k | if (repeat_count_ >= 8) { | 510 | | // Clear the buffered values. They are part of the repeated run now and we | 511 | | // don't want to flush them out as literals. | 512 | 22.9k | num_buffered_values_ = 0; | 513 | 22.9k | if (literal_count_ != 0) { | 514 | | // There was a current literal run. All the values in it have been flushed | 515 | | // but we still need to update the indicator byte. | 516 | 60 | DCHECK_EQ(literal_count_ % 8, 0); | 517 | 60 | DCHECK_EQ(repeat_count_, 8); | 518 | 60 | FlushLiteralRun(true); | 519 | 60 | } | 520 | 22.9k | DCHECK_EQ(literal_count_, 0); | 521 | 22.9k | return; | 522 | 22.9k | } | 523 | | | 524 | 5.18k | literal_count_ += num_buffered_values_; | 525 | 5.18k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 526 | 5.18k | if (num_groups + 1 >= (1 << 6)) { | 527 | | // We need to start a new literal run because the indicator byte we've reserved | 528 | | // cannot store more values. | 529 | 39 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 530 | 39 | FlushLiteralRun(true); | 531 | 5.14k | } else { | 532 | 5.14k | FlushLiteralRun(done); | 533 | 5.14k | } | 534 | 5.18k | repeat_count_ = 0; | 535 | 5.18k | } |
|
536 | | |
537 | | template <typename T> |
538 | 0 | void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) { |
539 | 0 | for (int i = 0; i < num_bytes; ++i) { |
540 | 0 | bit_writer_.PutValue(val, 8); |
541 | 0 | } |
542 | 0 | } |
543 | | |
544 | | template <typename T> |
545 | 804 | int RleEncoder<T>::Flush() { |
546 | 804 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { |
547 | 802 | bool all_repeat = literal_count_ == 0 && |
548 | 802 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); |
549 | | // There is something pending, figure out if it's a repeated or literal run |
550 | 802 | if (repeat_count_ > 0 && all_repeat) { |
551 | 561 | FlushRepeatedRun(); |
552 | 561 | } else { |
553 | 241 | literal_count_ += num_buffered_values_; |
554 | 241 | FlushLiteralRun(true); |
555 | 241 | repeat_count_ = 0; |
556 | 241 | } |
557 | 802 | } |
558 | 804 | bit_writer_.Flush(); |
559 | 804 | DCHECK_EQ(num_buffered_values_, 0); |
560 | 804 | DCHECK_EQ(literal_count_, 0); |
561 | 804 | DCHECK_EQ(repeat_count_, 0); |
562 | 804 | return bit_writer_.bytes_written(); |
563 | 804 | } _ZN5doris10RleEncoderImE5FlushEv Line | Count | Source | 545 | 394 | int RleEncoder<T>::Flush() { | 546 | 394 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 547 | 394 | bool all_repeat = literal_count_ == 0 && | 548 | 394 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 549 | | // There is something pending, figure out if it's a repeated or literal run | 550 | 394 | if (repeat_count_ > 0 && all_repeat) { | 551 | 260 | FlushRepeatedRun(); | 552 | 260 | } else { | 553 | 134 | literal_count_ += num_buffered_values_; | 554 | 134 | FlushLiteralRun(true); | 555 | 134 | repeat_count_ = 0; | 556 | 134 | } | 557 | 394 | } | 558 | 394 | bit_writer_.Flush(); | 559 | 394 | DCHECK_EQ(num_buffered_values_, 0); | 560 | 394 | DCHECK_EQ(literal_count_, 0); | 561 | 394 | DCHECK_EQ(repeat_count_, 0); | 562 | 394 | return bit_writer_.bytes_written(); | 563 | 394 | } |
_ZN5doris10RleEncoderIbE5FlushEv Line | Count | Source | 545 | 410 | int RleEncoder<T>::Flush() { | 546 | 410 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 547 | 408 | bool all_repeat = literal_count_ == 0 && | 548 | 408 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 549 | | // There is something pending, figure out if it's a repeated or literal run | 550 | 408 | if (repeat_count_ > 0 && all_repeat) { | 551 | 301 | FlushRepeatedRun(); | 552 | 301 | } else { | 553 | 107 | literal_count_ += num_buffered_values_; | 554 | 107 | FlushLiteralRun(true); | 555 | 107 | repeat_count_ = 0; | 556 | 107 | } | 557 | 408 | } | 558 | 410 | bit_writer_.Flush(); | 559 | 410 | DCHECK_EQ(num_buffered_values_, 0); | 560 | 410 | DCHECK_EQ(literal_count_, 0); | 561 | 410 | DCHECK_EQ(repeat_count_, 0); | 562 | 410 | return bit_writer_.bytes_written(); | 563 | 410 | } |
|
564 | | |
565 | | template <typename T> |
566 | 6.54k | void RleEncoder<T>::Clear() { |
567 | 6.54k | current_value_ = 0; |
568 | 6.54k | repeat_count_ = 0; |
569 | 6.54k | num_buffered_values_ = 0; |
570 | 6.54k | literal_count_ = 0; |
571 | 6.54k | literal_indicator_byte_idx_ = -1; |
572 | 6.54k | bit_writer_.Clear(); |
573 | 6.54k | } _ZN5doris10RleEncoderImE5ClearEv Line | Count | Source | 566 | 394 | void RleEncoder<T>::Clear() { | 567 | 394 | current_value_ = 0; | 568 | 394 | repeat_count_ = 0; | 569 | 394 | num_buffered_values_ = 0; | 570 | 394 | literal_count_ = 0; | 571 | 394 | literal_indicator_byte_idx_ = -1; | 572 | 394 | bit_writer_.Clear(); | 573 | 394 | } |
_ZN5doris10RleEncoderIbE5ClearEv Line | Count | Source | 566 | 6.14k | void RleEncoder<T>::Clear() { | 567 | 6.14k | current_value_ = 0; | 568 | 6.14k | repeat_count_ = 0; | 569 | 6.14k | num_buffered_values_ = 0; | 570 | 6.14k | literal_count_ = 0; | 571 | 6.14k | literal_indicator_byte_idx_ = -1; | 572 | 6.14k | bit_writer_.Clear(); | 573 | 6.14k | } |
|
574 | | |
575 | | // Copied from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h |
576 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
577 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
578 | | // (literal encoding). |
579 | | // |
580 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
581 | | // of the run and the type of the run. |
582 | | // |
583 | | // This encoding has the benefit that when there aren't any long enough runs, values |
584 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
585 | | // the run length are byte aligned. This allows for very efficient decoding |
586 | | // implementations. |
587 | | // The encoding is: |
588 | | // encoded-block := run* |
589 | | // run := literal-run | repeated-run |
590 | | // literal-run := literal-indicator < literal bytes > |
591 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
592 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
593 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
594 | | // |
595 | | // Each run is preceded by a varint. The varint's least significant bit is |
596 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
597 | | // of the varint is used to determine the length of the run (eg how many times the |
598 | | // value repeats). |
599 | | // |
600 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
601 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
602 | | // on a byte boundary without padding. |
603 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
604 | | // the actual number of encoded ints. (This means that the total number of encoded values |
605 | | // can not be determined from the encoded data, since the number of values in the last |
606 | | // group may not be a multiple of 8). For the last group of literal runs, we pad |
607 | | // the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
608 | | // without the need for additional checks. |
609 | | // |
610 | | // There is a break-even point when it is more storage efficient to do run length |
611 | | // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
612 | | // for both the repeated encoding or the literal encoding. This value can always |
613 | | // be computed based on the bit-width. |
614 | | // TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more |
615 | | // investigation is needed to do this efficiently, see the reverted IMPALA-6658. |
616 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
617 | | // |
618 | | // Examples with bit-width 1 (eg encoding booleans): |
619 | | // ---------------------------------------- |
620 | | // 100 1s followed by 100 0s: |
621 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
622 | | // - (total 4 bytes) |
623 | | // |
624 | | // alternating 1s and 0s (200 total): |
625 | | // 200 ints = 25 groups of 8 |
626 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
627 | | // (total 26 bytes, 1 byte overhead) |
628 | | |
629 | | // RLE decoder with a batch-oriented interface that enables fast decoding. |
630 | | // Users of this class must first initialize the class to point to a buffer of |
631 | | // RLE-encoded data, passed into the constructor or Reset(). The provided |
632 | | // bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH). |
633 | | // Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to |
634 | | // see if the next run is a repeated or literal run, then calling |
635 | | // GetRepeatedValue() or GetLiteralValues() respectively to read the values. |
636 | | // |
637 | | // End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. |
638 | | // Other decoding errors are signalled by functions returning false. If an |
639 | | // error is encountered then it is not valid to read any more data until |
640 | | // Reset() is called. |
641 | | |
642 | | // bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1]. |
643 | | // This means that a Parquet implementation can always store the run length in a signed 32-bit integer. |
644 | | template <typename T> |
645 | | class RleBatchDecoder { |
646 | | public: |
647 | 35 | RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { |
648 | 35 | Reset(buffer, buffer_len, bit_width); |
649 | 35 | } |
650 | | |
651 | | RleBatchDecoder() = default; |
652 | | |
653 | | // Reset the decoder to read from a new buffer. |
654 | | void Reset(uint8_t* buffer, int buffer_len, int bit_width); |
655 | | |
656 | | // Return the size of the current repeated run. Returns zero if the current run is |
657 | | // a literal run or if no more runs can be read from the input. |
658 | | int32_t NextNumRepeats(); |
659 | | |
660 | | // Get the value of the current repeated run and consume the given number of repeats. |
661 | | // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot |
662 | | // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume' |
663 | | // can be set to 0 to peek at the value without consuming repeats. |
664 | | T GetRepeatedValue(int32_t num_repeats_to_consume); |
665 | | |
666 | | // Return the size of the current literal run. Returns zero if the current run is |
667 | | // a repeated run or if no more runs can be read from the input. |
668 | | int32_t NextNumLiterals(); |
669 | | |
670 | | // Consume 'num_literals_to_consume' literals from the current literal run, |
671 | | // copying the values to 'values'. 'num_literals_to_consume' must be <= |
672 | | // NextNumLiterals(). Returns true if the requested number of literals were |
673 | | // successfully read or false if an error was encountered, e.g. the input was |
674 | | // truncated. |
675 | | bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; |
676 | | |
677 | | // Consume 'num_values_to_consume' values and copy them to 'values'. |
678 | | // Returns the number of consumed values or 0 if an error occurred. |
679 | | uint32_t GetBatch(T* values, uint32_t batch_num); |
680 | | |
681 | | private: |
682 | | // Called when both 'literal_count_' and 'repeat_count_' have been exhausted. |
683 | | // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal |
684 | | // or repeated run, or leaves both at 0 if no more values can be read (either because |
685 | | // the end of the input was reached or an error was encountered decoding). |
686 | | void NextCounts(); |
687 | | |
688 | | /// Fill the literal buffer. Invalid to call if there are already buffered literals. |
689 | | /// Return false if the input was truncated. This does not advance 'literal_count_'. |
690 | | bool FillLiteralBuffer() WARN_UNUSED_RESULT; |
691 | | |
692 | 20 | bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; } |
693 | | |
694 | | /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing |
695 | | /// 'literal_count_'. Returns the number of literals outputted. |
696 | | int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); |
697 | | |
698 | | BatchedBitReader bit_reader_; |
699 | | |
700 | | // Number of bits needed to encode the value. Must be between 0 and 64 after |
701 | | // the decoder is initialized with a buffer. -1 indicates the decoder was not |
702 | | // initialized. |
703 | | int bit_width_ = -1; |
704 | | |
705 | | // If a repeated run, the number of repeats remaining in the current run to be read. |
706 | | // If the current run is a literal run, this is 0. |
707 | | int32_t repeat_count_ = 0; |
708 | | |
709 | | // If a literal run, the number of literals remaining in the current run to be read. |
710 | | // If the current run is a repeated run, this is 0. |
711 | | int32_t literal_count_ = 0; |
712 | | |
713 | | // If a repeated run, the current repeated value. |
714 | | T repeated_value_; |
715 | | |
716 | | // Size of buffer for literal values. Large enough to decode a full batch of 32 |
717 | | // literals. The buffer is needed to allow clients to read in batches that are not |
718 | | // multiples of 32. |
719 | | static constexpr int LITERAL_BUFFER_LEN = 32; |
720 | | |
721 | | // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the |
722 | | // position of the next literal to be read from the buffer. |
723 | | T literal_buffer_[LITERAL_BUFFER_LEN]; |
724 | | int num_buffered_literals_ = 0; |
725 | | int literal_buffer_pos_ = 0; |
726 | | }; |
727 | | |
728 | | template <typename T> |
729 | 20 | int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) { |
730 | 20 | int32_t num_to_output = |
731 | 20 | std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); |
732 | 20 | memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); |
733 | 20 | literal_buffer_pos_ += num_to_output; |
734 | 20 | literal_count_ -= num_to_output; |
735 | 20 | return num_to_output; |
736 | 20 | } |
737 | | |
738 | | template <typename T> |
739 | 35 | void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) { |
740 | 35 | bit_reader_.Reset(buffer, buffer_len); |
741 | 35 | bit_width_ = bit_width; |
742 | 35 | repeat_count_ = 0; |
743 | 35 | literal_count_ = 0; |
744 | 35 | num_buffered_literals_ = 0; |
745 | 35 | literal_buffer_pos_ = 0; |
746 | 35 | } |
747 | | |
748 | | template <typename T> |
749 | 56 | int32_t RleBatchDecoder<T>::NextNumRepeats() { |
750 | 56 | if (repeat_count_ > 0) return repeat_count_; |
751 | 54 | if (literal_count_ == 0) NextCounts(); |
752 | 54 | return repeat_count_; |
753 | 56 | } |
754 | | |
755 | | template <typename T> |
756 | 54 | void RleBatchDecoder<T>::NextCounts() { |
757 | | // Read the next run's indicator int, it could be a literal or repeated run. |
758 | | // The int is encoded as a ULEB128-encoded value. |
759 | 54 | uint32_t indicator_value = 0; |
760 | 54 | if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) { |
761 | 0 | return; |
762 | 0 | } |
763 | | |
764 | | // lsb indicates if it is a literal run or repeated run |
765 | 54 | bool is_literal = indicator_value & 1; |
766 | | |
767 | | // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. |
768 | | // The Parquet standard does not allow longer runs - see PARQUET-1290. |
769 | 54 | uint32_t run_len = indicator_value >> 1; |
770 | 54 | if (is_literal) { |
771 | | // Use int64_t to avoid overflowing multiplication. |
772 | 20 | int64_t literal_count = static_cast<int64_t>(run_len) * 8; |
773 | 20 | if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; |
774 | 20 | literal_count_ = literal_count; |
775 | 34 | } else { |
776 | 34 | if (UNLIKELY(run_len == 0)) return; |
777 | 34 | bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); |
778 | 34 | if (UNLIKELY(!result)) return; |
779 | 34 | repeat_count_ = run_len; |
780 | 34 | } |
781 | 54 | } |
782 | | |
783 | | template <typename T> |
784 | 36 | T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { |
785 | 36 | repeat_count_ -= num_repeats_to_consume; |
786 | 36 | return repeated_value_; |
787 | 36 | } |
788 | | |
789 | | template <typename T> |
790 | 20 | int32_t RleBatchDecoder<T>::NextNumLiterals() { |
791 | 20 | if (literal_count_ > 0) return literal_count_; |
792 | 0 | if (repeat_count_ == 0) NextCounts(); |
793 | 0 | return literal_count_; |
794 | 20 | } |
795 | | |
796 | | template <typename T> |
797 | 20 | bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) { |
798 | 20 | int32_t num_consumed = 0; |
799 | | // Copy any buffered literals left over from previous calls. |
800 | 20 | if (HaveBufferedLiterals()) { |
801 | 0 | num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); |
802 | 0 | } |
803 | | |
804 | 20 | int32_t num_remaining = num_literals_to_consume - num_consumed; |
805 | | // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. |
806 | | // Need to round to a batch of 32 if the caller is consuming only part of the current |
807 | | // run avoid ending on a non-byte boundary. |
808 | 20 | int32_t num_to_bypass = |
809 | 20 | std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
810 | 20 | if (num_to_bypass > 0) { |
811 | 0 | int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); |
812 | | // If we couldn't read the expected number, that means the input was truncated. |
813 | 0 | if (num_read < num_to_bypass) return false; |
814 | 0 | literal_count_ -= num_to_bypass; |
815 | 0 | num_consumed += num_to_bypass; |
816 | 0 | num_remaining = num_literals_to_consume - num_consumed; |
817 | 0 | } |
818 | | |
819 | 20 | if (num_remaining > 0) { |
820 | | // We weren't able to copy all the literals requested directly from the input. |
821 | | // Buffer literals and copy over the requested number. |
822 | 20 | if (UNLIKELY(!FillLiteralBuffer())) return false; |
823 | 20 | OutputBufferedLiterals(num_remaining, values + num_consumed); |
824 | 20 | } |
825 | 20 | return true; |
826 | 20 | } |
827 | | |
828 | | template <typename T> |
829 | 20 | bool RleBatchDecoder<T>::FillLiteralBuffer() { |
830 | 20 | int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_); |
831 | 20 | num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); |
832 | | // If we couldn't read the expected number, that means the input was truncated. |
833 | 20 | if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; |
834 | 20 | literal_buffer_pos_ = 0; |
835 | 20 | return true; |
836 | 20 | } |
837 | | |
838 | | template <typename T> |
839 | 37 | uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) { |
840 | 37 | uint32_t num_consumed = 0; |
841 | 93 | while (num_consumed < batch_num) { |
842 | | // Add RLE encoded values by repeating the current value this number of times. |
843 | 56 | uint32_t num_repeats = NextNumRepeats(); |
844 | 56 | if (num_repeats > 0) { |
845 | 36 | int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed); |
846 | 36 | T repeated_value = GetRepeatedValue(num_repeats_to_set); |
847 | 292 | for (int i = 0; i < num_repeats_to_set; ++i) { |
848 | 256 | values[num_consumed + i] = repeated_value; |
849 | 256 | } |
850 | 36 | num_consumed += num_repeats_to_set; |
851 | 36 | continue; |
852 | 36 | } |
853 | | |
854 | | // Add remaining literal values, if any. |
855 | 20 | uint32_t num_literals = NextNumLiterals(); |
856 | 20 | if (num_literals == 0) { |
857 | 0 | break; |
858 | 0 | } |
859 | 20 | uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed); |
860 | 20 | if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { |
861 | 0 | return 0; |
862 | 0 | } |
863 | 20 | num_consumed += num_literals_to_set; |
864 | 20 | } |
865 | 37 | return num_consumed; |
866 | 37 | } |
867 | | |
868 | | } // namespace doris |