/root/doris/be/src/util/rle_encoding.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #pragma once |
18 | | |
19 | | #include <glog/logging.h> |
20 | | |
21 | | #include <limits> // IWYU pragma: keep |
22 | | |
23 | | #include "common/cast_set.h" |
24 | | #include "util/bit_stream_utils.inline.h" |
25 | | #include "util/bit_util.h" |
26 | | |
27 | | namespace doris { |
28 | | #include "common/compile_check_begin.h" |
29 | | |
30 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
31 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
32 | | // (literal encoding). |
33 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
34 | | // of the run and the type of the run. |
35 | | // This encoding has the benefit that when there aren't any long enough runs, values |
36 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
37 | | // the run length are byte aligned. This allows for very efficient decoding |
38 | | // implementations. |
39 | | // The encoding is: |
40 | | // encoded-block := run* |
41 | | // run := literal-run | repeated-run |
42 | | // literal-run := literal-indicator < literal bytes > |
43 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
44 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
45 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
46 | | // |
47 | | // Each run is preceded by a varint. The varint's least significant bit is |
48 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
49 | | // of the varint is used to determine the length of the run (eg how many times the |
50 | | // value repeats). |
51 | | // |
52 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
53 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
54 | | // on a byte boundary without padding. |
55 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
56 | | // the actual number of encoded ints. (This means that the total number of encoded values |
57 | | // cannot be determined from the encoded data alone, since the last group may contain |
58 | | // fewer than 8 meaningful values). |
59 | | // There is a break-even point when it is more storage efficient to do run length |
60 | | // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
61 | | // for either the repeated encoding or the literal encoding. This value can always |
62 | | // be computed based on the bit-width. |
63 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
64 | | // |
65 | | // Examples with bit-width 1 (eg encoding booleans): |
66 | | // ---------------------------------------- |
67 | | // 100 1s followed by 100 0s: |
68 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
69 | | // (total 6 bytes: varint(100 << 1) = varint(200) needs 2 bytes, plus 1 byte per value) |
70 | | // |
71 | | // alternating 1s and 0s (200 total): |
72 | | // 200 ints = 25 groups of 8 |
73 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
74 | | // (total 26 bytes, 1 byte overhead) |
75 | | // |
76 | | |
77 | | // Decoder class for RLE encoded data. |
78 | | // |
79 | | // NOTE: the encoded format does not have any length prefix or any other way of |
80 | | // indicating that the encoded sequence ends at a certain point, so the Decoder |
81 | | // methods may return some extra bits at the end before the read methods start |
82 | | // to return 0/false. |
83 | | template <typename T> |
84 | | class RleDecoder { |
85 | | public: |
86 | | // Create a decoder object. buffer/buffer_len is the decoded data. |
87 | | // bit_width is the width of each value (before encoding). |
88 | | RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width) |
89 | 1.20k | : bit_reader_(buffer, buffer_len), |
90 | 1.20k | bit_width_(bit_width), |
91 | 1.20k | current_value_(0), |
92 | 1.20k | repeat_count_(0), |
93 | 1.20k | literal_count_(0), |
94 | 1.20k | rewind_state_(CANT_REWIND) { |
95 | 1.20k | DCHECK_GE(bit_width_, 1); |
96 | 1.20k | DCHECK_LE(bit_width_, 64); |
97 | 1.20k | } _ZN5doris10RleDecoderImEC2EPKhii Line | Count | Source | 89 | 412 | : bit_reader_(buffer, buffer_len), | 90 | 412 | bit_width_(bit_width), | 91 | 412 | current_value_(0), | 92 | 412 | repeat_count_(0), | 93 | 412 | literal_count_(0), | 94 | 412 | rewind_state_(CANT_REWIND) { | 95 | 412 | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 412 | } |
_ZN5doris10RleDecoderIbEC2EPKhii Line | Count | Source | 89 | 641 | : bit_reader_(buffer, buffer_len), | 90 | 641 | bit_width_(bit_width), | 91 | 641 | current_value_(0), | 92 | 641 | repeat_count_(0), | 93 | 641 | literal_count_(0), | 94 | 641 | rewind_state_(CANT_REWIND) { | 95 | 641 | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 641 | } |
_ZN5doris10RleDecoderIhEC2EPKhii Line | Count | Source | 89 | 6 | : bit_reader_(buffer, buffer_len), | 90 | 6 | bit_width_(bit_width), | 91 | 6 | current_value_(0), | 92 | 6 | repeat_count_(0), | 93 | 6 | literal_count_(0), | 94 | 6 | rewind_state_(CANT_REWIND) { | 95 | 6 | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 6 | } |
_ZN5doris10RleDecoderIsEC2EPKhii Line | Count | Source | 89 | 146 | : bit_reader_(buffer, buffer_len), | 90 | 146 | bit_width_(bit_width), | 91 | 146 | current_value_(0), | 92 | 146 | repeat_count_(0), | 93 | 146 | literal_count_(0), | 94 | 146 | rewind_state_(CANT_REWIND) { | 95 | 146 | DCHECK_GE(bit_width_, 1); | 96 | | DCHECK_LE(bit_width_, 64); | 97 | 146 | } |
|
98 | | |
99 | 30.0k | RleDecoder() {}_ZN5doris10RleDecoderIbEC2Ev Line | Count | Source | 99 | 29.7k | RleDecoder() {} |
_ZN5doris10RleDecoderIhEC2Ev Line | Count | Source | 99 | 6 | RleDecoder() {} |
_ZN5doris10RleDecoderIsEC2Ev Line | Count | Source | 99 | 303 | RleDecoder() {} |
|
100 | | |
101 | | // Skip n values, and returns the number of non-zero entries skipped. |
102 | | size_t Skip(size_t to_skip); |
103 | | |
104 | | // Gets the next value. Returns false if there are no more. |
105 | | bool Get(T* val); |
106 | | |
107 | | // Seek to the previous value. |
108 | | void RewindOne(); |
109 | | |
110 | | // Gets the next run of the same 'val'. Returns 0 if there is no |
111 | | // more data to be decoded. Will return a run of at most 'max_run' |
112 | | // values. If there are more values than this, the next call to |
113 | | // GetNextRun will return more from the same run. |
114 | | size_t GetNextRun(T* val, size_t max_run); |
115 | | |
116 | | size_t get_values(T* values, size_t num_values); |
117 | | |
118 | | // Get the count of current repeated value |
119 | | size_t repeated_count(); |
120 | | |
121 | | // Get current repeated value, make sure that count equals repeated_count() |
122 | | T get_repeated_value(size_t count); |
123 | | |
124 | 0 | const BitReader& bit_reader() const { return bit_reader_; } |
125 | | |
126 | | private: |
127 | | bool ReadHeader(); |
128 | | |
129 | | enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND }; |
130 | | |
131 | | BitReader bit_reader_; |
132 | | int bit_width_; |
133 | | uint64_t current_value_; |
134 | | uint32_t repeat_count_; |
135 | | uint32_t literal_count_; |
136 | | RewindState rewind_state_; |
137 | | }; |
138 | | |
139 | | // Class to incrementally build the rle data. |
140 | | // The encoding has two modes: encoding repeated runs and literal runs. |
141 | | // If the run is sufficiently short, it is more efficient to encode as a literal run. |
142 | | // This class does so by buffering 8 values at a time. If they are not all the same |
143 | | // they are added to the literal run. If they are the same, they are added to the |
144 | | // repeated run. When we switch modes, the previous run is flushed out. |
145 | | template <typename T> |
146 | | class RleEncoder { |
147 | | public: |
148 | | // buffer: buffer to write bits to. |
149 | | // bit_width: max number of bits for value. |
150 | | // TODO: consider adding a min_repeated_run_length so the caller can control |
151 | | // when values should be encoded as repeated runs. Currently this is derived |
152 | | // based on the bit_width, which can determine a storage optimal choice. |
153 | | explicit RleEncoder(faststring* buffer, int bit_width) |
154 | 11.0M | : bit_width_(bit_width), bit_writer_(buffer) { |
155 | 11.0M | DCHECK_GE(bit_width_, 1); |
156 | 11.0M | DCHECK_LE(bit_width_, 64); |
157 | 11.0M | Clear(); |
158 | 11.0M | } _ZN5doris10RleEncoderImEC2EPNS_10faststringEi Line | Count | Source | 154 | 11.0M | : bit_width_(bit_width), bit_writer_(buffer) { | 155 | 11.0M | DCHECK_GE(bit_width_, 1); | 156 | | DCHECK_LE(bit_width_, 64); | 157 | 11.0M | Clear(); | 158 | 11.0M | } |
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi Line | Count | Source | 154 | 6.50k | : bit_width_(bit_width), bit_writer_(buffer) { | 155 | 6.50k | DCHECK_GE(bit_width_, 1); | 156 | | DCHECK_LE(bit_width_, 64); | 157 | 6.50k | Clear(); | 158 | 6.50k | } |
_ZN5doris10RleEncoderIhEC2EPNS_10faststringEi Line | Count | Source | 154 | 1 | : bit_width_(bit_width), bit_writer_(buffer) { | 155 | 1 | DCHECK_GE(bit_width_, 1); | 156 | | DCHECK_LE(bit_width_, 64); | 157 | 1 | Clear(); | 158 | 1 | } |
|
159 | | |
160 | | // Reserve 'num_bytes' bytes for a plain encoded header, set each |
161 | | // byte with 'val': this is used for the RLE-encoded data blocks in |
162 | | // order to be able to able to store the initial ordinal position |
163 | | // and number of elements. This is a part of RleEncoder in order to |
164 | | // maintain the correct offset in 'buffer'. |
165 | | void Reserve(int num_bytes, uint8_t val); |
166 | | |
167 | | // Encode value. This value must be representable with bit_width_ bits. |
168 | | void Put(T value, size_t run_length = 1); |
169 | | |
170 | | // Flushes any pending values to the underlying buffer. |
171 | | // Returns the total number of bytes written |
172 | | int Flush(); |
173 | | |
174 | | // Resets all the state in the encoder. |
175 | | void Clear(); |
176 | | |
177 | 307 | int32_t len() const { return bit_writer_.bytes_written(); }_ZNK5doris10RleEncoderIbE3lenEv Line | Count | Source | 177 | 306 | int32_t len() const { return bit_writer_.bytes_written(); } |
_ZNK5doris10RleEncoderIhE3lenEv Line | Count | Source | 177 | 1 | int32_t len() const { return bit_writer_.bytes_written(); } |
|
178 | | |
179 | | private: |
180 | | // Flushes any buffered values. If this is part of a repeated run, this is largely |
181 | | // a no-op. |
182 | | // If it is part of a literal run, this will call FlushLiteralRun, which writes |
183 | | // out the buffered literal values. |
184 | | // If 'done' is true, the current run would be written even if it would normally |
185 | | // have been buffered more. This should only be called at the end, when the |
186 | | // encoder has received all values even if it would normally continue to be |
187 | | // buffered. |
188 | | void FlushBufferedValues(bool done); |
189 | | |
190 | | // Flushes literal values to the underlying buffer. If update_indicator_byte, |
191 | | // then the current literal run is complete and the indicator byte is updated. |
192 | | void FlushLiteralRun(bool update_indicator_byte); |
193 | | |
194 | | // Flushes a repeated run to the underlying buffer. |
195 | | void FlushRepeatedRun(); |
196 | | |
197 | | // Number of bits needed to encode the value. |
198 | | const int bit_width_; |
199 | | |
200 | | // Underlying buffer. |
201 | | BitWriter bit_writer_; |
202 | | |
203 | | // We need to buffer at most 8 values for literals. This happens when the |
204 | | // bit_width is 1 (so 8 values fit in one byte). |
205 | | // TODO: generalize this to other bit widths |
206 | | uint64_t buffered_values_[8]; |
207 | | |
208 | | // Number of values in buffered_values_ |
209 | | int num_buffered_values_; |
210 | | |
211 | | // The current (also last) value that was written and the count of how |
212 | | // many times in a row that value has been seen. This is maintained even |
213 | | // if we are in a literal run. If the repeat_count_ get high enough, we switch |
214 | | // to encoding repeated runs. |
215 | | uint64_t current_value_; |
216 | | int repeat_count_; |
217 | | |
218 | | // Number of literals in the current run. This does not include the literals |
219 | | // that might be in buffered_values_. Only after we've got a group big enough |
220 | | // can we decide if they should part of the literal_count_ or repeat_count_ |
221 | | int literal_count_; |
222 | | |
223 | | // Index of a byte in the underlying buffer that stores the indicator byte. |
224 | | // This is reserved as soon as we need a literal run but the value is written |
225 | | // when the literal run is complete. We maintain an index rather than a pointer |
226 | | // into the underlying buffer because the pointer value may become invalid if |
227 | | // the underlying buffer is resized. |
228 | | int literal_indicator_byte_idx_; |
229 | | }; |
230 | | |
231 | | template <typename T> |
232 | 337k | bool RleDecoder<T>::ReadHeader() { |
233 | 337k | DCHECK(bit_reader_.is_initialized()); |
234 | 337k | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { |
235 | | // Read the next run's indicator int, it could be a literal or repeated run |
236 | | // The int is encoded as a vlq-encoded value. |
237 | 26.1k | uint32_t indicator_value = 0; |
238 | 26.1k | bool result = bit_reader_.GetVlqInt(&indicator_value); |
239 | 26.1k | if (!result) [[unlikely]] { |
240 | 308 | return false; |
241 | 308 | } |
242 | | |
243 | | // lsb indicates if it is a literal run or repeated run |
244 | 25.8k | bool is_literal = indicator_value & 1; |
245 | 25.8k | if (is_literal) { |
246 | 2.62k | literal_count_ = (indicator_value >> 1) * 8; |
247 | 2.62k | DCHECK_GT(literal_count_, 0); |
248 | 23.2k | } else { |
249 | 23.2k | repeat_count_ = indicator_value >> 1; |
250 | 23.2k | DCHECK_GT(repeat_count_, 0); |
251 | 23.2k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), |
252 | 23.2k | reinterpret_cast<T*>(¤t_value_)); |
253 | 23.2k | DCHECK(result1); |
254 | 23.2k | } |
255 | 25.8k | } |
256 | 336k | return true; |
257 | 337k | } _ZN5doris10RleDecoderIsE10ReadHeaderEv Line | Count | Source | 232 | 290 | bool RleDecoder<T>::ReadHeader() { | 233 | 290 | DCHECK(bit_reader_.is_initialized()); | 234 | 290 | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 132 | uint32_t indicator_value = 0; | 238 | 132 | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 132 | if (!result) [[unlikely]] { | 240 | 0 | return false; | 241 | 0 | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 132 | bool is_literal = indicator_value & 1; | 245 | 132 | if (is_literal) { | 246 | 25 | literal_count_ = (indicator_value >> 1) * 8; | 247 | 25 | DCHECK_GT(literal_count_, 0); | 248 | 107 | } else { | 249 | 107 | repeat_count_ = indicator_value >> 1; | 250 | 107 | DCHECK_GT(repeat_count_, 0); | 251 | 107 | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 107 | reinterpret_cast<T*>(¤t_value_)); | 253 | 107 | DCHECK(result1); | 254 | 107 | } | 255 | 132 | } | 256 | 290 | return true; | 257 | 290 | } |
_ZN5doris10RleDecoderImE10ReadHeaderEv Line | Count | Source | 232 | 248k | bool RleDecoder<T>::ReadHeader() { | 233 | 248k | DCHECK(bit_reader_.is_initialized()); | 234 | 248k | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 3.07k | uint32_t indicator_value = 0; | 238 | 3.07k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 3.07k | if (!result) [[unlikely]] { | 240 | 3 | return false; | 241 | 3 | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 3.07k | bool is_literal = indicator_value & 1; | 245 | 3.07k | if (is_literal) { | 246 | 1.12k | literal_count_ = (indicator_value >> 1) * 8; | 247 | 1.12k | DCHECK_GT(literal_count_, 0); | 248 | 1.94k | } else { | 249 | 1.94k | repeat_count_ = indicator_value >> 1; | 250 | 1.94k | DCHECK_GT(repeat_count_, 0); | 251 | 1.94k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 1.94k | reinterpret_cast<T*>(¤t_value_)); | 253 | 1.94k | DCHECK(result1); | 254 | 1.94k | } | 255 | 3.07k | } | 256 | 248k | return true; | 257 | 248k | } |
_ZN5doris10RleDecoderIbE10ReadHeaderEv Line | Count | Source | 232 | 88.0k | bool RleDecoder<T>::ReadHeader() { | 233 | 88.0k | DCHECK(bit_reader_.is_initialized()); | 234 | 88.0k | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 22.9k | uint32_t indicator_value = 0; | 238 | 22.9k | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 22.9k | if (!result) [[unlikely]] { | 240 | 305 | return false; | 241 | 305 | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 22.6k | bool is_literal = indicator_value & 1; | 245 | 22.6k | if (is_literal) { | 246 | 1.46k | literal_count_ = (indicator_value >> 1) * 8; | 247 | 1.46k | DCHECK_GT(literal_count_, 0); | 248 | 21.2k | } else { | 249 | 21.2k | repeat_count_ = indicator_value >> 1; | 250 | 21.2k | DCHECK_GT(repeat_count_, 0); | 251 | 21.2k | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 21.2k | reinterpret_cast<T*>(¤t_value_)); | 253 | 21.2k | DCHECK(result1); | 254 | 21.2k | } | 255 | 22.6k | } | 256 | 87.7k | return true; | 257 | 88.0k | } |
_ZN5doris10RleDecoderIhE10ReadHeaderEv Line | Count | Source | 232 | 355 | bool RleDecoder<T>::ReadHeader() { | 233 | 355 | DCHECK(bit_reader_.is_initialized()); | 234 | 355 | if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] { | 235 | | // Read the next run's indicator int, it could be a literal or repeated run | 236 | | // The int is encoded as a vlq-encoded value. | 237 | 8 | uint32_t indicator_value = 0; | 238 | 8 | bool result = bit_reader_.GetVlqInt(&indicator_value); | 239 | 8 | if (!result) [[unlikely]] { | 240 | 0 | return false; | 241 | 0 | } | 242 | | | 243 | | // lsb indicates if it is a literal run or repeated run | 244 | 8 | bool is_literal = indicator_value & 1; | 245 | 8 | if (is_literal) { | 246 | 5 | literal_count_ = (indicator_value >> 1) * 8; | 247 | 5 | DCHECK_GT(literal_count_, 0); | 248 | 5 | } else { | 249 | 3 | repeat_count_ = indicator_value >> 1; | 250 | 3 | DCHECK_GT(repeat_count_, 0); | 251 | 3 | bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8), | 252 | 3 | reinterpret_cast<T*>(¤t_value_)); | 253 | 3 | DCHECK(result1); | 254 | 3 | } | 255 | 8 | } | 256 | 355 | return true; | 257 | 355 | } |
|
258 | | |
259 | | template <typename T> |
260 | 259k | bool RleDecoder<T>::Get(T* val) { |
261 | 259k | DCHECK(bit_reader_.is_initialized()); |
262 | 259k | if (!ReadHeader()) [[unlikely]] { |
263 | 5 | return false; |
264 | 5 | } |
265 | | |
266 | 259k | if (repeat_count_ > 0) [[likely]] { |
267 | 169k | *val = cast_set<T>(current_value_); |
268 | 169k | --repeat_count_; |
269 | 169k | rewind_state_ = REWIND_RUN; |
270 | 169k | } else { |
271 | 89.2k | DCHECK(literal_count_ > 0); |
272 | 89.2k | bool result = bit_reader_.GetValue(bit_width_, val); |
273 | 89.2k | DCHECK(result); |
274 | 89.2k | --literal_count_; |
275 | 89.2k | rewind_state_ = REWIND_LITERAL; |
276 | 89.2k | } |
277 | | |
278 | 259k | return true; |
279 | 259k | } _ZN5doris10RleDecoderIsE3GetEPs Line | Count | Source | 260 | 14 | bool RleDecoder<T>::Get(T* val) { | 261 | 14 | DCHECK(bit_reader_.is_initialized()); | 262 | 14 | if (!ReadHeader()) [[unlikely]] { | 263 | 0 | return false; | 264 | 0 | } | 265 | | | 266 | 14 | if (repeat_count_ > 0) [[likely]] { | 267 | 6 | *val = cast_set<T>(current_value_); | 268 | 6 | --repeat_count_; | 269 | 6 | rewind_state_ = REWIND_RUN; | 270 | 8 | } else { | 271 | 8 | DCHECK(literal_count_ > 0); | 272 | 8 | bool result = bit_reader_.GetValue(bit_width_, val); | 273 | 8 | DCHECK(result); | 274 | 8 | --literal_count_; | 275 | 8 | rewind_state_ = REWIND_LITERAL; | 276 | 8 | } | 277 | | | 278 | 14 | return true; | 279 | 14 | } |
_ZN5doris10RleDecoderImE3GetEPm Line | Count | Source | 260 | 248k | bool RleDecoder<T>::Get(T* val) { | 261 | 248k | DCHECK(bit_reader_.is_initialized()); | 262 | 248k | if (!ReadHeader()) [[unlikely]] { | 263 | 3 | return false; | 264 | 3 | } | 265 | | | 266 | 248k | if (repeat_count_ > 0) [[likely]] { | 267 | 159k | *val = cast_set<T>(current_value_); | 268 | 159k | --repeat_count_; | 269 | 159k | rewind_state_ = REWIND_RUN; | 270 | 159k | } else { | 271 | 89.1k | DCHECK(literal_count_ > 0); | 272 | 89.1k | bool result = bit_reader_.GetValue(bit_width_, val); | 273 | 89.1k | DCHECK(result); | 274 | 89.1k | --literal_count_; | 275 | 89.1k | rewind_state_ = REWIND_LITERAL; | 276 | 89.1k | } | 277 | | | 278 | 248k | return true; | 279 | 248k | } |
_ZN5doris10RleDecoderIbE3GetEPb Line | Count | Source | 260 | 10.4k | bool RleDecoder<T>::Get(T* val) { | 261 | 10.4k | DCHECK(bit_reader_.is_initialized()); | 262 | 10.4k | if (!ReadHeader()) [[unlikely]] { | 263 | 2 | return false; | 264 | 2 | } | 265 | | | 266 | 10.4k | if (repeat_count_ > 0) [[likely]] { | 267 | 10.3k | *val = cast_set<T>(current_value_); | 268 | 10.3k | --repeat_count_; | 269 | 10.3k | rewind_state_ = REWIND_RUN; | 270 | 10.3k | } else { | 271 | 111 | DCHECK(literal_count_ > 0); | 272 | 111 | bool result = bit_reader_.GetValue(bit_width_, val); | 273 | 111 | DCHECK(result); | 274 | 111 | --literal_count_; | 275 | 111 | rewind_state_ = REWIND_LITERAL; | 276 | 111 | } | 277 | | | 278 | 10.4k | return true; | 279 | 10.4k | } |
_ZN5doris10RleDecoderIhE3GetEPh Line | Count | Source | 260 | 350 | bool RleDecoder<T>::Get(T* val) { | 261 | 350 | DCHECK(bit_reader_.is_initialized()); | 262 | 350 | if (!ReadHeader()) [[unlikely]] { | 263 | 0 | return false; | 264 | 0 | } | 265 | | | 266 | 350 | if (repeat_count_ > 0) [[likely]] { | 267 | 350 | *val = cast_set<T>(current_value_); | 268 | 350 | --repeat_count_; | 269 | 350 | rewind_state_ = REWIND_RUN; | 270 | 350 | } else { | 271 | 0 | DCHECK(literal_count_ > 0); | 272 | 0 | bool result = bit_reader_.GetValue(bit_width_, val); | 273 | 0 | DCHECK(result); | 274 | 0 | --literal_count_; | 275 | 0 | rewind_state_ = REWIND_LITERAL; | 276 | 0 | } | 277 | | | 278 | 350 | return true; | 279 | 350 | } |
|
280 | | |
281 | | template <typename T> |
282 | 4 | void RleDecoder<T>::RewindOne() { |
283 | 4 | DCHECK(bit_reader_.is_initialized()); |
284 | | |
285 | 4 | switch (rewind_state_) { |
286 | 0 | case CANT_REWIND: |
287 | 0 | throw Exception(Status::FatalError("Can't rewind more than once after each read!")); |
288 | 0 | break; |
289 | 2 | case REWIND_RUN: |
290 | 2 | ++repeat_count_; |
291 | 2 | break; |
292 | 2 | case REWIND_LITERAL: { |
293 | 2 | bit_reader_.Rewind(bit_width_); |
294 | 2 | ++literal_count_; |
295 | 2 | break; |
296 | 0 | } |
297 | 4 | } |
298 | | |
299 | 4 | rewind_state_ = CANT_REWIND; |
300 | 4 | } |
301 | | |
302 | | template <typename T> |
303 | 55.5k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { |
304 | 55.5k | DCHECK(bit_reader_.is_initialized()); |
305 | 55.5k | DCHECK_GT(max_run, 0); |
306 | 55.5k | size_t ret = 0; |
307 | 55.5k | size_t rem = max_run; |
308 | 77.8k | while (ReadHeader()) { |
309 | 77.5k | if (repeat_count_ > 0) [[likely]] { |
310 | 44.6k | if (ret > 0 && *val != current_value_) [[unlikely]] { |
311 | 19.8k | return ret; |
312 | 19.8k | } |
313 | 24.8k | *val = cast_set<T>(current_value_); |
314 | 24.8k | if (repeat_count_ >= rem) { |
315 | | // The next run is longer than the amount of remaining data |
316 | | // that the caller wants to read. Only consume it partially. |
317 | 3.85k | repeat_count_ -= rem; |
318 | 3.85k | ret += rem; |
319 | 3.85k | return ret; |
320 | 3.85k | } |
321 | 20.9k | ret += repeat_count_; |
322 | 20.9k | rem -= repeat_count_; |
323 | 20.9k | repeat_count_ = 0; |
324 | 32.8k | } else { |
325 | 32.8k | DCHECK(literal_count_ > 0); |
326 | 32.8k | if (ret == 0) { |
327 | 31.6k | bool has_more = bit_reader_.GetValue(bit_width_, val); |
328 | 31.6k | DCHECK(has_more); |
329 | 31.6k | literal_count_--; |
330 | 31.6k | ret++; |
331 | 31.6k | rem--; |
332 | 31.6k | } |
333 | | |
334 | 92.6k | while (literal_count_ > 0) { |
335 | 91.3k | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); |
336 | 91.3k | DCHECK(result); |
337 | 91.3k | if (current_value_ != *val || rem == 0) { |
338 | 31.5k | bit_reader_.Rewind(bit_width_); |
339 | 31.5k | return ret; |
340 | 31.5k | } |
341 | 59.7k | ret++; |
342 | 59.7k | rem--; |
343 | 59.7k | literal_count_--; |
344 | 59.7k | } |
345 | 32.8k | } |
346 | 77.5k | } |
347 | 303 | return ret; |
348 | 55.5k | } _ZN5doris10RleDecoderIsE10GetNextRunEPsm Line | Count | Source | 303 | 232 | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 304 | 232 | DCHECK(bit_reader_.is_initialized()); | 305 | 232 | DCHECK_GT(max_run, 0); | 306 | 232 | size_t ret = 0; | 307 | 232 | size_t rem = max_run; | 308 | 232 | while (ReadHeader()) { | 309 | 232 | if (repeat_count_ > 0) [[likely]] { | 310 | 199 | if (ret > 0 && *val != current_value_) [[unlikely]] { | 311 | 0 | return ret; | 312 | 0 | } | 313 | 199 | *val = cast_set<T>(current_value_); | 314 | 199 | if (repeat_count_ >= rem) { | 315 | | // The next run is longer than the amount of remaining data | 316 | | // that the caller wants to read. Only consume it partially. | 317 | 199 | repeat_count_ -= rem; | 318 | 199 | ret += rem; | 319 | 199 | return ret; | 320 | 199 | } | 321 | 0 | ret += repeat_count_; | 322 | 0 | rem -= repeat_count_; | 323 | 0 | repeat_count_ = 0; | 324 | 33 | } else { | 325 | 33 | DCHECK(literal_count_ > 0); | 326 | 33 | if (ret == 0) { | 327 | 33 | bool has_more = bit_reader_.GetValue(bit_width_, val); | 328 | 33 | DCHECK(has_more); | 329 | 33 | literal_count_--; | 330 | 33 | ret++; | 331 | 33 | rem--; | 332 | 33 | } | 333 | | | 334 | 53 | while (literal_count_ > 0) { | 335 | 53 | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 336 | 53 | DCHECK(result); | 337 | 53 | if (current_value_ != *val || rem == 0) { | 338 | 33 | bit_reader_.Rewind(bit_width_); | 339 | 33 | return ret; | 340 | 33 | } | 341 | 20 | ret++; | 342 | 20 | rem--; | 343 | 20 | literal_count_--; | 344 | 20 | } | 345 | 33 | } | 346 | 232 | } | 347 | 0 | return ret; | 348 | 232 | } |
_ZN5doris10RleDecoderIbE10GetNextRunEPbm Line | Count | Source | 303 | 55.3k | size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) { | 304 | 55.3k | DCHECK(bit_reader_.is_initialized()); | 305 | 55.3k | DCHECK_GT(max_run, 0); | 306 | 55.3k | size_t ret = 0; | 307 | 55.3k | size_t rem = max_run; | 308 | 77.5k | while (ReadHeader()) { | 309 | 77.2k | if (repeat_count_ > 0) [[likely]] { | 310 | 44.4k | if (ret > 0 && *val != current_value_) [[unlikely]] { | 311 | 19.8k | return ret; | 312 | 19.8k | } | 313 | 24.6k | *val = cast_set<T>(current_value_); | 314 | 24.6k | if (repeat_count_ >= rem) { | 315 | | // The next run is longer than the amount of remaining data | 316 | | // that the caller wants to read. Only consume it partially. | 317 | 3.65k | repeat_count_ -= rem; | 318 | 3.65k | ret += rem; | 319 | 3.65k | return ret; | 320 | 3.65k | } | 321 | 20.9k | ret += repeat_count_; | 322 | 20.9k | rem -= repeat_count_; | 323 | 20.9k | repeat_count_ = 0; | 324 | 32.7k | } else { | 325 | 32.7k | DCHECK(literal_count_ > 0); | 326 | 32.7k | if (ret == 0) { | 327 | 31.6k | bool has_more = bit_reader_.GetValue(bit_width_, val); | 328 | 31.6k | DCHECK(has_more); | 329 | 31.6k | literal_count_--; | 330 | 31.6k | ret++; | 331 | 31.6k | rem--; | 332 | 31.6k | } | 333 | | | 334 | 92.5k | while (literal_count_ > 0) { | 335 | 91.3k | bool result = bit_reader_.GetValue(bit_width_, ¤t_value_); | 336 | 91.3k | DCHECK(result); | 337 | 91.3k | if (current_value_ != *val || rem == 0) { | 338 | 31.5k | bit_reader_.Rewind(bit_width_); | 339 | 31.5k | return ret; | 340 | 31.5k | } | 341 | 59.7k | ret++; | 342 | 59.7k | rem--; | 343 | 59.7k | literal_count_--; | 344 | 59.7k | } | 345 | 32.7k | } | 346 | 77.2k | } | 347 | 303 | return ret; | 348 | 55.3k | } |
|
349 | | |
350 | | template <typename T> |
351 | 49 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { |
352 | 49 | size_t read_num = 0; |
353 | 146 | while (read_num < num_values) { |
354 | 97 | size_t read_this_time = num_values - read_num; |
355 | | |
356 | 97 | if (LIKELY(repeat_count_ > 0)) { |
357 | 33 | read_this_time = std::min((size_t)repeat_count_, read_this_time); |
358 | 33 | std::fill(values, values + read_this_time, current_value_); |
359 | 33 | values += read_this_time; |
360 | 33 | repeat_count_ -= read_this_time; |
361 | 33 | read_num += read_this_time; |
362 | 64 | } else if (literal_count_ > 0) { |
363 | 16 | read_this_time = std::min((size_t)literal_count_, read_this_time); |
364 | 79 | for (int i = 0; i < read_this_time; ++i) { |
365 | 63 | bool result = bit_reader_.GetValue(bit_width_, values); |
366 | 63 | DCHECK(result); |
367 | 63 | values++; |
368 | 63 | } |
369 | 16 | literal_count_ -= read_this_time; |
370 | 16 | read_num += read_this_time; |
371 | 48 | } else { |
372 | 48 | if (!ReadHeader()) { |
373 | 0 | return read_num; |
374 | 0 | } |
375 | 48 | } |
376 | 97 | } |
377 | 49 | return read_num; |
378 | 49 | } _ZN5doris10RleDecoderIhE10get_valuesEPhm Line | Count | Source | 351 | 5 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 352 | 5 | size_t read_num = 0; | 353 | 14 | while (read_num < num_values) { | 354 | 9 | size_t read_this_time = num_values - read_num; | 355 | | | 356 | 9 | if (LIKELY(repeat_count_ > 0)) { | 357 | 0 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 358 | 0 | std::fill(values, values + read_this_time, current_value_); | 359 | 0 | values += read_this_time; | 360 | 0 | repeat_count_ -= read_this_time; | 361 | 0 | read_num += read_this_time; | 362 | 9 | } else if (literal_count_ > 0) { | 363 | 5 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 364 | 40 | for (int i = 0; i < read_this_time; ++i) { | 365 | 35 | bool result = bit_reader_.GetValue(bit_width_, values); | 366 | 35 | DCHECK(result); | 367 | 35 | values++; | 368 | 35 | } | 369 | 5 | literal_count_ -= read_this_time; | 370 | 5 | read_num += read_this_time; | 371 | 5 | } else { | 372 | 4 | if (!ReadHeader()) { | 373 | 0 | return read_num; | 374 | 0 | } | 375 | 4 | } | 376 | 9 | } | 377 | 5 | return read_num; | 378 | 5 | } |
_ZN5doris10RleDecoderIsE10get_valuesEPsm Line | Count | Source | 351 | 44 | size_t RleDecoder<T>::get_values(T* values, size_t num_values) { | 352 | 44 | size_t read_num = 0; | 353 | 132 | while (read_num < num_values) { | 354 | 88 | size_t read_this_time = num_values - read_num; | 355 | | | 356 | 88 | if (LIKELY(repeat_count_ > 0)) { | 357 | 33 | read_this_time = std::min((size_t)repeat_count_, read_this_time); | 358 | 33 | std::fill(values, values + read_this_time, current_value_); | 359 | 33 | values += read_this_time; | 360 | 33 | repeat_count_ -= read_this_time; | 361 | 33 | read_num += read_this_time; | 362 | 55 | } else if (literal_count_ > 0) { | 363 | 11 | read_this_time = std::min((size_t)literal_count_, read_this_time); | 364 | 39 | for (int i = 0; i < read_this_time; ++i) { | 365 | 28 | bool result = bit_reader_.GetValue(bit_width_, values); | 366 | 28 | DCHECK(result); | 367 | 28 | values++; | 368 | 28 | } | 369 | 11 | literal_count_ -= read_this_time; | 370 | 11 | read_num += read_this_time; | 371 | 44 | } else { | 372 | 44 | if (!ReadHeader()) { | 373 | 0 | return read_num; | 374 | 0 | } | 375 | 44 | } | 376 | 88 | } | 377 | 44 | return read_num; | 378 | 44 | } |
|
379 | | |
380 | | template <typename T> |
381 | | size_t RleDecoder<T>::repeated_count() { |
382 | | if (repeat_count_ > 0) { |
383 | | return repeat_count_; |
384 | | } |
385 | | if (literal_count_ == 0) { |
386 | | ReadHeader(); |
387 | | } |
388 | | return repeat_count_; |
389 | | } |
390 | | |
391 | | template <typename T> |
392 | | T RleDecoder<T>::get_repeated_value(size_t count) { |
393 | | DCHECK_GE(repeat_count_, count); |
394 | | repeat_count_ -= count; |
395 | | return current_value_; |
396 | | } |
397 | | |
398 | | template <typename T> |
399 | 8 | size_t RleDecoder<T>::Skip(size_t to_skip) { |
400 | 8 | DCHECK(bit_reader_.is_initialized()); |
401 | | |
402 | 8 | size_t set_count = 0; |
403 | 19 | while (to_skip > 0) { |
404 | 11 | bool result = ReadHeader(); |
405 | 11 | DCHECK(result); |
406 | | |
407 | 11 | if (repeat_count_ > 0) [[likely]] { |
408 | 7 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; |
409 | 7 | repeat_count_ -= nskip; |
410 | 7 | to_skip -= nskip; |
411 | 7 | if (current_value_ != 0) { |
412 | 3 | set_count += nskip; |
413 | 3 | } |
414 | 7 | } else { |
415 | 4 | DCHECK(literal_count_ > 0); |
416 | 4 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; |
417 | 4 | literal_count_ -= nskip; |
418 | 4 | to_skip -= nskip; |
419 | 60 | for (; nskip > 0; nskip--) { |
420 | 56 | T value = 0; |
421 | 56 | bool result1 = bit_reader_.GetValue(bit_width_, &value); |
422 | 56 | DCHECK(result1); |
423 | 56 | if (value != 0) { |
424 | 28 | set_count++; |
425 | 28 | } |
426 | 56 | } |
427 | 4 | } |
428 | 11 | } |
429 | 8 | return set_count; |
430 | 8 | } _ZN5doris10RleDecoderIbE4SkipEm Line | Count | Source | 399 | 7 | size_t RleDecoder<T>::Skip(size_t to_skip) { | 400 | 7 | DCHECK(bit_reader_.is_initialized()); | 401 | | | 402 | 7 | size_t set_count = 0; | 403 | 17 | while (to_skip > 0) { | 404 | 10 | bool result = ReadHeader(); | 405 | 10 | DCHECK(result); | 406 | | | 407 | 10 | if (repeat_count_ > 0) [[likely]] { | 408 | 7 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 409 | 7 | repeat_count_ -= nskip; | 410 | 7 | to_skip -= nskip; | 411 | 7 | if (current_value_ != 0) { | 412 | 3 | set_count += nskip; | 413 | 3 | } | 414 | 7 | } else { | 415 | 3 | DCHECK(literal_count_ > 0); | 416 | 3 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 417 | 3 | literal_count_ -= nskip; | 418 | 3 | to_skip -= nskip; | 419 | 56 | for (; nskip > 0; nskip--) { | 420 | 53 | T value = 0; | 421 | 53 | bool result1 = bit_reader_.GetValue(bit_width_, &value); | 422 | 53 | DCHECK(result1); | 423 | 53 | if (value != 0) { | 424 | 26 | set_count++; | 425 | 26 | } | 426 | 53 | } | 427 | 3 | } | 428 | 10 | } | 429 | 7 | return set_count; | 430 | 7 | } |
_ZN5doris10RleDecoderIhE4SkipEm Line | Count | Source | 399 | 1 | size_t RleDecoder<T>::Skip(size_t to_skip) { | 400 | 1 | DCHECK(bit_reader_.is_initialized()); | 401 | | | 402 | 1 | size_t set_count = 0; | 403 | 2 | while (to_skip > 0) { | 404 | 1 | bool result = ReadHeader(); | 405 | 1 | DCHECK(result); | 406 | | | 407 | 1 | if (repeat_count_ > 0) [[likely]] { | 408 | 0 | size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip; | 409 | 0 | repeat_count_ -= nskip; | 410 | 0 | to_skip -= nskip; | 411 | 0 | if (current_value_ != 0) { | 412 | 0 | set_count += nskip; | 413 | 0 | } | 414 | 1 | } else { | 415 | 1 | DCHECK(literal_count_ > 0); | 416 | 1 | size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip; | 417 | 1 | literal_count_ -= nskip; | 418 | 1 | to_skip -= nskip; | 419 | 4 | for (; nskip > 0; nskip--) { | 420 | 3 | T value = 0; | 421 | 3 | bool result1 = bit_reader_.GetValue(bit_width_, &value); | 422 | 3 | DCHECK(result1); | 423 | 3 | if (value != 0) { | 424 | 2 | set_count++; | 425 | 2 | } | 426 | 3 | } | 427 | 1 | } | 428 | 1 | } | 429 | 1 | return set_count; | 430 | 1 | } |
|
431 | | |
432 | | // This function buffers input values 8 at a time. After seeing all 8 values, |
433 | | // it decides whether they should be encoded as a literal or repeated run. |
434 | | template <typename T> |
435 | 20.0M | void RleEncoder<T>::Put(T value, size_t run_length) { |
436 | 20.0M | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); |
437 | | |
438 | | // Fast path: if this is a continuation of the current repeated run and |
439 | | // we've already buffered enough values, just increment repeat_count_ |
440 | 20.0M | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { |
441 | 790k | repeat_count_ += run_length; |
442 | 790k | return; |
443 | 790k | } |
444 | | |
445 | | // Handle run_length > 1 more efficiently |
446 | 37.4M | while (run_length > 0) { |
447 | 37.2M | if (current_value_ == value) [[likely]] { |
448 | | // Need to buffer values until we reach 8 |
449 | 19.1M | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); |
450 | 153M | for (size_t i = 0; i < to_buffer; ++i) { |
451 | 134M | buffered_values_[num_buffered_values_++] = value; |
452 | 134M | ++repeat_count_; |
453 | 134M | } |
454 | 19.1M | run_length -= to_buffer; |
455 | 19.1M | if (num_buffered_values_ == 8) { |
456 | 19.0M | DCHECK_EQ(literal_count_ % 8, 0); |
457 | 19.0M | FlushBufferedValues(false); |
458 | | // After flushing, if we still have a repeated run and more values, |
459 | | // we can add them directly to repeat_count_ |
460 | 19.0M | if (repeat_count_ >= 8 && run_length > 0) { |
461 | 19.0M | repeat_count_ += run_length; |
462 | 19.0M | return; |
463 | 19.0M | } |
464 | 19.0M | } |
465 | 19.1M | } else { |
466 | | // Value changed |
467 | 18.1M | if (repeat_count_ >= 8) { |
468 | | // We had a run that was long enough but it has ended. Flush the |
469 | | // current repeated run. |
470 | 8.02M | DCHECK_EQ(literal_count_, 0); |
471 | 8.02M | FlushRepeatedRun(); |
472 | 8.02M | } |
473 | 18.1M | repeat_count_ = 1; |
474 | 18.1M | current_value_ = value; |
475 | | |
476 | 18.1M | buffered_values_[num_buffered_values_++] = value; |
477 | 18.1M | --run_length; |
478 | 18.1M | if (num_buffered_values_ == 8) { |
479 | 16.1k | DCHECK_EQ(literal_count_ % 8, 0); |
480 | 16.1k | FlushBufferedValues(false); |
481 | 16.1k | } |
482 | 18.1M | } |
483 | 37.2M | } |
484 | 19.2M | } _ZN5doris10RleEncoderImE3PutEmm Line | Count | Source | 435 | 19.2M | void RleEncoder<T>::Put(T value, size_t run_length) { | 436 | 19.2M | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 437 | | | 438 | | // Fast path: if this is a continuation of the current repeated run and | 439 | | // we've already buffered enough values, just increment repeat_count_ | 440 | 19.2M | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { | 441 | 154k | repeat_count_ += run_length; | 442 | 154k | return; | 443 | 154k | } | 444 | | | 445 | | // Handle run_length > 1 more efficiently | 446 | 37.2M | while (run_length > 0) { | 447 | 37.1M | if (current_value_ == value) [[likely]] { | 448 | | // Need to buffer values until we reach 8 | 449 | 19.0M | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); | 450 | 153M | for (size_t i = 0; i < to_buffer; ++i) { | 451 | 134M | buffered_values_[num_buffered_values_++] = value; | 452 | 134M | ++repeat_count_; | 453 | 134M | } | 454 | 19.0M | run_length -= to_buffer; | 455 | 19.0M | if (num_buffered_values_ == 8) { | 456 | 19.0M | DCHECK_EQ(literal_count_ % 8, 0); | 457 | 19.0M | FlushBufferedValues(false); | 458 | | // After flushing, if we still have a repeated run and more values, | 459 | | // we can add them directly to repeat_count_ | 460 | 19.0M | if (repeat_count_ >= 8 && run_length > 0) { | 461 | 19.0M | repeat_count_ += run_length; | 462 | 19.0M | return; | 463 | 19.0M | } | 464 | 19.0M | } | 465 | 19.0M | } else { | 466 | | // Value changed | 467 | 18.0M | if (repeat_count_ >= 8) { | 468 | | // We had a run that was long enough but it has ended. Flush the | 469 | | // current repeated run. 
| 470 | 8.00M | DCHECK_EQ(literal_count_, 0); | 471 | 8.00M | FlushRepeatedRun(); | 472 | 8.00M | } | 473 | 18.0M | repeat_count_ = 1; | 474 | 18.0M | current_value_ = value; | 475 | | | 476 | 18.0M | buffered_values_[num_buffered_values_++] = value; | 477 | 18.0M | --run_length; | 478 | 18.0M | if (num_buffered_values_ == 8) { | 479 | | DCHECK_EQ(literal_count_ % 8, 0); | 480 | 9.46k | FlushBufferedValues(false); | 481 | 9.46k | } | 482 | 18.0M | } | 483 | 37.1M | } | 484 | 19.1M | } |
_ZN5doris10RleEncoderIbE3PutEbm Line | Count | Source | 435 | 749k | void RleEncoder<T>::Put(T value, size_t run_length) { | 436 | 749k | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 437 | | | 438 | | // Fast path: if this is a continuation of the current repeated run and | 439 | | // we've already buffered enough values, just increment repeat_count_ | 440 | 749k | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { | 441 | 636k | repeat_count_ += run_length; | 442 | 636k | return; | 443 | 636k | } | 444 | | | 445 | | // Handle run_length > 1 more efficiently | 446 | 265k | while (run_length > 0) { | 447 | 179k | if (current_value_ == value) [[likely]] { | 448 | | // Need to buffer values until we reach 8 | 449 | 92.5k | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); | 450 | 443k | for (size_t i = 0; i < to_buffer; ++i) { | 451 | 351k | buffered_values_[num_buffered_values_++] = value; | 452 | 351k | ++repeat_count_; | 453 | 351k | } | 454 | 92.5k | run_length -= to_buffer; | 455 | 92.5k | if (num_buffered_values_ == 8) { | 456 | 47.0k | DCHECK_EQ(literal_count_ % 8, 0); | 457 | 47.0k | FlushBufferedValues(false); | 458 | | // After flushing, if we still have a repeated run and more values, | 459 | | // we can add them directly to repeat_count_ | 460 | 47.0k | if (repeat_count_ >= 8 && run_length > 0) { | 461 | 27.3k | repeat_count_ += run_length; | 462 | 27.3k | return; | 463 | 27.3k | } | 464 | 47.0k | } | 465 | 92.5k | } else { | 466 | | // Value changed | 467 | 87.1k | if (repeat_count_ >= 8) { | 468 | | // We had a run that was long enough but it has ended. Flush the | 469 | | // current repeated run. 
| 470 | 24.0k | DCHECK_EQ(literal_count_, 0); | 471 | 24.0k | FlushRepeatedRun(); | 472 | 24.0k | } | 473 | 87.1k | repeat_count_ = 1; | 474 | 87.1k | current_value_ = value; | 475 | | | 476 | 87.1k | buffered_values_[num_buffered_values_++] = value; | 477 | 87.1k | --run_length; | 478 | 87.1k | if (num_buffered_values_ == 8) { | 479 | | DCHECK_EQ(literal_count_ % 8, 0); | 480 | 6.72k | FlushBufferedValues(false); | 481 | 6.72k | } | 482 | 87.1k | } | 483 | 179k | } | 484 | 112k | } |
_ZN5doris10RleEncoderIhE3PutEhm Line | Count | Source | 435 | 3 | void RleEncoder<T>::Put(T value, size_t run_length) { | 436 | 3 | DCHECK(bit_width_ == 64 || value < (1LL << bit_width_)); | 437 | | | 438 | | // Fast path: if this is a continuation of the current repeated run and | 439 | | // we've already buffered enough values, just increment repeat_count_ | 440 | 3 | if (current_value_ == value && repeat_count_ >= 8 && run_length > 0) [[likely]] { | 441 | 0 | repeat_count_ += run_length; | 442 | 0 | return; | 443 | 0 | } | 444 | | | 445 | | // Handle run_length > 1 more efficiently | 446 | 6 | while (run_length > 0) { | 447 | 6 | if (current_value_ == value) [[likely]] { | 448 | | // Need to buffer values until we reach 8 | 449 | 3 | size_t to_buffer = std::min(run_length, size_t(8 - num_buffered_values_)); | 450 | 24 | for (size_t i = 0; i < to_buffer; ++i) { | 451 | 21 | buffered_values_[num_buffered_values_++] = value; | 452 | 21 | ++repeat_count_; | 453 | 21 | } | 454 | 3 | run_length -= to_buffer; | 455 | 3 | if (num_buffered_values_ == 8) { | 456 | 3 | DCHECK_EQ(literal_count_ % 8, 0); | 457 | 3 | FlushBufferedValues(false); | 458 | | // After flushing, if we still have a repeated run and more values, | 459 | | // we can add them directly to repeat_count_ | 460 | 3 | if (repeat_count_ >= 8 && run_length > 0) { | 461 | 3 | repeat_count_ += run_length; | 462 | 3 | return; | 463 | 3 | } | 464 | 3 | } | 465 | 3 | } else { | 466 | | // Value changed | 467 | 3 | if (repeat_count_ >= 8) { | 468 | | // We had a run that was long enough but it has ended. Flush the | 469 | | // current repeated run. 
| 470 | 2 | DCHECK_EQ(literal_count_, 0); | 471 | 2 | FlushRepeatedRun(); | 472 | 2 | } | 473 | 3 | repeat_count_ = 1; | 474 | 3 | current_value_ = value; | 475 | | | 476 | 3 | buffered_values_[num_buffered_values_++] = value; | 477 | 3 | --run_length; | 478 | 3 | if (num_buffered_values_ == 8) { | 479 | | DCHECK_EQ(literal_count_ % 8, 0); | 480 | 0 | FlushBufferedValues(false); | 481 | 0 | } | 482 | 3 | } | 483 | 6 | } | 484 | 3 | } |
|
485 | | |
486 | | template <typename T> |
487 | 38.7k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { |
488 | 38.7k | if (literal_indicator_byte_idx_ < 0) { |
489 | | // The literal indicator byte has not been reserved yet, get one now. |
490 | 5.69k | literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1)); |
491 | 5.69k | DCHECK_GE(literal_indicator_byte_idx_, 0); |
492 | 5.69k | } |
493 | | |
494 | | // Write all the buffered values as bit packed literals |
495 | 307k | for (int i = 0; i < num_buffered_values_; ++i) { |
496 | 268k | bit_writer_.PutValue(buffered_values_[i], bit_width_); |
497 | 268k | } |
498 | 38.7k | num_buffered_values_ = 0; |
499 | | |
500 | 38.7k | if (update_indicator_byte) { |
501 | | // At this point we need to write the indicator byte for the literal run. |
502 | | // We only reserve one byte, to allow for streaming writes of literal values. |
503 | | // The logic makes sure we flush literal runs often enough to not overrun |
504 | | // the 1 byte. |
505 | 5.69k | int num_groups = BitUtil::Ceil(literal_count_, 8); |
506 | 5.69k | int32_t indicator_value = (num_groups << 1) | 1; |
507 | 5.69k | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); |
508 | 5.69k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = |
509 | 5.69k | cast_set<uint8_t>(indicator_value); |
510 | 5.69k | literal_indicator_byte_idx_ = -1; |
511 | 5.69k | literal_count_ = 0; |
512 | 5.69k | } |
513 | 38.7k | } _ZN5doris10RleEncoderImE15FlushLiteralRunEb Line | Count | Source | 487 | 12.1k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 488 | 12.1k | if (literal_indicator_byte_idx_ < 0) { | 489 | | // The literal indicator byte has not been reserved yet, get one now. | 490 | 1.13k | literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1)); | 491 | 1.13k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 492 | 1.13k | } | 493 | | | 494 | | // Write all the buffered values as bit packed literals | 495 | 101k | for (int i = 0; i < num_buffered_values_; ++i) { | 496 | 89.2k | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 497 | 89.2k | } | 498 | 12.1k | num_buffered_values_ = 0; | 499 | | | 500 | 12.1k | if (update_indicator_byte) { | 501 | | // At this point we need to write the indicator byte for the literal run. | 502 | | // We only reserve one byte, to allow for streaming writes of literal values. | 503 | | // The logic makes sure we flush literal runs often enough to not overrun | 504 | | // the 1 byte. | 505 | 1.13k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 506 | 1.13k | int32_t indicator_value = (num_groups << 1) | 1; | 507 | | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 508 | 1.13k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = | 509 | 1.13k | cast_set<uint8_t>(indicator_value); | 510 | 1.13k | literal_indicator_byte_idx_ = -1; | 511 | 1.13k | literal_count_ = 0; | 512 | 1.13k | } | 513 | 12.1k | } |
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb Line | Count | Source | 487 | 26.6k | void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) { | 488 | 26.6k | if (literal_indicator_byte_idx_ < 0) { | 489 | | // The literal indicator byte has not been reserved yet, get one now. | 490 | 4.56k | literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1)); | 491 | 4.56k | DCHECK_GE(literal_indicator_byte_idx_, 0); | 492 | 4.56k | } | 493 | | | 494 | | // Write all the buffered values as bit packed literals | 495 | 205k | for (int i = 0; i < num_buffered_values_; ++i) { | 496 | 179k | bit_writer_.PutValue(buffered_values_[i], bit_width_); | 497 | 179k | } | 498 | 26.6k | num_buffered_values_ = 0; | 499 | | | 500 | 26.6k | if (update_indicator_byte) { | 501 | | // At this point we need to write the indicator byte for the literal run. | 502 | | // We only reserve one byte, to allow for streaming writes of literal values. | 503 | | // The logic makes sure we flush literal runs often enough to not overrun | 504 | | // the 1 byte. | 505 | 4.56k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 506 | 4.56k | int32_t indicator_value = (num_groups << 1) | 1; | 507 | | DCHECK_EQ(indicator_value & 0xFFFFFF00, 0); | 508 | 4.56k | bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = | 509 | 4.56k | cast_set<uint8_t>(indicator_value); | 510 | 4.56k | literal_indicator_byte_idx_ = -1; | 511 | 4.56k | literal_count_ = 0; | 512 | 4.56k | } | 513 | 26.6k | } |
Unexecuted instantiation: _ZN5doris10RleEncoderIhE15FlushLiteralRunEb |
514 | | |
515 | | template <typename T> |
516 | 19.0M | void RleEncoder<T>::FlushRepeatedRun() { |
517 | 19.0M | DCHECK_GT(repeat_count_, 0); |
518 | | // The lsb of 0 indicates this is a repeated run |
519 | 19.0M | int32_t indicator_value = repeat_count_ << 1 | 0; |
520 | 19.0M | bit_writer_.PutVlqInt(indicator_value); |
521 | 19.0M | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); |
522 | 19.0M | num_buffered_values_ = 0; |
523 | 19.0M | repeat_count_ = 0; |
524 | 19.0M | } _ZN5doris10RleEncoderImE16FlushRepeatedRunEv Line | Count | Source | 516 | 19.0M | void RleEncoder<T>::FlushRepeatedRun() { | 517 | 19.0M | DCHECK_GT(repeat_count_, 0); | 518 | | // The lsb of 0 indicates this is a repeated run | 519 | 19.0M | int32_t indicator_value = repeat_count_ << 1 | 0; | 520 | 19.0M | bit_writer_.PutVlqInt(indicator_value); | 521 | 19.0M | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 522 | 19.0M | num_buffered_values_ = 0; | 523 | 19.0M | repeat_count_ = 0; | 524 | 19.0M | } |
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv Line | Count | Source | 516 | 24.4k | void RleEncoder<T>::FlushRepeatedRun() { | 517 | 24.4k | DCHECK_GT(repeat_count_, 0); | 518 | | // The lsb of 0 indicates this is a repeated run | 519 | 24.4k | int32_t indicator_value = repeat_count_ << 1 | 0; | 520 | 24.4k | bit_writer_.PutVlqInt(indicator_value); | 521 | 24.4k | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 522 | 24.4k | num_buffered_values_ = 0; | 523 | 24.4k | repeat_count_ = 0; | 524 | 24.4k | } |
_ZN5doris10RleEncoderIhE16FlushRepeatedRunEv Line | Count | Source | 516 | 3 | void RleEncoder<T>::FlushRepeatedRun() { | 517 | 3 | DCHECK_GT(repeat_count_, 0); | 518 | | // The lsb of 0 indicates this is a repeated run | 519 | 3 | int32_t indicator_value = repeat_count_ << 1 | 0; | 520 | 3 | bit_writer_.PutVlqInt(indicator_value); | 521 | 3 | bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8)); | 522 | 3 | num_buffered_values_ = 0; | 523 | 3 | repeat_count_ = 0; | 524 | 3 | } |
|
525 | | |
526 | | // Flush the values that have been buffered. At this point we decide whether |
527 | | // we need to switch between the run types or continue the current one. |
528 | | template <typename T> |
529 | 19.0M | void RleEncoder<T>::FlushBufferedValues(bool done) { |
530 | 19.0M | if (repeat_count_ >= 8) { |
531 | | // Clear the buffered values. They are part of the repeated run now and we |
532 | | // don't want to flush them out as literals. |
533 | 19.0M | num_buffered_values_ = 0; |
534 | 19.0M | if (literal_count_ != 0) { |
535 | | // There was a current literal run. All the values in it have been flushed |
536 | | // but we still need to update the indicator byte. |
537 | 4.76k | DCHECK_EQ(literal_count_ % 8, 0); |
538 | 4.76k | DCHECK_EQ(repeat_count_, 8); |
539 | 4.76k | FlushLiteralRun(true); |
540 | 4.76k | } |
541 | 19.0M | DCHECK_EQ(literal_count_, 0); |
542 | 19.0M | return; |
543 | 19.0M | } |
544 | | |
545 | 33.2k | literal_count_ += num_buffered_values_; |
546 | 33.2k | int num_groups = BitUtil::Ceil(literal_count_, 8); |
547 | 33.2k | if (num_groups + 1 >= (1 << 6)) { |
548 | | // We need to start a new literal run because the indicator byte we've reserved |
549 | | // cannot store more values. |
550 | 170 | DCHECK_GE(literal_indicator_byte_idx_, 0); |
551 | 170 | FlushLiteralRun(true); |
552 | 33.0k | } else { |
553 | 33.0k | FlushLiteralRun(done); |
554 | 33.0k | } |
555 | 33.2k | repeat_count_ = 0; |
556 | 33.2k | } _ZN5doris10RleEncoderImE19FlushBufferedValuesEb Line | Count | Source | 529 | 19.0M | void RleEncoder<T>::FlushBufferedValues(bool done) { | 530 | 19.0M | if (repeat_count_ >= 8) { | 531 | | // Clear the buffered values. They are part of the repeated run now and we | 532 | | // don't want to flush them out as literals. | 533 | 19.0M | num_buffered_values_ = 0; | 534 | 19.0M | if (literal_count_ != 0) { | 535 | | // There was a current literal run. All the values in it have been flushed | 536 | | // but we still need to update the indicator byte. | 537 | 869 | DCHECK_EQ(literal_count_ % 8, 0); | 538 | 869 | DCHECK_EQ(repeat_count_, 8); | 539 | 869 | FlushLiteralRun(true); | 540 | 869 | } | 541 | 19.0M | DCHECK_EQ(literal_count_, 0); | 542 | 19.0M | return; | 543 | 19.0M | } | 544 | | | 545 | 11.1k | literal_count_ += num_buffered_values_; | 546 | 11.1k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 547 | 11.1k | if (num_groups + 1 >= (1 << 6)) { | 548 | | // We need to start a new literal run because the indicator byte we've reserved | 549 | | // cannot store more values. | 550 | 128 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 551 | 128 | FlushLiteralRun(true); | 552 | 10.9k | } else { | 553 | 10.9k | FlushLiteralRun(done); | 554 | 10.9k | } | 555 | 11.1k | repeat_count_ = 0; | 556 | 11.1k | } |
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb Line | Count | Source | 529 | 53.7k | void RleEncoder<T>::FlushBufferedValues(bool done) { | 530 | 53.7k | if (repeat_count_ >= 8) { | 531 | | // Clear the buffered values. They are part of the repeated run now and we | 532 | | // don't want to flush them out as literals. | 533 | 31.6k | num_buffered_values_ = 0; | 534 | 31.6k | if (literal_count_ != 0) { | 535 | | // There was a current literal run. All the values in it have been flushed | 536 | | // but we still need to update the indicator byte. | 537 | 3.89k | DCHECK_EQ(literal_count_ % 8, 0); | 538 | 3.89k | DCHECK_EQ(repeat_count_, 8); | 539 | 3.89k | FlushLiteralRun(true); | 540 | 3.89k | } | 541 | 31.6k | DCHECK_EQ(literal_count_, 0); | 542 | 31.6k | return; | 543 | 31.6k | } | 544 | | | 545 | 22.1k | literal_count_ += num_buffered_values_; | 546 | 22.1k | int num_groups = BitUtil::Ceil(literal_count_, 8); | 547 | 22.1k | if (num_groups + 1 >= (1 << 6)) { | 548 | | // We need to start a new literal run because the indicator byte we've reserved | 549 | | // cannot store more values. | 550 | 42 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 551 | 42 | FlushLiteralRun(true); | 552 | 22.0k | } else { | 553 | 22.0k | FlushLiteralRun(done); | 554 | 22.0k | } | 555 | 22.1k | repeat_count_ = 0; | 556 | 22.1k | } |
_ZN5doris10RleEncoderIhE19FlushBufferedValuesEb Line | Count | Source | 529 | 3 | void RleEncoder<T>::FlushBufferedValues(bool done) { | 530 | 3 | if (repeat_count_ >= 8) { | 531 | | // Clear the buffered values. They are part of the repeated run now and we | 532 | | // don't want to flush them out as literals. | 533 | 3 | num_buffered_values_ = 0; | 534 | 3 | if (literal_count_ != 0) { | 535 | | // There was a current literal run. All the values in it have been flushed | 536 | | // but we still need to update the indicator byte. | 537 | 0 | DCHECK_EQ(literal_count_ % 8, 0); | 538 | 0 | DCHECK_EQ(repeat_count_, 8); | 539 | 0 | FlushLiteralRun(true); | 540 | 0 | } | 541 | 3 | DCHECK_EQ(literal_count_, 0); | 542 | 3 | return; | 543 | 3 | } | 544 | | | 545 | 0 | literal_count_ += num_buffered_values_; | 546 | 0 | int num_groups = BitUtil::Ceil(literal_count_, 8); | 547 | 0 | if (num_groups + 1 >= (1 << 6)) { | 548 | | // We need to start a new literal run because the indicator byte we've reserved | 549 | | // cannot store more values. | 550 | 0 | DCHECK_GE(literal_indicator_byte_idx_, 0); | 551 | 0 | FlushLiteralRun(true); | 552 | 0 | } else { | 553 | 0 | FlushLiteralRun(done); | 554 | 0 | } | 555 | 0 | repeat_count_ = 0; | 556 | 0 | } |
|
557 | | |
558 | | template <typename T> |
559 | 0 | void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) { |
560 | 0 | for (int i = 0; i < num_bytes; ++i) { |
561 | 0 | bit_writer_.PutValue(val, 8); |
562 | 0 | } |
563 | 0 | } |
564 | | |
565 | | template <typename T> |
566 | 11.0M | int RleEncoder<T>::Flush() { |
567 | 11.0M | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { |
568 | 11.0M | bool all_repeat = literal_count_ == 0 && |
569 | 11.0M | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); |
570 | | // There is something pending, figure out if it's a repeated or literal run |
571 | 11.0M | if (repeat_count_ > 0 && all_repeat) { |
572 | 11.0M | FlushRepeatedRun(); |
573 | 11.0M | } else { |
574 | 764 | literal_count_ += num_buffered_values_; |
575 | 764 | FlushLiteralRun(true); |
576 | 764 | repeat_count_ = 0; |
577 | 764 | } |
578 | 11.0M | } |
579 | 11.0M | bit_writer_.Flush(); |
580 | 11.0M | DCHECK_EQ(num_buffered_values_, 0); |
581 | 11.0M | DCHECK_EQ(literal_count_, 0); |
582 | 11.0M | DCHECK_EQ(repeat_count_, 0); |
583 | 11.0M | return bit_writer_.bytes_written(); |
584 | 11.0M | } _ZN5doris10RleEncoderImE5FlushEv Line | Count | Source | 566 | 11.0M | int RleEncoder<T>::Flush() { | 567 | 11.0M | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 568 | 11.0M | bool all_repeat = literal_count_ == 0 && | 569 | 11.0M | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 570 | | // There is something pending, figure out if it's a repeated or literal run | 571 | 11.0M | if (repeat_count_ > 0 && all_repeat) { | 572 | 11.0M | FlushRepeatedRun(); | 573 | 11.0M | } else { | 574 | 139 | literal_count_ += num_buffered_values_; | 575 | 139 | FlushLiteralRun(true); | 576 | 139 | repeat_count_ = 0; | 577 | 139 | } | 578 | 11.0M | } | 579 | 11.0M | bit_writer_.Flush(); | 580 | 11.0M | DCHECK_EQ(num_buffered_values_, 0); | 581 | 11.0M | DCHECK_EQ(literal_count_, 0); | 582 | | DCHECK_EQ(repeat_count_, 0); | 583 | 11.0M | return bit_writer_.bytes_written(); | 584 | 11.0M | } |
_ZN5doris10RleEncoderIbE5FlushEv Line | Count | Source | 566 | 1.03k | int RleEncoder<T>::Flush() { | 567 | 1.03k | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 568 | 1.03k | bool all_repeat = literal_count_ == 0 && | 569 | 1.03k | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 570 | | // There is something pending, figure out if it's a repeated or literal run | 571 | 1.03k | if (repeat_count_ > 0 && all_repeat) { | 572 | 406 | FlushRepeatedRun(); | 573 | 625 | } else { | 574 | 625 | literal_count_ += num_buffered_values_; | 575 | 625 | FlushLiteralRun(true); | 576 | 625 | repeat_count_ = 0; | 577 | 625 | } | 578 | 1.03k | } | 579 | 1.03k | bit_writer_.Flush(); | 580 | 1.03k | DCHECK_EQ(num_buffered_values_, 0); | 581 | 1.03k | DCHECK_EQ(literal_count_, 0); | 582 | | DCHECK_EQ(repeat_count_, 0); | 583 | 1.03k | return bit_writer_.bytes_written(); | 584 | 1.03k | } |
_ZN5doris10RleEncoderIhE5FlushEv Line | Count | Source | 566 | 1 | int RleEncoder<T>::Flush() { | 567 | 1 | if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) { | 568 | 1 | bool all_repeat = literal_count_ == 0 && | 569 | 1 | (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0); | 570 | | // There is something pending, figure out if it's a repeated or literal run | 571 | 1 | if (repeat_count_ > 0 && all_repeat) { | 572 | 1 | FlushRepeatedRun(); | 573 | 1 | } else { | 574 | 0 | literal_count_ += num_buffered_values_; | 575 | 0 | FlushLiteralRun(true); | 576 | 0 | repeat_count_ = 0; | 577 | 0 | } | 578 | 1 | } | 579 | 1 | bit_writer_.Flush(); | 580 | 1 | DCHECK_EQ(num_buffered_values_, 0); | 581 | 1 | DCHECK_EQ(literal_count_, 0); | 582 | | DCHECK_EQ(repeat_count_, 0); | 583 | 1 | return bit_writer_.bytes_written(); | 584 | 1 | } |
|
585 | | |
586 | | template <typename T> |
587 | 11.0M | void RleEncoder<T>::Clear() { |
588 | 11.0M | current_value_ = 0; |
589 | 11.0M | repeat_count_ = 0; |
590 | 11.0M | num_buffered_values_ = 0; |
591 | 11.0M | literal_count_ = 0; |
592 | 11.0M | literal_indicator_byte_idx_ = -1; |
593 | 11.0M | bit_writer_.Clear(); |
594 | 11.0M | } _ZN5doris10RleEncoderImE5ClearEv Line | Count | Source | 587 | 11.0M | void RleEncoder<T>::Clear() { | 588 | 11.0M | current_value_ = 0; | 589 | 11.0M | repeat_count_ = 0; | 590 | 11.0M | num_buffered_values_ = 0; | 591 | 11.0M | literal_count_ = 0; | 592 | 11.0M | literal_indicator_byte_idx_ = -1; | 593 | 11.0M | bit_writer_.Clear(); | 594 | 11.0M | } |
_ZN5doris10RleEncoderIbE5ClearEv Line | Count | Source | 587 | 16.0k | void RleEncoder<T>::Clear() { | 588 | 16.0k | current_value_ = 0; | 589 | 16.0k | repeat_count_ = 0; | 590 | 16.0k | num_buffered_values_ = 0; | 591 | 16.0k | literal_count_ = 0; | 592 | 16.0k | literal_indicator_byte_idx_ = -1; | 593 | 16.0k | bit_writer_.Clear(); | 594 | 16.0k | } |
_ZN5doris10RleEncoderIhE5ClearEv Line | Count | Source | 587 | 1 | void RleEncoder<T>::Clear() { | 588 | 1 | current_value_ = 0; | 589 | 1 | repeat_count_ = 0; | 590 | 1 | num_buffered_values_ = 0; | 591 | 1 | literal_count_ = 0; | 592 | 1 | literal_indicator_byte_idx_ = -1; | 593 | 1 | bit_writer_.Clear(); | 594 | 1 | } |
|
595 | | |
 596 | | // Copied from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h |
597 | | // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
598 | | // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
599 | | // (literal encoding). |
600 | | // |
601 | | // For both types of runs, there is a byte-aligned indicator which encodes the length |
602 | | // of the run and the type of the run. |
603 | | // |
604 | | // This encoding has the benefit that when there aren't any long enough runs, values |
605 | | // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
606 | | // the run length are byte aligned. This allows for very efficient decoding |
607 | | // implementations. |
608 | | // The encoding is: |
609 | | // encoded-block := run* |
610 | | // run := literal-run | repeated-run |
611 | | // literal-run := literal-indicator < literal bytes > |
612 | | // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
613 | | // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
614 | | // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
615 | | // |
616 | | // Each run is preceded by a varint. The varint's least significant bit is |
617 | | // used to indicate whether the run is a literal run or a repeated run. The rest |
618 | | // of the varint is used to determine the length of the run (eg how many times the |
619 | | // value repeats). |
620 | | // |
621 | | // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
622 | | // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
623 | | // on a byte boundary without padding. |
624 | | // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
625 | | // the actual number of encoded ints. (This means that the total number of encoded values |
626 | | // cannot be determined from the encoded data, since the last group may hold fewer |
627 | | // than 8 real values.) For the last group of literal runs, we pad |
628 | | // the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
629 | | // without the need for additional checks. |
630 | | // |
631 | | // There is a break-even point when it is more storage efficient to do run length |
632 | | // encoding. For 1 bit-width values, that point is 8 values: both the repeated |
633 | | // encoding and the literal encoding then require 2 bytes. This value can always |
634 | | // be computed based on the bit-width. |
635 | | // TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more |
636 | | // investigation is needed to do this efficiently, see the reverted IMPALA-6658. |
637 | | // TODO: think about how to use this for strings. The bit packing isn't quite the same. |
638 | | // |
639 | | // Examples with bit-width 1 (eg encoding booleans): |
640 | | // ---------------------------------------- |
641 | | // 100 1s followed by 100 0s: |
642 | | // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
643 | | // - (total 4 bytes) |
644 | | // |
645 | | // alternating 1s and 0s (200 total): |
646 | | // 200 ints = 25 groups of 8 |
647 | | // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
648 | | // (total 26 bytes, 1 byte overhead) |
649 | | |
650 | | // RLE decoder with a batch-oriented interface that enables fast decoding. |
651 | | // Users of this class must first initialize the class to point to a buffer of |
652 | | // RLE-encoded data, passed into the constructor or Reset(). The provided |
653 | | // bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH). |
654 | | // Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to |
655 | | // see if the next run is a repeated or literal run, then calling |
656 | | // GetRepeatedValue() or GetLiteralValues() respectively to read the values. |
657 | | // |
658 | | // End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0. |
659 | | // Other decoding errors are signalled by functions returning false. If an |
660 | | // error is encountered then it is not valid to read any more data until |
661 | | // Reset() is called. |
662 | | |
663 | | // bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1]. |
664 | | // This means that a Parquet implementation can always store the run length in a signed 32-bit integer. |
665 | | template <typename T> |
666 | | class RleBatchDecoder { |
667 | | public: |
668 | 35 | RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) { |
669 | 35 | Reset(buffer, buffer_len, bit_width); |
670 | 35 | } |
671 | | |
672 | | RleBatchDecoder() = default; |
673 | | |
674 | | // Reset the decoder to read from a new buffer. |
675 | | void Reset(uint8_t* buffer, int buffer_len, int bit_width); |
676 | | |
677 | | // Return the size of the current repeated run. Returns zero if the current run is |
678 | | // a literal run or if no more runs can be read from the input. |
679 | | int32_t NextNumRepeats(); |
680 | | |
681 | | // Get the value of the current repeated run and consume the given number of repeats. |
682 | | // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot |
683 | | // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume' |
684 | | // can be set to 0 to peek at the value without consuming repeats. |
685 | | T GetRepeatedValue(int32_t num_repeats_to_consume); |
686 | | |
687 | | // Return the size of the current literal run. Returns zero if the current run is |
688 | | // a repeated run or if no more runs can be read from the input. |
689 | | int32_t NextNumLiterals(); |
690 | | |
691 | | // Consume 'num_literals_to_consume' literals from the current literal run, |
692 | | // copying the values to 'values'. 'num_literals_to_consume' must be <= |
693 | | // NextNumLiterals(). Returns true if the requested number of literals were |
694 | | // successfully read or false if an error was encountered, e.g. the input was |
695 | | // truncated. |
696 | | bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT; |
697 | | |
698 | | // Consume 'num_values_to_consume' values and copy them to 'values'. |
699 | | // Returns the number of consumed values or 0 if an error occurred. |
700 | | uint32_t GetBatch(T* values, uint32_t batch_num); |
701 | | |
702 | | private: |
703 | | // Called when both 'literal_count_' and 'repeat_count_' have been exhausted. |
704 | | // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal |
705 | | // or repeated run, or leaves both at 0 if no more values can be read (either because |
706 | | // the end of the input was reached or an error was encountered decoding). |
707 | | void NextCounts(); |
708 | | |
709 | | /// Fill the literal buffer. Invalid to call if there are already buffered literals. |
710 | | /// Return false if the input was truncated. This does not advance 'literal_count_'. |
711 | | bool FillLiteralBuffer() WARN_UNUSED_RESULT; |
712 | | |
713 | 20 | bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; } |
714 | | |
715 | | /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing |
716 | | /// 'literal_count_'. Returns the number of literals outputted. |
717 | | int32_t OutputBufferedLiterals(int32_t max_to_output, T* values); |
718 | | |
719 | | BatchedBitReader bit_reader_; |
720 | | |
721 | | // Number of bits needed to encode the value. Must be between 0 and 64 after |
722 | | // the decoder is initialized with a buffer. -1 indicates the decoder was not |
723 | | // initialized. |
724 | | int bit_width_ = -1; |
725 | | |
726 | | // If a repeated run, the number of repeats remaining in the current run to be read. |
727 | | // If the current run is a literal run, this is 0. |
728 | | int32_t repeat_count_ = 0; |
729 | | |
730 | | // If a literal run, the number of literals remaining in the current run to be read. |
731 | | // If the current run is a repeated run, this is 0. |
732 | | int32_t literal_count_ = 0; |
733 | | |
734 | | // If a repeated run, the current repeated value. |
735 | | T repeated_value_; |
736 | | |
737 | | // Size of buffer for literal values. Large enough to decode a full batch of 32 |
738 | | // literals. The buffer is needed to allow clients to read in batches that are not |
739 | | // multiples of 32. |
740 | | static constexpr int LITERAL_BUFFER_LEN = 32; |
741 | | |
742 | | // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the |
743 | | // position of the next literal to be read from the buffer. |
744 | | T literal_buffer_[LITERAL_BUFFER_LEN]; |
745 | | int num_buffered_literals_ = 0; |
746 | | int literal_buffer_pos_ = 0; |
747 | | }; |
748 | | |
749 | | template <typename T> |
750 | 20 | int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) { |
751 | 20 | int32_t num_to_output = |
752 | 20 | std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_); |
753 | 20 | memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output); |
754 | 20 | literal_buffer_pos_ += num_to_output; |
755 | 20 | literal_count_ -= num_to_output; |
756 | 20 | return num_to_output; |
757 | 20 | } |
758 | | |
759 | | template <typename T> |
760 | 35 | void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) { |
761 | 35 | bit_reader_.Reset(buffer, buffer_len); |
762 | 35 | bit_width_ = bit_width; |
763 | 35 | repeat_count_ = 0; |
764 | 35 | literal_count_ = 0; |
765 | 35 | num_buffered_literals_ = 0; |
766 | 35 | literal_buffer_pos_ = 0; |
767 | 35 | } |
768 | | |
769 | | template <typename T> |
770 | 56 | int32_t RleBatchDecoder<T>::NextNumRepeats() { |
771 | 56 | if (repeat_count_ > 0) return repeat_count_; |
772 | 54 | if (literal_count_ == 0) NextCounts(); |
773 | 54 | return repeat_count_; |
774 | 56 | } |
775 | | |
776 | | template <typename T> |
777 | 54 | void RleBatchDecoder<T>::NextCounts() { |
778 | | // Read the next run's indicator int, it could be a literal or repeated run. |
779 | | // The int is encoded as a ULEB128-encoded value. |
780 | 54 | uint32_t indicator_value = 0; |
781 | 54 | if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) { |
782 | 0 | return; |
783 | 0 | } |
784 | | |
785 | | // lsb indicates if it is a literal run or repeated run |
786 | 54 | bool is_literal = indicator_value & 1; |
787 | | |
788 | | // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully. |
789 | | // The Parquet standard does not allow longer runs - see PARQUET-1290. |
790 | 54 | uint32_t run_len = indicator_value >> 1; |
791 | 54 | if (is_literal) { |
792 | | // Use int64_t to avoid overflowing multiplication. |
793 | 20 | int64_t literal_count = static_cast<int64_t>(run_len) * 8; |
794 | 20 | if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return; |
795 | 20 | literal_count_ = cast_set<int32_t>(literal_count); |
796 | 34 | } else { |
797 | 34 | if (UNLIKELY(run_len == 0)) return; |
798 | 34 | bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_); |
799 | 34 | if (UNLIKELY(!result)) return; |
800 | 34 | repeat_count_ = run_len; |
801 | 34 | } |
802 | 54 | } |
803 | | |
804 | | template <typename T> |
805 | 36 | T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) { |
806 | 36 | repeat_count_ -= num_repeats_to_consume; |
807 | 36 | return repeated_value_; |
808 | 36 | } |
809 | | |
810 | | template <typename T> |
811 | 20 | int32_t RleBatchDecoder<T>::NextNumLiterals() { |
812 | 20 | if (literal_count_ > 0) return literal_count_; |
813 | 0 | if (repeat_count_ == 0) NextCounts(); |
814 | 0 | return literal_count_; |
815 | 20 | } |
816 | | |
817 | | template <typename T> |
818 | 20 | bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) { |
819 | 20 | int32_t num_consumed = 0; |
820 | | // Copy any buffered literals left over from previous calls. |
821 | 20 | if (HaveBufferedLiterals()) { |
822 | 0 | num_consumed = OutputBufferedLiterals(num_literals_to_consume, values); |
823 | 0 | } |
824 | | |
825 | 20 | int32_t num_remaining = num_literals_to_consume - num_consumed; |
826 | | // Copy literals directly to the output, bypassing 'literal_buffer_' when possible. |
827 | | // Need to round to a batch of 32 if the caller is consuming only part of the current |
828 | | // run avoid ending on a non-byte boundary. |
829 | 20 | int32_t num_to_bypass = |
830 | 20 | std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32)); |
831 | 20 | if (num_to_bypass > 0) { |
832 | 0 | int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed); |
833 | | // If we couldn't read the expected number, that means the input was truncated. |
834 | 0 | if (num_read < num_to_bypass) return false; |
835 | 0 | literal_count_ -= num_to_bypass; |
836 | 0 | num_consumed += num_to_bypass; |
837 | 0 | num_remaining = num_literals_to_consume - num_consumed; |
838 | 0 | } |
839 | | |
840 | 20 | if (num_remaining > 0) { |
841 | | // We weren't able to copy all the literals requested directly from the input. |
842 | | // Buffer literals and copy over the requested number. |
843 | 20 | if (UNLIKELY(!FillLiteralBuffer())) return false; |
844 | 20 | OutputBufferedLiterals(num_remaining, values + num_consumed); |
845 | 20 | } |
846 | 20 | return true; |
847 | 20 | } |
848 | | |
849 | | template <typename T> |
850 | 20 | bool RleBatchDecoder<T>::FillLiteralBuffer() { |
851 | 20 | int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_); |
852 | 20 | num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_); |
853 | | // If we couldn't read the expected number, that means the input was truncated. |
854 | 20 | if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false; |
855 | 20 | literal_buffer_pos_ = 0; |
856 | 20 | return true; |
857 | 20 | } |
858 | | |
859 | | template <typename T> |
860 | 37 | uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) { |
861 | 37 | uint32_t num_consumed = 0; |
862 | 93 | while (num_consumed < batch_num) { |
863 | | // Add RLE encoded values by repeating the current value this number of times. |
864 | 56 | uint32_t num_repeats = NextNumRepeats(); |
865 | 56 | if (num_repeats > 0) { |
866 | 36 | int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed); |
867 | 36 | T repeated_value = GetRepeatedValue(num_repeats_to_set); |
868 | 292 | for (int i = 0; i < num_repeats_to_set; ++i) { |
869 | 256 | values[num_consumed + i] = repeated_value; |
870 | 256 | } |
871 | 36 | num_consumed += num_repeats_to_set; |
872 | 36 | continue; |
873 | 36 | } |
874 | | |
875 | | // Add remaining literal values, if any. |
876 | 20 | uint32_t num_literals = NextNumLiterals(); |
877 | 20 | if (num_literals == 0) { |
878 | 0 | break; |
879 | 0 | } |
880 | 20 | uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed); |
881 | 20 | if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { |
882 | 0 | return 0; |
883 | 0 | } |
884 | 20 | num_consumed += num_literals_to_set; |
885 | 20 | } |
886 | 37 | return num_consumed; |
887 | 37 | } |
888 | | #include "common/compile_check_end.h" |
889 | | } // namespace doris |