Coverage Report

Created: 2024-11-21 23:52

/root/doris/be/src/util/rle_encoding.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <glog/logging.h>
20
21
#include <limits> // IWYU pragma: keep
22
23
#include "gutil/port.h"
24
#include "util/bit_stream_utils.inline.h"
25
#include "util/bit_util.h"
26
27
namespace doris {
28
29
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
30
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
31
// (literal encoding).
32
// For both types of runs, there is a byte-aligned indicator which encodes the length
33
// of the run and the type of the run.
34
// This encoding has the benefit that when there aren't any long enough runs, values
35
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
36
// the run length are byte aligned. This allows for very efficient decoding
37
// implementations.
38
// The encoding is:
39
//    encoded-block := run*
40
//    run := literal-run | repeated-run
41
//    literal-run := literal-indicator < literal bytes >
42
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
43
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
44
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
45
//
46
// Each run is preceded by a varint. The varint's least significant bit is
47
// used to indicate whether the run is a literal run or a repeated run. The rest
48
// of the varint is used to determine the length of the run (eg how many times the
49
// value repeats).
50
//
51
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
52
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
53
// on a byte boundary without padding.
54
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
55
// the actual number of encoded ints. (This means that the total number of encoded values
56
// can not be determined from the encoded data, since the number of values in the last
57
// group may not be a multiple of 8).
58
// There is a break-even point when it is more storage efficient to do run length
59
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
60
// for both the repeated encoding or the literal encoding.  This value can always
61
// be computed based on the bit-width.
62
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
63
//
64
// Examples with bit-width 1 (eg encoding booleans):
65
// ----------------------------------------
66
// 100 1s followed by 100 0s:
67
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
68
//  - (total 4 bytes)
69
//
70
// alternating 1s and 0s (200 total):
71
// 200 ints = 25 groups of 8
72
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
73
// (total 26 bytes, 1 byte overhead)
74
//
75
76
// Decoder class for RLE encoded data.
77
//
78
// NOTE: the encoded format does not have any length prefix or any other way of
79
// indicating that the encoded sequence ends at a certain point, so the Decoder
80
// methods may return some extra bits at the end before the read methods start
81
// to return 0/false.
82
template <typename T>
83
class RleDecoder {
84
public:
85
    // Create a decoder object. buffer/buffer_len is the decoded data.
86
    // bit_width is the width of each value (before encoding).
87
    RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
88
            : bit_reader_(buffer, buffer_len),
89
              bit_width_(bit_width),
90
              current_value_(0),
91
              repeat_count_(0),
92
              literal_count_(0),
93
842
              rewind_state_(CANT_REWIND) {
94
842
        DCHECK_GE(bit_width_, 1);
95
842
        DCHECK_LE(bit_width_, 64);
96
842
    }
_ZN5doris10RleDecoderImEC2EPKhii
Line
Count
Source
93
394
              rewind_state_(CANT_REWIND) {
94
394
        DCHECK_GE(bit_width_, 1);
95
394
        DCHECK_LE(bit_width_, 64);
96
394
    }
_ZN5doris10RleDecoderIbEC2EPKhii
Line
Count
Source
93
409
              rewind_state_(CANT_REWIND) {
94
409
        DCHECK_GE(bit_width_, 1);
95
409
        DCHECK_LE(bit_width_, 64);
96
409
    }
Unexecuted instantiation: _ZN5doris10RleDecoderIhEC2EPKhii
_ZN5doris10RleDecoderIsEC2EPKhii
Line
Count
Source
93
39
              rewind_state_(CANT_REWIND) {
94
39
        DCHECK_GE(bit_width_, 1);
95
39
        DCHECK_LE(bit_width_, 64);
96
39
    }
97
98
20.9k
    RleDecoder() {}
_ZN5doris10RleDecoderIbEC2Ev
Line
Count
Source
98
20.8k
    RleDecoder() {}
_ZN5doris10RleDecoderIsEC2Ev
Line
Count
Source
98
78
    RleDecoder() {}
Unexecuted instantiation: _ZN5doris10RleDecoderIhEC2Ev
99
100
    // Skip n values, and returns the number of non-zero entries skipped.
101
    size_t Skip(size_t to_skip);
102
103
    // Gets the next value.  Returns false if there are no more.
104
    bool Get(T* val);
105
106
    // Seek to the previous value.
107
    void RewindOne();
108
109
    // Gets the next run of the same 'val'. Returns 0 if there is no
110
    // more data to be decoded. Will return a run of at most 'max_run'
111
    // values. If there are more values than this, the next call to
112
    // GetNextRun will return more from the same run.
113
    size_t GetNextRun(T* val, size_t max_run);
114
115
    size_t get_values(T* values, size_t num_values);
116
117
    // Get the count of current repeated value
118
    size_t repeated_count();
119
120
    // Get current repeated value, make sure that count equals repeated_count()
121
    T get_repeated_value(size_t count);
122
123
0
    const BitReader& bit_reader() const { return bit_reader_; }
124
125
private:
126
    bool ReadHeader();
127
128
    enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND };
129
130
    BitReader bit_reader_;
131
    int bit_width_;
132
    uint64_t current_value_;
133
    uint32_t repeat_count_;
134
    uint32_t literal_count_;
135
    RewindState rewind_state_;
136
};
137
138
// Class to incrementally build the rle data.
139
// The encoding has two modes: encoding repeated runs and literal runs.
140
// If the run is sufficiently short, it is more efficient to encode as a literal run.
141
// This class does so by buffering 8 values at a time.  If they are not all the same
142
// they are added to the literal run.  If they are the same, they are added to the
143
// repeated run.  When we switch modes, the previous run is flushed out.
144
template <typename T>
145
class RleEncoder {
146
public:
147
    // buffer: buffer to write bits to.
148
    // bit_width: max number of bits for value.
149
    // TODO: consider adding a min_repeated_run_length so the caller can control
150
    // when values should be encoded as repeated runs.  Currently this is derived
151
    // based on the bit_width, which can determine a storage optimal choice.
152
    explicit RleEncoder(faststring* buffer, int bit_width)
153
1.31k
            : bit_width_(bit_width), bit_writer_(buffer) {
154
1.31k
        DCHECK_GE(bit_width_, 1);
155
1.31k
        DCHECK_LE(bit_width_, 64);
156
1.31k
        Clear();
157
1.31k
    }
_ZN5doris10RleEncoderImEC2EPNS_10faststringEi
Line
Count
Source
153
394
            : bit_width_(bit_width), bit_writer_(buffer) {
154
394
        DCHECK_GE(bit_width_, 1);
155
394
        DCHECK_LE(bit_width_, 64);
156
394
        Clear();
157
394
    }
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi
Line
Count
Source
153
925
            : bit_width_(bit_width), bit_writer_(buffer) {
154
925
        DCHECK_GE(bit_width_, 1);
155
925
        DCHECK_LE(bit_width_, 64);
156
925
        Clear();
157
925
    }
158
159
    // Reserve 'num_bytes' bytes for a plain encoded header, set each
160
    // byte with 'val': this is used for the RLE-encoded data blocks in
161
    // order to be able to able to store the initial ordinal position
162
    // and number of elements. This is a part of RleEncoder in order to
163
    // maintain the correct offset in 'buffer'.
164
    void Reserve(int num_bytes, uint8_t val);
165
166
    // Encode value. This value must be representable with bit_width_ bits.
167
    void Put(T value, size_t run_length = 1);
168
169
    // Flushes any pending values to the underlying buffer.
170
    // Returns the total number of bytes written
171
    int Flush();
172
173
    // Resets all the state in the encoder.
174
    void Clear();
175
176
302
    int32_t len() const { return bit_writer_.bytes_written(); }
177
178
private:
179
    // Flushes any buffered values.  If this is part of a repeated run, this is largely
180
    // a no-op.
181
    // If it is part of a literal run, this will call FlushLiteralRun, which writes
182
    // out the buffered literal values.
183
    // If 'done' is true, the current run would be written even if it would normally
184
    // have been buffered more.  This should only be called at the end, when the
185
    // encoder has received all values even if it would normally continue to be
186
    // buffered.
187
    void FlushBufferedValues(bool done);
188
189
    // Flushes literal values to the underlying buffer.  If update_indicator_byte,
190
    // then the current literal run is complete and the indicator byte is updated.
191
    void FlushLiteralRun(bool update_indicator_byte);
192
193
    // Flushes a repeated run to the underlying buffer.
194
    void FlushRepeatedRun();
195
196
    // Number of bits needed to encode the value.
197
    const int bit_width_;
198
199
    // Underlying buffer.
200
    BitWriter bit_writer_;
201
202
    // We need to buffer at most 8 values for literals.  This happens when the
203
    // bit_width is 1 (so 8 values fit in one byte).
204
    // TODO: generalize this to other bit widths
205
    uint64_t buffered_values_[8];
206
207
    // Number of values in buffered_values_
208
    int num_buffered_values_;
209
210
    // The current (also last) value that was written and the count of how
211
    // many times in a row that value has been seen.  This is maintained even
212
    // if we are in a literal run.  If the repeat_count_ get high enough, we switch
213
    // to encoding repeated runs.
214
    uint64_t current_value_;
215
    int repeat_count_;
216
217
    // Number of literals in the current run.  This does not include the literals
218
    // that might be in buffered_values_.  Only after we've got a group big enough
219
    // can we decide if they should part of the literal_count_ or repeat_count_
220
    int literal_count_;
221
222
    // Index of a byte in the underlying buffer that stores the indicator byte.
223
    // This is reserved as soon as we need a literal run but the value is written
224
    // when the literal run is complete. We maintain an index rather than a pointer
225
    // into the underlying buffer because the pointer value may become invalid if
226
    // the underlying buffer is resized.
227
    int literal_indicator_byte_idx_;
228
};
229
230
template <typename T>
231
301k
bool RleDecoder<T>::ReadHeader() {
232
301k
    DCHECK(bit_reader_.is_initialized());
233
301k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
23.6k
        uint32_t indicator_value = 0;
237
23.6k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
23.6k
        if (PREDICT_FALSE(!result)) {
239
276
            return false;
240
276
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
23.3k
        bool is_literal = indicator_value & 1;
244
23.3k
        if (is_literal) {
245
1.32k
            literal_count_ = (indicator_value >> 1) * 8;
246
1.32k
            DCHECK_GT(literal_count_, 0);
247
22.0k
        } else {
248
22.0k
            repeat_count_ = indicator_value >> 1;
249
22.0k
            DCHECK_GT(repeat_count_, 0);
250
22.0k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
22.0k
                                                    reinterpret_cast<T*>(&current_value_));
252
22.0k
            DCHECK(result);
253
22.0k
        }
254
23.3k
    }
255
301k
    return true;
256
301k
}
_ZN5doris10RleDecoderImE10ReadHeaderEv
Line
Count
Source
231
244k
bool RleDecoder<T>::ReadHeader() {
232
244k
    DCHECK(bit_reader_.is_initialized());
233
244k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
3.03k
        uint32_t indicator_value = 0;
237
3.03k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
3.03k
        if (PREDICT_FALSE(!result)) {
239
0
            return false;
240
0
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
3.03k
        bool is_literal = indicator_value & 1;
244
3.03k
        if (is_literal) {
245
1.12k
            literal_count_ = (indicator_value >> 1) * 8;
246
1.12k
            DCHECK_GT(literal_count_, 0);
247
1.91k
        } else {
248
1.91k
            repeat_count_ = indicator_value >> 1;
249
1.91k
            DCHECK_GT(repeat_count_, 0);
250
1.91k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
1.91k
                                                    reinterpret_cast<T*>(&current_value_));
252
1.91k
            DCHECK(result);
253
1.91k
        }
254
3.03k
    }
255
244k
    return true;
256
244k
}
_ZN5doris10RleDecoderIbE10ReadHeaderEv
Line
Count
Source
231
57.2k
bool RleDecoder<T>::ReadHeader() {
232
57.2k
    DCHECK(bit_reader_.is_initialized());
233
57.2k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
20.5k
        uint32_t indicator_value = 0;
237
20.5k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
20.5k
        if (PREDICT_FALSE(!result)) {
239
276
            return false;
240
276
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
20.2k
        bool is_literal = indicator_value & 1;
244
20.2k
        if (is_literal) {
245
201
            literal_count_ = (indicator_value >> 1) * 8;
246
201
            DCHECK_GT(literal_count_, 0);
247
20.0k
        } else {
248
20.0k
            repeat_count_ = indicator_value >> 1;
249
20.0k
            DCHECK_GT(repeat_count_, 0);
250
20.0k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
20.0k
                                                    reinterpret_cast<T*>(&current_value_));
252
20.0k
            DCHECK(result);
253
20.0k
        }
254
20.2k
    }
255
56.9k
    return true;
256
57.2k
}
_ZN5doris10RleDecoderIsE10ReadHeaderEv
Line
Count
Source
231
39
bool RleDecoder<T>::ReadHeader() {
232
39
    DCHECK(bit_reader_.is_initialized());
233
39
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
39
        uint32_t indicator_value = 0;
237
39
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
39
        if (PREDICT_FALSE(!result)) {
239
0
            return false;
240
0
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
39
        bool is_literal = indicator_value & 1;
244
39
        if (is_literal) {
245
1
            literal_count_ = (indicator_value >> 1) * 8;
246
1
            DCHECK_GT(literal_count_, 0);
247
38
        } else {
248
38
            repeat_count_ = indicator_value >> 1;
249
38
            DCHECK_GT(repeat_count_, 0);
250
38
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
38
                                                    reinterpret_cast<T*>(&current_value_));
252
38
            DCHECK(result);
253
38
        }
254
39
    }
255
39
    return true;
256
39
}
Unexecuted instantiation: _ZN5doris10RleDecoderIhE10ReadHeaderEv
257
258
template <typename T>
259
247k
bool RleDecoder<T>::Get(T* val) {
260
247k
    DCHECK(bit_reader_.is_initialized());
261
247k
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
247k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
158k
        *val = current_value_;
267
158k
        --repeat_count_;
268
158k
        rewind_state_ = REWIND_RUN;
269
158k
    } else {
270
89.0k
        DCHECK(literal_count_ > 0);
271
89.0k
        bool result = bit_reader_.GetValue(bit_width_, val);
272
89.0k
        DCHECK(result);
273
89.0k
        --literal_count_;
274
89.0k
        rewind_state_ = REWIND_LITERAL;
275
89.0k
    }
276
277
247k
    return true;
278
247k
}
_ZN5doris10RleDecoderImE3GetEPm
Line
Count
Source
259
244k
bool RleDecoder<T>::Get(T* val) {
260
244k
    DCHECK(bit_reader_.is_initialized());
261
244k
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
244k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
155k
        *val = current_value_;
267
155k
        --repeat_count_;
268
155k
        rewind_state_ = REWIND_RUN;
269
155k
    } else {
270
88.9k
        DCHECK(literal_count_ > 0);
271
88.9k
        bool result = bit_reader_.GetValue(bit_width_, val);
272
88.9k
        DCHECK(result);
273
88.9k
        --literal_count_;
274
88.9k
        rewind_state_ = REWIND_LITERAL;
275
88.9k
    }
276
277
244k
    return true;
278
244k
}
_ZN5doris10RleDecoderIbE3GetEPb
Line
Count
Source
259
3.17k
bool RleDecoder<T>::Get(T* val) {
260
3.17k
    DCHECK(bit_reader_.is_initialized());
261
3.17k
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
3.17k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
3.08k
        *val = current_value_;
267
3.08k
        --repeat_count_;
268
3.08k
        rewind_state_ = REWIND_RUN;
269
3.08k
    } else {
270
93
        DCHECK(literal_count_ > 0);
271
93
        bool result = bit_reader_.GetValue(bit_width_, val);
272
93
        DCHECK(result);
273
93
        --literal_count_;
274
93
        rewind_state_ = REWIND_LITERAL;
275
93
    }
276
277
3.17k
    return true;
278
3.17k
}
Unexecuted instantiation: _ZN5doris10RleDecoderIsE3GetEPs
279
280
template <typename T>
281
0
void RleDecoder<T>::RewindOne() {
282
0
    DCHECK(bit_reader_.is_initialized());
283
284
0
    switch (rewind_state_) {
285
0
    case CANT_REWIND:
286
0
        LOG(FATAL) << "Can't rewind more than once after each read!";
287
0
        break;
288
0
    case REWIND_RUN:
289
0
        ++repeat_count_;
290
0
        break;
291
0
    case REWIND_LITERAL: {
292
0
        bit_reader_.Rewind(bit_width_);
293
0
        ++literal_count_;
294
0
        break;
295
0
    }
296
0
    }
297
298
0
    rewind_state_ = CANT_REWIND;
299
0
}
300
301
template <typename T>
302
34.0k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
303
34.0k
    DCHECK(bit_reader_.is_initialized());
304
34.0k
    DCHECK_GT(max_run, 0);
305
34.0k
    size_t ret = 0;
306
34.0k
    size_t rem = max_run;
307
54.0k
    while (ReadHeader()) {
308
53.7k
        if (PREDICT_TRUE(repeat_count_ > 0)) {
309
43.2k
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
310
19.6k
                return ret;
311
19.6k
            }
312
23.6k
            *val = current_value_;
313
23.6k
            if (repeat_count_ >= rem) {
314
                // The next run is longer than the amount of remaining data
315
                // that the caller wants to read. Only consume it partially.
316
3.81k
                repeat_count_ -= rem;
317
3.81k
                ret += rem;
318
3.81k
                return ret;
319
3.81k
            }
320
19.8k
            ret += repeat_count_;
321
19.8k
            rem -= repeat_count_;
322
19.8k
            repeat_count_ = 0;
323
19.8k
        } else {
324
10.4k
            DCHECK(literal_count_ > 0);
325
10.4k
            if (ret == 0) {
326
10.4k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
327
10.4k
                DCHECK(has_more);
328
10.4k
                literal_count_--;
329
10.4k
                ret++;
330
10.4k
                rem--;
331
10.4k
            }
332
333
41.9k
            while (literal_count_ > 0) {
334
41.7k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
335
41.7k
                DCHECK(result);
336
41.7k
                if (current_value_ != *val || rem == 0) {
337
10.3k
                    bit_reader_.Rewind(bit_width_);
338
10.3k
                    return ret;
339
10.3k
                }
340
31.4k
                ret++;
341
31.4k
                rem--;
342
31.4k
                literal_count_--;
343
31.4k
            }
344
10.4k
        }
345
53.7k
    }
346
276
    return ret;
347
34.0k
}
_ZN5doris10RleDecoderIbE10GetNextRunEPbm
Line
Count
Source
302
34.0k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
303
34.0k
    DCHECK(bit_reader_.is_initialized());
304
34.0k
    DCHECK_GT(max_run, 0);
305
34.0k
    size_t ret = 0;
306
34.0k
    size_t rem = max_run;
307
54.0k
    while (ReadHeader()) {
308
53.7k
        if (PREDICT_TRUE(repeat_count_ > 0)) {
309
43.2k
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
310
19.6k
                return ret;
311
19.6k
            }
312
23.6k
            *val = current_value_;
313
23.6k
            if (repeat_count_ >= rem) {
314
                // The next run is longer than the amount of remaining data
315
                // that the caller wants to read. Only consume it partially.
316
3.81k
                repeat_count_ -= rem;
317
3.81k
                ret += rem;
318
3.81k
                return ret;
319
3.81k
            }
320
19.8k
            ret += repeat_count_;
321
19.8k
            rem -= repeat_count_;
322
19.8k
            repeat_count_ = 0;
323
19.8k
        } else {
324
10.4k
            DCHECK(literal_count_ > 0);
325
10.4k
            if (ret == 0) {
326
10.4k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
327
10.4k
                DCHECK(has_more);
328
10.4k
                literal_count_--;
329
10.4k
                ret++;
330
10.4k
                rem--;
331
10.4k
            }
332
333
41.9k
            while (literal_count_ > 0) {
334
41.7k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
335
41.7k
                DCHECK(result);
336
41.7k
                if (current_value_ != *val || rem == 0) {
337
10.3k
                    bit_reader_.Rewind(bit_width_);
338
10.3k
                    return ret;
339
10.3k
                }
340
31.4k
                ret++;
341
31.4k
                rem--;
342
31.4k
                literal_count_--;
343
31.4k
            }
344
10.4k
        }
345
53.7k
    }
346
276
    return ret;
347
34.0k
}
_ZN5doris10RleDecoderIsE10GetNextRunEPsm
Line
Count
Source
302
7
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
303
7
    DCHECK(bit_reader_.is_initialized());
304
7
    DCHECK_GT(max_run, 0);
305
7
    size_t ret = 0;
306
7
    size_t rem = max_run;
307
7
    while (ReadHeader()) {
308
7
        if (PREDICT_TRUE(repeat_count_ > 0)) {
309
7
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
310
0
                return ret;
311
0
            }
312
7
            *val = current_value_;
313
7
            if (repeat_count_ >= rem) {
314
                // The next run is longer than the amount of remaining data
315
                // that the caller wants to read. Only consume it partially.
316
7
                repeat_count_ -= rem;
317
7
                ret += rem;
318
7
                return ret;
319
7
            }
320
0
            ret += repeat_count_;
321
0
            rem -= repeat_count_;
322
0
            repeat_count_ = 0;
323
0
        } else {
324
0
            DCHECK(literal_count_ > 0);
325
0
            if (ret == 0) {
326
0
                bool has_more = bit_reader_.GetValue(bit_width_, val);
327
0
                DCHECK(has_more);
328
0
                literal_count_--;
329
0
                ret++;
330
0
                rem--;
331
0
            }
332
333
0
            while (literal_count_ > 0) {
334
0
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
335
0
                DCHECK(result);
336
0
                if (current_value_ != *val || rem == 0) {
337
0
                    bit_reader_.Rewind(bit_width_);
338
0
                    return ret;
339
0
                }
340
0
                ret++;
341
0
                rem--;
342
0
                literal_count_--;
343
0
            }
344
0
        }
345
7
    }
346
0
    return ret;
347
7
}
348
349
template <typename T>
350
32
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
351
32
    size_t read_num = 0;
352
96
    while (read_num < num_values) {
353
64
        size_t read_this_time = num_values - read_num;
354
355
64
        if (LIKELY(repeat_count_ > 0)) {
356
31
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
357
31
            std::fill(values, values + read_this_time, current_value_);
358
31
            values += read_this_time;
359
31
            repeat_count_ -= read_this_time;
360
31
            read_num += read_this_time;
361
33
        } else if (literal_count_ > 0) {
362
1
            read_this_time = std::min((size_t)literal_count_, read_this_time);
363
11
            for (int i = 0; i < read_this_time; ++i) {
364
10
                bool result = bit_reader_.GetValue(bit_width_, values);
365
10
                DCHECK(result);
366
10
                values++;
367
10
            }
368
1
            literal_count_ -= read_this_time;
369
1
            read_num += read_this_time;
370
32
        } else {
371
32
            if (!ReadHeader()) {
372
0
                return read_num;
373
0
            }
374
32
        }
375
64
    }
376
32
    return read_num;
377
32
}
Unexecuted instantiation: _ZN5doris10RleDecoderIhE10get_valuesEPhm
_ZN5doris10RleDecoderIsE10get_valuesEPsm
Line
Count
Source
350
32
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
351
32
    size_t read_num = 0;
352
96
    while (read_num < num_values) {
353
64
        size_t read_this_time = num_values - read_num;
354
355
64
        if (LIKELY(repeat_count_ > 0)) {
356
31
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
357
31
            std::fill(values, values + read_this_time, current_value_);
358
31
            values += read_this_time;
359
31
            repeat_count_ -= read_this_time;
360
31
            read_num += read_this_time;
361
33
        } else if (literal_count_ > 0) {
362
1
            read_this_time = std::min((size_t)literal_count_, read_this_time);
363
11
            for (int i = 0; i < read_this_time; ++i) {
364
10
                bool result = bit_reader_.GetValue(bit_width_, values);
365
10
                DCHECK(result);
366
10
                values++;
367
10
            }
368
1
            literal_count_ -= read_this_time;
369
1
            read_num += read_this_time;
370
32
        } else {
371
32
            if (!ReadHeader()) {
372
0
                return read_num;
373
0
            }
374
32
        }
375
64
    }
376
32
    return read_num;
377
32
}
378
379
template <typename T>
380
size_t RleDecoder<T>::repeated_count() {
381
    if (repeat_count_ > 0) {
382
        return repeat_count_;
383
    }
384
    if (literal_count_ == 0) {
385
        ReadHeader();
386
    }
387
    return repeat_count_;
388
}
389
390
template <typename T>
391
T RleDecoder<T>::get_repeated_value(size_t count) {
392
    DCHECK_GE(repeat_count_, count);
393
    repeat_count_ -= count;
394
    return current_value_;
395
}
396
397
template <typename T>
398
4
size_t RleDecoder<T>::Skip(size_t to_skip) {
399
4
    DCHECK(bit_reader_.is_initialized());
400
401
4
    size_t set_count = 0;
402
14
    while (to_skip > 0) {
403
10
        bool result = ReadHeader();
404
10
        DCHECK(result);
405
406
10
        if (PREDICT_TRUE(repeat_count_ > 0)) {
407
7
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
408
7
            repeat_count_ -= nskip;
409
7
            to_skip -= nskip;
410
7
            if (current_value_ != 0) {
411
3
                set_count += nskip;
412
3
            }
413
7
        } else {
414
3
            DCHECK(literal_count_ > 0);
415
3
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
416
3
            literal_count_ -= nskip;
417
3
            to_skip -= nskip;
418
56
            for (; nskip > 0; nskip--) {
419
53
                T value = 0;
420
53
                bool result = bit_reader_.GetValue(bit_width_, &value);
421
53
                DCHECK(result);
422
53
                if (value != 0) {
423
26
                    set_count++;
424
26
                }
425
53
            }
426
3
        }
427
10
    }
428
4
    return set_count;
429
4
}
430
431
// This function buffers input values 8 at a time.  After seeing all 8 values,
432
// it decides whether they should be encoded as a literal or repeated run.
433
template <typename T>
434
729k
void RleEncoder<T>::Put(T value, size_t run_length) {
435
729k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
436
437
    // TODO(perf): remove the loop and use the repeat_count_
438
3.07M
    for (; run_length > 0; run_length--) {
439
2.34M
        if (PREDICT_TRUE(current_value_ == value)) {
440
2.23M
            ++repeat_count_;
441
2.23M
            if (repeat_count_ > 8) {
442
                // This is just a continuation of the current run, no need to buffer the
443
                // values.
444
                // Note that this is the fast path for long repeated runs.
445
2.03M
                continue;
446
2.03M
            }
447
2.23M
        } else {
448
107k
            if (repeat_count_ >= 8) {
449
                // We had a run that was long enough but it has ended.  Flush the
450
                // current repeated run.
451
21.4k
                DCHECK_EQ(literal_count_, 0);
452
21.4k
                FlushRepeatedRun();
453
21.4k
            }
454
107k
            repeat_count_ = 1;
455
107k
            current_value_ = value;
456
107k
        }
457
458
310k
        buffered_values_[num_buffered_values_] = value;
459
310k
        if (++num_buffered_values_ == 8) {
460
38.6k
            DCHECK_EQ(literal_count_ % 8, 0);
461
38.6k
            FlushBufferedValues(false);
462
38.6k
        }
463
310k
    }
464
729k
}
_ZN5doris10RleEncoderImE3PutEmm
Line
Count
Source
434
244k
// Appends 'run_length' copies of 'value' to the encoded stream. Values are staged
// eight at a time; after more than eight consecutive equal values the run is
// tracked only through 'repeat_count_'.
void RleEncoder<T>::Put(T value, size_t run_length) {
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));

    for (; run_length > 0; run_length--) {
        if (PREDICT_TRUE(current_value_ == value)) {
            ++repeat_count_;
            if (repeat_count_ > 8) {
                // Fast path for long repeated runs: every remaining copy of 'value'
                // would only bump the counter again, so account for all of them at
                // once instead of looping 'run_length' times.
                repeat_count_ += static_cast<decltype(repeat_count_)>(run_length - 1);
                return;
            }
        } else {
            if (repeat_count_ >= 8) {
                // The previous run was long enough to be stored as a repeated run
                // and has just ended: write it out before starting a new one.
                DCHECK_EQ(literal_count_, 0);
                FlushRepeatedRun();
            }
            repeat_count_ = 1;
            current_value_ = value;
        }

        // Stage the value; a full group of eight triggers the literal/repeat decision.
        buffered_values_[num_buffered_values_] = value;
        if (++num_buffered_values_ == 8) {
            DCHECK_EQ(literal_count_ % 8, 0);
            FlushBufferedValues(false);
        }
    }
}
_ZN5doris10RleEncoderIbE3PutEbm
Line
Count
Source
434
485k
// Appends 'run_length' copies of 'value' to the encoded stream. Values are staged
// eight at a time; after more than eight consecutive equal values the run is
// tracked only through 'repeat_count_'.
void RleEncoder<T>::Put(T value, size_t run_length) {
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));

    for (; run_length > 0; run_length--) {
        if (PREDICT_TRUE(current_value_ == value)) {
            ++repeat_count_;
            if (repeat_count_ > 8) {
                // Fast path for long repeated runs: every remaining copy of 'value'
                // would only bump the counter again, so account for all of them at
                // once instead of looping 'run_length' times.
                repeat_count_ += static_cast<decltype(repeat_count_)>(run_length - 1);
                return;
            }
        } else {
            if (repeat_count_ >= 8) {
                // The previous run was long enough to be stored as a repeated run
                // and has just ended: write it out before starting a new one.
                DCHECK_EQ(literal_count_, 0);
                FlushRepeatedRun();
            }
            repeat_count_ = 1;
            current_value_ = value;
        }

        // Stage the value; a full group of eight triggers the literal/repeat decision.
        buffered_values_[num_buffered_values_] = value;
        if (++num_buffered_values_ == 8) {
            DCHECK_EQ(literal_count_ % 8, 0);
            FlushBufferedValues(false);
        }
    }
}
465
466
template <typename T>
467
17.4k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
468
17.4k
    if (literal_indicator_byte_idx_ < 0) {
469
        // The literal indicator byte has not been reserved yet, get one now.
470
1.32k
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
471
1.32k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
472
1.32k
    }
473
474
    // Write all the buffered values as bit packed literals
475
148k
    for (int i = 0; i < num_buffered_values_; ++i) {
476
130k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
477
130k
    }
478
17.4k
    num_buffered_values_ = 0;
479
480
17.4k
    if (update_indicator_byte) {
481
        // At this point we need to write the indicator byte for the literal run.
482
        // We only reserve one byte, to allow for streaming writes of literal values.
483
        // The logic makes sure we flush literal runs often enough to not overrun
484
        // the 1 byte.
485
1.32k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
486
1.32k
        int32_t indicator_value = (num_groups << 1) | 1;
487
1.32k
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
488
1.32k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
489
1.32k
        literal_indicator_byte_idx_ = -1;
490
1.32k
        literal_count_ = 0;
491
1.32k
    }
492
17.4k
}
_ZN5doris10RleEncoderImE15FlushLiteralRunEb
Line
Count
Source
467
12.0k
// Writes every staged value to the output as a bit-packed literal and empties the
// staging buffer, reserving the run's indicator byte first if needed. When
// 'update_indicator_byte' is true the indicator byte is filled in and the
// literal-run state is reset.
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
    const bool needs_indicator_slot = literal_indicator_byte_idx_ < 0;
    if (needs_indicator_slot) {
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
        DCHECK_GE(literal_indicator_byte_idx_, 0);
    }

    // Emit the staged values, bit-packed at 'bit_width_' bits each.
    for (int pos = 0; pos < num_buffered_values_; ++pos) {
        bit_writer_.PutValue(buffered_values_[pos], bit_width_);
    }
    num_buffered_values_ = 0;

    if (!update_indicator_byte) {
        return;
    }

    // Only one byte is reserved for the indicator so that literals can be streamed
    // out; the DCHECK verifies that the group count still fits in that byte.
    const int group_count = BitUtil::Ceil(literal_count_, 8);
    const int32_t indicator = (group_count << 1) | 1;
    DCHECK_EQ(indicator & 0xFFFFFF00, 0);
    bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator;
    literal_count_ = 0;
    literal_indicator_byte_idx_ = -1;
}
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb
Line
Count
Source
467
5.34k
// Writes every staged value to the output as a bit-packed literal and empties the
// staging buffer, reserving the run's indicator byte first if needed. When
// 'update_indicator_byte' is true the indicator byte is filled in and the
// literal-run state is reset.
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
    const bool needs_indicator_slot = literal_indicator_byte_idx_ < 0;
    if (needs_indicator_slot) {
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
        DCHECK_GE(literal_indicator_byte_idx_, 0);
    }

    // Emit the staged values, bit-packed at 'bit_width_' bits each.
    for (int pos = 0; pos < num_buffered_values_; ++pos) {
        bit_writer_.PutValue(buffered_values_[pos], bit_width_);
    }
    num_buffered_values_ = 0;

    if (!update_indicator_byte) {
        return;
    }

    // Only one byte is reserved for the indicator so that literals can be streamed
    // out; the DCHECK verifies that the group count still fits in that byte.
    const int group_count = BitUtil::Ceil(literal_count_, 8);
    const int32_t indicator = (group_count << 1) | 1;
    DCHECK_EQ(indicator & 0xFFFFFF00, 0);
    bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator;
    literal_count_ = 0;
    literal_indicator_byte_idx_ = -1;
}
493
494
template <typename T>
495
22.0k
void RleEncoder<T>::FlushRepeatedRun() {
496
22.0k
    DCHECK_GT(repeat_count_, 0);
497
    // The lsb of 0 indicates this is a repeated run
498
22.0k
    int32_t indicator_value = repeat_count_ << 1 | 0;
499
22.0k
    bit_writer_.PutVlqInt(indicator_value);
500
22.0k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
501
22.0k
    num_buffered_values_ = 0;
502
22.0k
    repeat_count_ = 0;
503
22.0k
}
_ZN5doris10RleEncoderImE16FlushRepeatedRunEv
Line
Count
Source
495
1.91k
// Emits the current repeated run: a VLQ-encoded indicator (repeat count shifted left
// by one, so the low bit is 0) followed by the repeated value written byte-aligned.
// Afterwards the repeat counter and the staging buffer are reset.
void RleEncoder<T>::FlushRepeatedRun() {
    DCHECK_GT(repeat_count_, 0);

    // A cleared least-significant bit marks the run as "repeated".
    const int32_t indicator = repeat_count_ << 1;
    bit_writer_.PutVlqInt(indicator);

    const auto value_bytes = BitUtil::Ceil(bit_width_, 8);
    bit_writer_.PutAligned(current_value_, value_bytes);

    repeat_count_ = 0;
    num_buffered_values_ = 0;
}
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv
Line
Count
Source
495
20.0k
// Emits the current repeated run: a VLQ-encoded indicator (repeat count shifted left
// by one, so the low bit is 0) followed by the repeated value written byte-aligned.
// Afterwards the repeat counter and the staging buffer are reset.
void RleEncoder<T>::FlushRepeatedRun() {
    DCHECK_GT(repeat_count_, 0);

    // A cleared least-significant bit marks the run as "repeated".
    const int32_t indicator = repeat_count_ << 1;
    bit_writer_.PutVlqInt(indicator);

    const auto value_bytes = BitUtil::Ceil(bit_width_, 8);
    bit_writer_.PutAligned(current_value_, value_bytes);

    repeat_count_ = 0;
    num_buffered_values_ = 0;
}
504
505
// Flush the values that have been buffered.  At this point we decide whether
506
// we need to switch between the run types or continue the current one.
507
template <typename T>
508
38.6k
void RleEncoder<T>::FlushBufferedValues(bool done) {
509
38.6k
    if (repeat_count_ >= 8) {
510
        // Clear the buffered values.  They are part of the repeated run now and we
511
        // don't want to flush them out as literals.
512
22.4k
        num_buffered_values_ = 0;
513
22.4k
        if (literal_count_ != 0) {
514
            // There was a current literal run.  All the values in it have been flushed
515
            // but we still need to update the indicator byte.
516
913
            DCHECK_EQ(literal_count_ % 8, 0);
517
913
            DCHECK_EQ(repeat_count_, 8);
518
913
            FlushLiteralRun(true);
519
913
        }
520
22.4k
        DCHECK_EQ(literal_count_, 0);
521
22.4k
        return;
522
22.4k
    }
523
524
16.2k
    literal_count_ += num_buffered_values_;
525
16.2k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
526
16.2k
    if (num_groups + 1 >= (1 << 6)) {
527
        // We need to start a new literal run because the indicator byte we've reserved
528
        // cannot store more values.
529
167
        DCHECK_GE(literal_indicator_byte_idx_, 0);
530
167
        FlushLiteralRun(true);
531
16.0k
    } else {
532
16.0k
        FlushLiteralRun(done);
533
16.0k
    }
534
16.2k
    repeat_count_ = 0;
535
16.2k
}
_ZN5doris10RleEncoderImE19FlushBufferedValuesEb
Line
Count
Source
508
12.9k
// Decides what happens to the staged values: they are either absorbed by the
// current repeated run (at least eight repetitions seen) or appended to the current
// literal run. 'done' asks for the literal run's indicator byte to be written.
void RleEncoder<T>::FlushBufferedValues(bool done) {
    if (repeat_count_ < 8) {
        // Too few repetitions for a repeated run: the staged values become literals.
        literal_count_ += num_buffered_values_;
        const int group_count = BitUtil::Ceil(literal_count_, 8);
        // The reserved indicator byte cannot describe more groups than this, so the
        // literal run has to be closed once the limit is reached.
        const bool indicator_exhausted = group_count + 1 >= (1 << 6);
        if (indicator_exhausted) {
            DCHECK_GE(literal_indicator_byte_idx_, 0);
        }
        FlushLiteralRun(indicator_exhausted || done);
        repeat_count_ = 0;
        return;
    }

    // The staged values belong to the repeated run; drop them so that they are not
    // written out as literals.
    num_buffered_values_ = 0;
    if (literal_count_ != 0) {
        // A literal run is still open. Its values have already been written, only
        // the indicator byte is outstanding.
        DCHECK_EQ(literal_count_ % 8, 0);
        DCHECK_EQ(repeat_count_, 8);
        FlushLiteralRun(true);
    }
    DCHECK_EQ(literal_count_, 0);
}
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb
Line
Count
Source
508
25.7k
// Decides what happens to the staged values: they are either absorbed by the
// current repeated run (at least eight repetitions seen) or appended to the current
// literal run. 'done' asks for the literal run's indicator byte to be written.
void RleEncoder<T>::FlushBufferedValues(bool done) {
    if (repeat_count_ < 8) {
        // Too few repetitions for a repeated run: the staged values become literals.
        literal_count_ += num_buffered_values_;
        const int group_count = BitUtil::Ceil(literal_count_, 8);
        // The reserved indicator byte cannot describe more groups than this, so the
        // literal run has to be closed once the limit is reached.
        const bool indicator_exhausted = group_count + 1 >= (1 << 6);
        if (indicator_exhausted) {
            DCHECK_GE(literal_indicator_byte_idx_, 0);
        }
        FlushLiteralRun(indicator_exhausted || done);
        repeat_count_ = 0;
        return;
    }

    // The staged values belong to the repeated run; drop them so that they are not
    // written out as literals.
    num_buffered_values_ = 0;
    if (literal_count_ != 0) {
        // A literal run is still open. Its values have already been written, only
        // the indicator byte is outstanding.
        DCHECK_EQ(literal_count_ % 8, 0);
        DCHECK_EQ(repeat_count_, 8);
        FlushLiteralRun(true);
    }
    DCHECK_EQ(literal_count_, 0);
}
536
537
template <typename T>
538
0
void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
539
0
    for (int i = 0; i < num_bytes; ++i) {
540
0
        bit_writer_.PutValue(val, 8);
541
0
    }
542
0
}
543
544
template <typename T>
545
804
int RleEncoder<T>::Flush() {
546
804
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
547
802
        bool all_repeat = literal_count_ == 0 &&
548
802
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
549
        // There is something pending, figure out if it's a repeated or literal run
550
802
        if (repeat_count_ > 0 && all_repeat) {
551
561
            FlushRepeatedRun();
552
561
        } else {
553
241
            literal_count_ += num_buffered_values_;
554
241
            FlushLiteralRun(true);
555
241
            repeat_count_ = 0;
556
241
        }
557
802
    }
558
804
    bit_writer_.Flush();
559
804
    DCHECK_EQ(num_buffered_values_, 0);
560
804
    DCHECK_EQ(literal_count_, 0);
561
804
    DCHECK_EQ(repeat_count_, 0);
562
804
    return bit_writer_.bytes_written();
563
804
}
_ZN5doris10RleEncoderImE5FlushEv
Line
Count
Source
545
394
// Finishes encoding: emits whatever is still pending as a final repeated or literal
// run, flushes the bit writer and returns the bit writer's byte count.
int RleEncoder<T>::Flush() {
    const bool has_pending = literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0;
    if (has_pending) {
        // The pending data forms a single repeated run only if no literal run is
        // open and the staged values (if any) all belong to the current repetition.
        const bool staged_values_are_run =
                num_buffered_values_ == 0 || repeat_count_ == num_buffered_values_;
        const bool emit_repeated =
                repeat_count_ > 0 && literal_count_ == 0 && staged_values_are_run;
        if (emit_repeated) {
            FlushRepeatedRun();
        } else {
            literal_count_ += num_buffered_values_;
            FlushLiteralRun(true);
            repeat_count_ = 0;
        }
    }

    bit_writer_.Flush();
    DCHECK_EQ(num_buffered_values_, 0);
    DCHECK_EQ(literal_count_, 0);
    DCHECK_EQ(repeat_count_, 0);
    return bit_writer_.bytes_written();
}
_ZN5doris10RleEncoderIbE5FlushEv
Line
Count
Source
545
410
// Finishes encoding: emits whatever is still pending as a final repeated or literal
// run, flushes the bit writer and returns the bit writer's byte count.
int RleEncoder<T>::Flush() {
    const bool has_pending = literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0;
    if (has_pending) {
        // The pending data forms a single repeated run only if no literal run is
        // open and the staged values (if any) all belong to the current repetition.
        const bool staged_values_are_run =
                num_buffered_values_ == 0 || repeat_count_ == num_buffered_values_;
        const bool emit_repeated =
                repeat_count_ > 0 && literal_count_ == 0 && staged_values_are_run;
        if (emit_repeated) {
            FlushRepeatedRun();
        } else {
            literal_count_ += num_buffered_values_;
            FlushLiteralRun(true);
            repeat_count_ = 0;
        }
    }

    bit_writer_.Flush();
    DCHECK_EQ(num_buffered_values_, 0);
    DCHECK_EQ(literal_count_, 0);
    DCHECK_EQ(repeat_count_, 0);
    return bit_writer_.bytes_written();
}
564
565
template <typename T>
566
1.85k
void RleEncoder<T>::Clear() {
567
1.85k
    current_value_ = 0;
568
1.85k
    repeat_count_ = 0;
569
1.85k
    num_buffered_values_ = 0;
570
1.85k
    literal_count_ = 0;
571
1.85k
    literal_indicator_byte_idx_ = -1;
572
1.85k
    bit_writer_.Clear();
573
1.85k
}
_ZN5doris10RleEncoderImE5ClearEv
Line
Count
Source
566
394
// Discards all pending encoder state: clears the bit writer, forgets the reserved
// literal indicator slot and zeroes the run counters and the current value.
void RleEncoder<T>::Clear() {
    bit_writer_.Clear();
    literal_indicator_byte_idx_ = -1;
    literal_count_ = 0;
    num_buffered_values_ = 0;
    repeat_count_ = 0;
    current_value_ = 0;
}
_ZN5doris10RleEncoderIbE5ClearEv
Line
Count
Source
566
1.45k
// Discards all pending encoder state: clears the bit writer, forgets the reserved
// literal indicator slot and zeroes the run counters and the current value.
void RleEncoder<T>::Clear() {
    bit_writer_.Clear();
    literal_indicator_byte_idx_ = -1;
    literal_count_ = 0;
    num_buffered_values_ = 0;
    repeat_count_ = 0;
    current_value_ = 0;
}
574
575
// Copied from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
576
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
577
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
578
// (literal encoding).
579
//
580
// For both types of runs, there is a byte-aligned indicator which encodes the length
581
// of the run and the type of the run.
582
//
583
// This encoding has the benefit that when there aren't any long enough runs, values
584
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
585
// the run length are byte aligned. This allows for very efficient decoding
586
// implementations.
587
// The encoding is:
588
//    encoded-block := run*
589
//    run := literal-run | repeated-run
590
//    literal-run := literal-indicator < literal bytes >
591
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
592
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
593
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
594
//
595
// Each run is preceded by a varint. The varint's least significant bit is
596
// used to indicate whether the run is a literal run or a repeated run. The rest
597
// of the varint is used to determine the length of the run (e.g. how many times the
598
// value repeats).
599
//
600
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
601
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
602
// on a byte boundary without padding.
603
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
604
// the actual number of encoded ints. (This means that the total number of encoded values
605
// cannot be determined from the encoded data, since the number of values in the last
606
// group may not be a multiple of 8). For the last group of literal runs, we pad
607
// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
608
// without the need for additional checks.
609
//
610
// There is a break-even point when it is more storage efficient to do run length
611
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
612
// for both the repeated encoding or the literal encoding.  This value can always
613
// be computed based on the bit-width.
614
// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more
615
// investigation is needed to do this efficiently, see the reverted IMPALA-6658.
616
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
617
//
618
// Examples with bit-width 1 (e.g. encoding booleans):
619
// ----------------------------------------
620
// 100 1s followed by 100 0s:
621
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
622
//  - (total 4 bytes)
623
//
624
// alternating 1s and 0s (200 total):
625
// 200 ints = 25 groups of 8
626
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
627
// (total 26 bytes, 1 byte overhead)
628
629
// RLE decoder with a batch-oriented interface that enables fast decoding.
630
// Users of this class must first initialize the class to point to a buffer of
631
// RLE-encoded data, passed into the constructor or Reset(). The provided
632
// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH).
633
// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
634
// see if the next run is a repeated or literal run, then calling
635
// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
636
//
637
// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
638
// Other decoding errors are signalled by functions returning false. If an
639
// error is encountered then it is not valid to read any more data until
640
// Reset() is called.
641
642
template <typename T>
643
class RleBatchDecoder {
644
public:
645
15
    RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
646
15
        Reset(buffer, buffer_len, bit_width);
647
15
    }
648
649
    RleBatchDecoder() = default;
650
651
    // Reset the decoder to read from a new buffer.
652
    void Reset(uint8_t* buffer, int buffer_len, int bit_width);
653
654
    // Return the size of the current repeated run. Returns zero if the current run is
655
    // a literal run or if no more runs can be read from the input.
656
    int32_t NextNumRepeats();
657
658
    // Get the value of the current repeated run and consume the given number of repeats.
659
    // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
660
    // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
661
    // can be set to 0 to peek at the value without consuming repeats.
662
    T GetRepeatedValue(int32_t num_repeats_to_consume);
663
664
    // Return the size of the current literal run. Returns zero if the current run is
665
    // a repeated run or if no more runs can be read from the input.
666
    int32_t NextNumLiterals();
667
668
    // Consume 'num_literals_to_consume' literals from the current literal run,
669
    // copying the values to 'values'. 'num_literals_to_consume' must be <=
670
    // NextNumLiterals(). Returns true if the requested number of literals were
671
    // successfully read or false if an error was encountered, e.g. the input was
672
    // truncated.
673
    bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
674
675
    // Consume 'num_values_to_consume' values and copy them to 'values'.
676
    // Returns the number of consumed values or 0 if an error occurred.
677
    int32_t GetBatch(T* values, int32_t batch_num);
678
679
private:
680
    // Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
681
    // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
682
    // or repeated run, or leaves both at 0 if no more values can be read (either because
683
    // the end of the input was reached or an error was encountered decoding).
684
    void NextCounts();
685
686
    /// Fill the literal buffer. Invalid to call if there are already buffered literals.
687
    /// Return false if the input was truncated. This does not advance 'literal_count_'.
688
    bool FillLiteralBuffer() WARN_UNUSED_RESULT;
689
690
0
    bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; }
691
692
    /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
693
    /// 'literal_count_'. Returns the number of literals outputted.
694
    int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
695
696
    BatchedBitReader bit_reader_;
697
698
    // Number of bits needed to encode the value. Must be between 0 and 64 after
699
    // the decoder is initialized with a buffer. -1 indicates the decoder was not
700
    // initialized.
701
    int bit_width_ = -1;
702
703
    // If a repeated run, the number of repeats remaining in the current run to be read.
704
    // If the current run is a literal run, this is 0.
705
    int32_t repeat_count_ = 0;
706
707
    // If a literal run, the number of literals remaining in the current run to be read.
708
    // If the current run is a repeated run, this is 0.
709
    int32_t literal_count_ = 0;
710
711
    // If a repeated run, the current repeated value.
712
    T repeated_value_;
713
714
    // Size of buffer for literal values. Large enough to decode a full batch of 32
715
    // literals. The buffer is needed to allow clients to read in batches that are not
716
    // multiples of 32.
717
    static constexpr int LITERAL_BUFFER_LEN = 32;
718
719
    // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
720
    // position of the next literal to be read from the buffer.
721
    T literal_buffer_[LITERAL_BUFFER_LEN];
722
    int num_buffered_literals_ = 0;
723
    int literal_buffer_pos_ = 0;
724
};
725
726
template <typename T>
727
0
int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) {
728
0
    int32_t num_to_output =
729
0
            std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
730
0
    memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output);
731
0
    literal_buffer_pos_ += num_to_output;
732
0
    literal_count_ -= num_to_output;
733
0
    return num_to_output;
734
0
}
735
736
template <typename T>
737
15
void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
738
15
    bit_reader_.Reset(buffer, buffer_len);
739
15
    bit_width_ = bit_width;
740
15
    repeat_count_ = 0;
741
15
    literal_count_ = 0;
742
15
    num_buffered_literals_ = 0;
743
15
    literal_buffer_pos_ = 0;
744
15
}
745
746
template <typename T>
747
15
int32_t RleBatchDecoder<T>::NextNumRepeats() {
748
15
    if (repeat_count_ > 0) return repeat_count_;
749
15
    if (literal_count_ == 0) NextCounts();
750
15
    return repeat_count_;
751
15
}
752
753
template <typename T>
754
15
void RleBatchDecoder<T>::NextCounts() {
755
    // Read the next run's indicator int, it could be a literal or repeated run.
756
    // The int is encoded as a ULEB128-encoded value.
757
15
    uint32_t indicator_value = 0;
758
15
    if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) {
759
0
        return;
760
0
    }
761
762
    // lsb indicates if it is a literal run or repeated run
763
15
    bool is_literal = indicator_value & 1;
764
765
    // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully.
766
    // The Parquet standard does not allow longer runs - see PARQUET-1290.
767
15
    uint32_t run_len = indicator_value >> 1;
768
15
    if (is_literal) {
769
        // Use int64_t to avoid overflowing multiplication.
770
0
        int64_t literal_count = static_cast<int64_t>(run_len) * 8;
771
0
        if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return;
772
0
        literal_count_ = literal_count;
773
15
    } else {
774
15
        if (UNLIKELY(run_len == 0)) return;
775
15
        bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
776
15
        if (UNLIKELY(!result)) return;
777
15
        repeat_count_ = run_len;
778
15
    }
779
15
}
780
781
template <typename T>
782
15
T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
783
15
    repeat_count_ -= num_repeats_to_consume;
784
15
    return repeated_value_;
785
15
}
786
787
template <typename T>
788
0
int32_t RleBatchDecoder<T>::NextNumLiterals() {
789
0
    if (literal_count_ > 0) return literal_count_;
790
0
    if (repeat_count_ == 0) NextCounts();
791
0
    return literal_count_;
792
0
}
793
794
template <typename T>
795
0
bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) {
796
0
    int32_t num_consumed = 0;
797
    // Copy any buffered literals left over from previous calls.
798
0
    if (HaveBufferedLiterals()) {
799
0
        num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
800
0
    }
801
802
0
    int32_t num_remaining = num_literals_to_consume - num_consumed;
803
    // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
804
    // Need to round to a batch of 32 if the caller is consuming only part of the current
805
    // run avoid ending on a non-byte boundary.
806
0
    int32_t num_to_bypass =
807
0
            std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
808
0
    if (num_to_bypass > 0) {
809
0
        int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed);
810
        // If we couldn't read the expected number, that means the input was truncated.
811
0
        if (num_read < num_to_bypass) return false;
812
0
        literal_count_ -= num_to_bypass;
813
0
        num_consumed += num_to_bypass;
814
0
        num_remaining = num_literals_to_consume - num_consumed;
815
0
    }
816
817
0
    if (num_remaining > 0) {
818
        // We weren't able to copy all the literals requested directly from the input.
819
        // Buffer literals and copy over the requested number.
820
0
        if (UNLIKELY(!FillLiteralBuffer())) return false;
821
0
        OutputBufferedLiterals(num_remaining, values + num_consumed);
822
0
    }
823
0
    return true;
824
0
}
825
826
template <typename T>
827
0
bool RleBatchDecoder<T>::FillLiteralBuffer() {
828
0
    int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
829
0
    num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
830
    // If we couldn't read the expected number, that means the input was truncated.
831
0
    if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
832
0
    literal_buffer_pos_ = 0;
833
0
    return true;
834
0
}
835
836
template <typename T>
837
15
int32_t RleBatchDecoder<T>::GetBatch(T* values, int32_t batch_num) {
838
15
    int32_t num_consumed = 0;
839
30
    while (num_consumed < batch_num) {
840
        // Add RLE encoded values by repeating the current value this number of times.
841
15
        int32_t num_repeats = NextNumRepeats();
842
15
        if (num_repeats > 0) {
843
15
            int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed);
844
15
            T repeated_value = GetRepeatedValue(num_repeats_to_set);
845
195
            for (int i = 0; i < num_repeats_to_set; ++i) {
846
180
                values[num_consumed + i] = repeated_value;
847
180
            }
848
15
            num_consumed += num_repeats_to_set;
849
15
            continue;
850
15
        }
851
852
        // Add remaining literal values, if any.
853
0
        int32_t num_literals = NextNumLiterals();
854
0
        if (num_literals == 0) {
855
0
            break;
856
0
        }
857
0
        int32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed);
858
0
        if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
859
0
            return 0;
860
0
        }
861
0
        num_consumed += num_literals_to_set;
862
0
    }
863
15
    return num_consumed;
864
15
}
865
866
} // namespace doris