Coverage Report

Created: 2025-05-21 13:30

/root/doris/be/src/util/rle_encoding.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <glog/logging.h>
20
21
#include <limits> // IWYU pragma: keep
22
23
#include "util/bit_stream_utils.inline.h"
24
#include "util/bit_util.h"
25
26
namespace doris {
27
28
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
29
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
30
// (literal encoding).
31
// For both types of runs, there is a byte-aligned indicator which encodes the length
32
// of the run and the type of the run.
33
// This encoding has the benefit that when there aren't any long enough runs, values
34
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
35
// the run length are byte aligned. This allows for very efficient decoding
36
// implementations.
37
// The encoding is:
38
//    encoded-block := run*
39
//    run := literal-run | repeated-run
40
//    literal-run := literal-indicator < literal bytes >
41
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
42
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
43
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
44
//
45
// Each run is preceded by a varint. The varint's least significant bit is
46
// used to indicate whether the run is a literal run or a repeated run. The rest
47
// of the varint is used to determine the length of the run (eg how many times the
48
// value repeats).
49
//
50
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
51
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
52
// on a byte boundary without padding.
53
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
54
// the actual number of encoded ints. (This means that the total number of encoded values
55
// can not be determined from the encoded data, since the number of values in the last
56
// group may not be a multiple of 8).
57
// There is a break-even point when it is more storage efficient to do run length
58
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
59
// for both the repeated encoding or the literal encoding.  This value can always
60
// be computed based on the bit-width.
61
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
62
//
63
// Examples with bit-width 1 (eg encoding booleans):
64
// ----------------------------------------
65
// 100 1s followed by 100 0s:
66
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
67
//  - (total 4 bytes)
68
//
69
// alternating 1s and 0s (200 total):
70
// 200 ints = 25 groups of 8
71
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
72
// (total 26 bytes, 1 byte overhead)
73
//
74
75
// Decoder class for RLE encoded data.
76
//
77
// NOTE: the encoded format does not have any length prefix or any other way of
78
// indicating that the encoded sequence ends at a certain point, so the Decoder
79
// methods may return some extra bits at the end before the read methods start
80
// to return 0/false.
81
template <typename T>
82
class RleDecoder {
83
public:
84
    // Create a decoder object. buffer/buffer_len is the decoded data.
85
    // bit_width is the width of each value (before encoding).
86
    RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
87
            : bit_reader_(buffer, buffer_len),
88
              bit_width_(bit_width),
89
              current_value_(0),
90
              repeat_count_(0),
91
              literal_count_(0),
92
853
              rewind_state_(CANT_REWIND) {
93
853
        DCHECK_GE(bit_width_, 1);
94
853
        DCHECK_LE(bit_width_, 64);
95
853
    }
_ZN5doris10RleDecoderImEC2EPKhii
Line
Count
Source
92
394
              rewind_state_(CANT_REWIND) {
93
394
        DCHECK_GE(bit_width_, 1);
94
394
        DCHECK_LE(bit_width_, 64);
95
394
    }
_ZN5doris10RleDecoderIbEC2EPKhii
Line
Count
Source
92
409
              rewind_state_(CANT_REWIND) {
93
409
        DCHECK_GE(bit_width_, 1);
94
409
        DCHECK_LE(bit_width_, 64);
95
409
    }
_ZN5doris10RleDecoderIhEC2EPKhii
Line
Count
Source
92
5
              rewind_state_(CANT_REWIND) {
93
5
        DCHECK_GE(bit_width_, 1);
94
5
        DCHECK_LE(bit_width_, 64);
95
5
    }
_ZN5doris10RleDecoderIsEC2EPKhii
Line
Count
Source
92
45
              rewind_state_(CANT_REWIND) {
93
45
        DCHECK_GE(bit_width_, 1);
94
45
        DCHECK_LE(bit_width_, 64);
95
45
    }
96
97
25.6k
    RleDecoder() {}
_ZN5doris10RleDecoderIbEC2Ev
Line
Count
Source
97
25.5k
    RleDecoder() {}
_ZN5doris10RleDecoderIhEC2Ev
Line
Count
Source
97
6
    RleDecoder() {}
_ZN5doris10RleDecoderIsEC2Ev
Line
Count
Source
97
87
    RleDecoder() {}
98
99
    // Skip n values, and returns the number of non-zero entries skipped.
100
    size_t Skip(size_t to_skip);
101
102
    // Gets the next value.  Returns false if there are no more.
103
    bool Get(T* val);
104
105
    // Seek to the previous value.
106
    void RewindOne();
107
108
    // Gets the next run of the same 'val'. Returns 0 if there is no
109
    // more data to be decoded. Will return a run of at most 'max_run'
110
    // values. If there are more values than this, the next call to
111
    // GetNextRun will return more from the same run.
112
    size_t GetNextRun(T* val, size_t max_run);
113
114
    size_t get_values(T* values, size_t num_values);
115
116
    // Get the count of current repeated value
117
    size_t repeated_count();
118
119
    // Get current repeated value, make sure that count equals repeated_count()
120
    T get_repeated_value(size_t count);
121
122
0
    const BitReader& bit_reader() const { return bit_reader_; }
123
124
private:
125
    bool ReadHeader();
126
127
    enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND };
128
129
    BitReader bit_reader_;
130
    int bit_width_;
131
    uint64_t current_value_;
132
    uint32_t repeat_count_;
133
    uint32_t literal_count_;
134
    RewindState rewind_state_;
135
};
136
137
// Class to incrementally build the rle data.
138
// The encoding has two modes: encoding repeated runs and literal runs.
139
// If the run is sufficiently short, it is more efficient to encode as a literal run.
140
// This class does so by buffering 8 values at a time.  If they are not all the same
141
// they are added to the literal run.  If they are the same, they are added to the
142
// repeated run.  When we switch modes, the previous run is flushed out.
143
template <typename T>
144
class RleEncoder {
145
public:
146
    // buffer: buffer to write bits to.
147
    // bit_width: max number of bits for value.
148
    // TODO: consider adding a min_repeated_run_length so the caller can control
149
    // when values should be encoded as repeated runs.  Currently this is derived
150
    // based on the bit_width, which can determine a storage optimal choice.
151
    explicit RleEncoder(faststring* buffer, int bit_width)
152
3.66k
            : bit_width_(bit_width), bit_writer_(buffer) {
153
3.66k
        DCHECK_GE(bit_width_, 1);
154
3.66k
        DCHECK_LE(bit_width_, 64);
155
3.66k
        Clear();
156
3.66k
    }
_ZN5doris10RleEncoderImEC2EPNS_10faststringEi
Line
Count
Source
152
394
            : bit_width_(bit_width), bit_writer_(buffer) {
153
394
        DCHECK_GE(bit_width_, 1);
154
394
        DCHECK_LE(bit_width_, 64);
155
394
        Clear();
156
394
    }
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi
Line
Count
Source
152
3.27k
            : bit_width_(bit_width), bit_writer_(buffer) {
153
3.27k
        DCHECK_GE(bit_width_, 1);
154
3.27k
        DCHECK_LE(bit_width_, 64);
155
3.27k
        Clear();
156
3.27k
    }
157
158
    // Reserve 'num_bytes' bytes for a plain encoded header, set each
159
    // byte with 'val': this is used for the RLE-encoded data blocks in
160
    // order to be able to able to store the initial ordinal position
161
    // and number of elements. This is a part of RleEncoder in order to
162
    // maintain the correct offset in 'buffer'.
163
    void Reserve(int num_bytes, uint8_t val);
164
165
    // Encode value. This value must be representable with bit_width_ bits.
166
    void Put(T value, size_t run_length = 1);
167
168
    // Flushes any pending values to the underlying buffer.
169
    // Returns the total number of bytes written
170
    int Flush();
171
172
    // Resets all the state in the encoder.
173
    void Clear();
174
175
302
    int32_t len() const { return bit_writer_.bytes_written(); }
176
177
private:
178
    // Flushes any buffered values.  If this is part of a repeated run, this is largely
179
    // a no-op.
180
    // If it is part of a literal run, this will call FlushLiteralRun, which writes
181
    // out the buffered literal values.
182
    // If 'done' is true, the current run would be written even if it would normally
183
    // have been buffered more.  This should only be called at the end, when the
184
    // encoder has received all values even if it would normally continue to be
185
    // buffered.
186
    void FlushBufferedValues(bool done);
187
188
    // Flushes literal values to the underlying buffer.  If update_indicator_byte,
189
    // then the current literal run is complete and the indicator byte is updated.
190
    void FlushLiteralRun(bool update_indicator_byte);
191
192
    // Flushes a repeated run to the underlying buffer.
193
    void FlushRepeatedRun();
194
195
    // Number of bits needed to encode the value.
196
    const int bit_width_;
197
198
    // Underlying buffer.
199
    BitWriter bit_writer_;
200
201
    // We need to buffer at most 8 values for literals.  This happens when the
202
    // bit_width is 1 (so 8 values fit in one byte).
203
    // TODO: generalize this to other bit widths
204
    uint64_t buffered_values_[8];
205
206
    // Number of values in buffered_values_
207
    int num_buffered_values_;
208
209
    // The current (also last) value that was written and the count of how
210
    // many times in a row that value has been seen.  This is maintained even
211
    // if we are in a literal run.  If the repeat_count_ get high enough, we switch
212
    // to encoding repeated runs.
213
    uint64_t current_value_;
214
    int repeat_count_;
215
216
    // Number of literals in the current run.  This does not include the literals
217
    // that might be in buffered_values_.  Only after we've got a group big enough
218
    // can we decide if they should part of the literal_count_ or repeat_count_
219
    int literal_count_;
220
221
    // Index of a byte in the underlying buffer that stores the indicator byte.
222
    // This is reserved as soon as we need a literal run but the value is written
223
    // when the literal run is complete. We maintain an index rather than a pointer
224
    // into the underlying buffer because the pointer value may become invalid if
225
    // the underlying buffer is resized.
226
    int literal_indicator_byte_idx_;
227
};
228
229
template <typename T>
230
307k
bool RleDecoder<T>::ReadHeader() {
231
307k
    DCHECK(bit_reader_.is_initialized());
232
307k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
233
        // Read the next run's indicator int, it could be a literal or repeated run
234
        // The int is encoded as a vlq-encoded value.
235
23.6k
        uint32_t indicator_value = 0;
236
23.6k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
237
23.6k
        if (PREDICT_FALSE(!result)) {
238
276
            return false;
239
276
        }
240
241
        // lsb indicates if it is a literal run or repeated run
242
23.3k
        bool is_literal = indicator_value & 1;
243
23.3k
        if (is_literal) {
244
1.33k
            literal_count_ = (indicator_value >> 1) * 8;
245
1.33k
            DCHECK_GT(literal_count_, 0);
246
22.0k
        } else {
247
22.0k
            repeat_count_ = indicator_value >> 1;
248
22.0k
            DCHECK_GT(repeat_count_, 0);
249
22.0k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
250
22.0k
                                                    reinterpret_cast<T*>(&current_value_));
251
22.0k
            DCHECK(result);
252
22.0k
        }
253
23.3k
    }
254
306k
    return true;
255
307k
}
_ZN5doris10RleDecoderImE10ReadHeaderEv
Line
Count
Source
230
244k
bool RleDecoder<T>::ReadHeader() {
231
244k
    DCHECK(bit_reader_.is_initialized());
232
244k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
233
        // Read the next run's indicator int, it could be a literal or repeated run
234
        // The int is encoded as a vlq-encoded value.
235
3.03k
        uint32_t indicator_value = 0;
236
3.03k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
237
3.03k
        if (PREDICT_FALSE(!result)) {
238
0
            return false;
239
0
        }
240
241
        // lsb indicates if it is a literal run or repeated run
242
3.03k
        bool is_literal = indicator_value & 1;
243
3.03k
        if (is_literal) {
244
1.12k
            literal_count_ = (indicator_value >> 1) * 8;
245
1.12k
            DCHECK_GT(literal_count_, 0);
246
1.91k
        } else {
247
1.91k
            repeat_count_ = indicator_value >> 1;
248
1.91k
            DCHECK_GT(repeat_count_, 0);
249
1.91k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
250
1.91k
                                                    reinterpret_cast<T*>(&current_value_));
251
1.91k
            DCHECK(result);
252
1.91k
        }
253
3.03k
    }
254
244k
    return true;
255
244k
}
_ZN5doris10RleDecoderIbE10ReadHeaderEv
Line
Count
Source
230
62.9k
bool RleDecoder<T>::ReadHeader() {
231
62.9k
    DCHECK(bit_reader_.is_initialized());
232
62.9k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
233
        // Read the next run's indicator int, it could be a literal or repeated run
234
        // The int is encoded as a vlq-encoded value.
235
20.5k
        uint32_t indicator_value = 0;
236
20.5k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
237
20.5k
        if (PREDICT_FALSE(!result)) {
238
276
            return false;
239
276
        }
240
241
        // lsb indicates if it is a literal run or repeated run
242
20.2k
        bool is_literal = indicator_value & 1;
243
20.2k
        if (is_literal) {
244
203
            literal_count_ = (indicator_value >> 1) * 8;
245
203
            DCHECK_GT(literal_count_, 0);
246
20.0k
        } else {
247
20.0k
            repeat_count_ = indicator_value >> 1;
248
20.0k
            DCHECK_GT(repeat_count_, 0);
249
20.0k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
250
20.0k
                                                    reinterpret_cast<T*>(&current_value_));
251
20.0k
            DCHECK(result);
252
20.0k
        }
253
20.2k
    }
254
62.6k
    return true;
255
62.9k
}
_ZN5doris10RleDecoderIsE10ReadHeaderEv
Line
Count
Source
230
49
bool RleDecoder<T>::ReadHeader() {
231
49
    DCHECK(bit_reader_.is_initialized());
232
49
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
233
        // Read the next run's indicator int, it could be a literal or repeated run
234
        // The int is encoded as a vlq-encoded value.
235
45
        uint32_t indicator_value = 0;
236
45
        bool result = bit_reader_.GetVlqInt(&indicator_value);
237
45
        if (PREDICT_FALSE(!result)) {
238
0
            return false;
239
0
        }
240
241
        // lsb indicates if it is a literal run or repeated run
242
45
        bool is_literal = indicator_value & 1;
243
45
        if (is_literal) {
244
3
            literal_count_ = (indicator_value >> 1) * 8;
245
3
            DCHECK_GT(literal_count_, 0);
246
42
        } else {
247
42
            repeat_count_ = indicator_value >> 1;
248
42
            DCHECK_GT(repeat_count_, 0);
249
42
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
250
42
                                                    reinterpret_cast<T*>(&current_value_));
251
42
            DCHECK(result);
252
42
        }
253
45
    }
254
49
    return true;
255
49
}
_ZN5doris10RleDecoderIhE10ReadHeaderEv
Line
Count
Source
230
5
bool RleDecoder<T>::ReadHeader() {
231
5
    DCHECK(bit_reader_.is_initialized());
232
5
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
233
        // Read the next run's indicator int, it could be a literal or repeated run
234
        // The int is encoded as a vlq-encoded value.
235
5
        uint32_t indicator_value = 0;
236
5
        bool result = bit_reader_.GetVlqInt(&indicator_value);
237
5
        if (PREDICT_FALSE(!result)) {
238
0
            return false;
239
0
        }
240
241
        // lsb indicates if it is a literal run or repeated run
242
5
        bool is_literal = indicator_value & 1;
243
5
        if (is_literal) {
244
5
            literal_count_ = (indicator_value >> 1) * 8;
245
5
            DCHECK_GT(literal_count_, 0);
246
5
        } else {
247
0
            repeat_count_ = indicator_value >> 1;
248
0
            DCHECK_GT(repeat_count_, 0);
249
0
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
250
0
                                                    reinterpret_cast<T*>(&current_value_));
251
0
            DCHECK(result);
252
0
        }
253
5
    }
254
5
    return true;
255
5
}
256
257
template <typename T>
258
247k
bool RleDecoder<T>::Get(T* val) {
259
247k
    DCHECK(bit_reader_.is_initialized());
260
247k
    if (PREDICT_FALSE(!ReadHeader())) {
261
0
        return false;
262
0
    }
263
264
247k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
265
158k
        *val = current_value_;
266
158k
        --repeat_count_;
267
158k
        rewind_state_ = REWIND_RUN;
268
158k
    } else {
269
89.0k
        DCHECK(literal_count_ > 0);
270
89.0k
        bool result = bit_reader_.GetValue(bit_width_, val);
271
89.0k
        DCHECK(result);
272
89.0k
        --literal_count_;
273
89.0k
        rewind_state_ = REWIND_LITERAL;
274
89.0k
    }
275
276
247k
    return true;
277
247k
}
_ZN5doris10RleDecoderImE3GetEPm
Line
Count
Source
258
244k
bool RleDecoder<T>::Get(T* val) {
259
244k
    DCHECK(bit_reader_.is_initialized());
260
244k
    if (PREDICT_FALSE(!ReadHeader())) {
261
0
        return false;
262
0
    }
263
264
244k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
265
155k
        *val = current_value_;
266
155k
        --repeat_count_;
267
155k
        rewind_state_ = REWIND_RUN;
268
155k
    } else {
269
88.9k
        DCHECK(literal_count_ > 0);
270
88.9k
        bool result = bit_reader_.GetValue(bit_width_, val);
271
88.9k
        DCHECK(result);
272
88.9k
        --literal_count_;
273
88.9k
        rewind_state_ = REWIND_LITERAL;
274
88.9k
    }
275
276
244k
    return true;
277
244k
}
_ZN5doris10RleDecoderIbE3GetEPb
Line
Count
Source
258
3.17k
bool RleDecoder<T>::Get(T* val) {
259
3.17k
    DCHECK(bit_reader_.is_initialized());
260
3.17k
    if (PREDICT_FALSE(!ReadHeader())) {
261
0
        return false;
262
0
    }
263
264
3.17k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
265
3.08k
        *val = current_value_;
266
3.08k
        --repeat_count_;
267
3.08k
        rewind_state_ = REWIND_RUN;
268
3.08k
    } else {
269
93
        DCHECK(literal_count_ > 0);
270
93
        bool result = bit_reader_.GetValue(bit_width_, val);
271
93
        DCHECK(result);
272
93
        --literal_count_;
273
93
        rewind_state_ = REWIND_LITERAL;
274
93
    }
275
276
3.17k
    return true;
277
3.17k
}
_ZN5doris10RleDecoderIsE3GetEPs
Line
Count
Source
258
6
bool RleDecoder<T>::Get(T* val) {
259
6
    DCHECK(bit_reader_.is_initialized());
260
6
    if (PREDICT_FALSE(!ReadHeader())) {
261
0
        return false;
262
0
    }
263
264
6
    if (PREDICT_TRUE(repeat_count_ > 0)) {
265
6
        *val = current_value_;
266
6
        --repeat_count_;
267
6
        rewind_state_ = REWIND_RUN;
268
6
    } else {
269
0
        DCHECK(literal_count_ > 0);
270
0
        bool result = bit_reader_.GetValue(bit_width_, val);
271
0
        DCHECK(result);
272
0
        --literal_count_;
273
0
        rewind_state_ = REWIND_LITERAL;
274
0
    }
275
276
6
    return true;
277
6
}
278
279
template <typename T>
280
2
void RleDecoder<T>::RewindOne() {
281
2
    DCHECK(bit_reader_.is_initialized());
282
283
2
    switch (rewind_state_) {
284
0
    case CANT_REWIND:
285
0
        throw Exception(Status::FatalError("Can't rewind more than once after each read!"));
286
0
        break;
287
2
    case REWIND_RUN:
288
2
        ++repeat_count_;
289
2
        break;
290
0
    case REWIND_LITERAL: {
291
0
        bit_reader_.Rewind(bit_width_);
292
0
        ++literal_count_;
293
0
        break;
294
0
    }
295
2
    }
296
297
2
    rewind_state_ = CANT_REWIND;
298
2
}
299
300
template <typename T>
301
39.8k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
302
39.8k
    DCHECK(bit_reader_.is_initialized());
303
39.8k
    DCHECK_GT(max_run, 0);
304
39.8k
    size_t ret = 0;
305
39.8k
    size_t rem = max_run;
306
59.7k
    while (ReadHeader()) {
307
59.5k
        if (PREDICT_TRUE(repeat_count_ > 0)) {
308
48.9k
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
309
19.5k
                return ret;
310
19.5k
            }
311
29.4k
            *val = current_value_;
312
29.4k
            if (repeat_count_ >= rem) {
313
                // The next run is longer than the amount of remaining data
314
                // that the caller wants to read. Only consume it partially.
315
9.67k
                repeat_count_ -= rem;
316
9.67k
                ret += rem;
317
9.67k
                return ret;
318
9.67k
            }
319
19.7k
            ret += repeat_count_;
320
19.7k
            rem -= repeat_count_;
321
19.7k
            repeat_count_ = 0;
322
19.7k
        } else {
323
10.5k
            DCHECK(literal_count_ > 0);
324
10.5k
            if (ret == 0) {
325
10.4k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
326
10.4k
                DCHECK(has_more);
327
10.4k
                literal_count_--;
328
10.4k
                ret++;
329
10.4k
                rem--;
330
10.4k
            }
331
332
41.9k
            while (literal_count_ > 0) {
333
41.7k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
334
41.7k
                DCHECK(result);
335
41.7k
                if (current_value_ != *val || rem == 0) {
336
10.3k
                    bit_reader_.Rewind(bit_width_);
337
10.3k
                    return ret;
338
10.3k
                }
339
31.4k
                ret++;
340
31.4k
                rem--;
341
31.4k
                literal_count_--;
342
31.4k
            }
343
10.5k
        }
344
59.5k
    }
345
276
    return ret;
346
39.8k
}
_ZN5doris10RleDecoderIbE10GetNextRunEPbm
Line
Count
Source
301
39.8k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
302
39.8k
    DCHECK(bit_reader_.is_initialized());
303
39.8k
    DCHECK_GT(max_run, 0);
304
39.8k
    size_t ret = 0;
305
39.8k
    size_t rem = max_run;
306
59.7k
    while (ReadHeader()) {
307
59.4k
        if (PREDICT_TRUE(repeat_count_ > 0)) {
308
48.9k
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
309
19.5k
                return ret;
310
19.5k
            }
311
29.4k
            *val = current_value_;
312
29.4k
            if (repeat_count_ >= rem) {
313
                // The next run is longer than the amount of remaining data
314
                // that the caller wants to read. Only consume it partially.
315
9.66k
                repeat_count_ -= rem;
316
9.66k
                ret += rem;
317
9.66k
                return ret;
318
9.66k
            }
319
19.7k
            ret += repeat_count_;
320
19.7k
            rem -= repeat_count_;
321
19.7k
            repeat_count_ = 0;
322
19.7k
        } else {
323
10.5k
            DCHECK(literal_count_ > 0);
324
10.5k
            if (ret == 0) {
325
10.4k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
326
10.4k
                DCHECK(has_more);
327
10.4k
                literal_count_--;
328
10.4k
                ret++;
329
10.4k
                rem--;
330
10.4k
            }
331
332
41.9k
            while (literal_count_ > 0) {
333
41.7k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
334
41.7k
                DCHECK(result);
335
41.7k
                if (current_value_ != *val || rem == 0) {
336
10.3k
                    bit_reader_.Rewind(bit_width_);
337
10.3k
                    return ret;
338
10.3k
                }
339
31.4k
                ret++;
340
31.4k
                rem--;
341
31.4k
                literal_count_--;
342
31.4k
            }
343
10.5k
        }
344
59.4k
    }
345
276
    return ret;
346
39.8k
}
_ZN5doris10RleDecoderIsE10GetNextRunEPsm
Line
Count
Source
301
7
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
302
7
    DCHECK(bit_reader_.is_initialized());
303
7
    DCHECK_GT(max_run, 0);
304
7
    size_t ret = 0;
305
7
    size_t rem = max_run;
306
7
    while (ReadHeader()) {
307
7
        if (PREDICT_TRUE(repeat_count_ > 0)) {
308
7
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
309
0
                return ret;
310
0
            }
311
7
            *val = current_value_;
312
7
            if (repeat_count_ >= rem) {
313
                // The next run is longer than the amount of remaining data
314
                // that the caller wants to read. Only consume it partially.
315
7
                repeat_count_ -= rem;
316
7
                ret += rem;
317
7
                return ret;
318
7
            }
319
0
            ret += repeat_count_;
320
0
            rem -= repeat_count_;
321
0
            repeat_count_ = 0;
322
0
        } else {
323
0
            DCHECK(literal_count_ > 0);
324
0
            if (ret == 0) {
325
0
                bool has_more = bit_reader_.GetValue(bit_width_, val);
326
0
                DCHECK(has_more);
327
0
                literal_count_--;
328
0
                ret++;
329
0
                rem--;
330
0
            }
331
332
0
            while (literal_count_ > 0) {
333
0
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
334
0
                DCHECK(result);
335
0
                if (current_value_ != *val || rem == 0) {
336
0
                    bit_reader_.Rewind(bit_width_);
337
0
                    return ret;
338
0
                }
339
0
                ret++;
340
0
                rem--;
341
0
                literal_count_--;
342
0
            }
343
0
        }
344
7
    }
345
0
    return ret;
346
7
}
347
348
template <typename T>
349
39
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
350
39
    size_t read_num = 0;
351
120
    while (read_num < num_values) {
352
81
        size_t read_this_time = num_values - read_num;
353
354
81
        if (LIKELY(repeat_count_ > 0)) {
355
33
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
356
33
            std::fill(values, values + read_this_time, current_value_);
357
33
            values += read_this_time;
358
33
            repeat_count_ -= read_this_time;
359
33
            read_num += read_this_time;
360
48
        } else if (literal_count_ > 0) {
361
8
            read_this_time = std::min((size_t)literal_count_, read_this_time);
362
59
            for (int i = 0; i < read_this_time; ++i) {
363
51
                bool result = bit_reader_.GetValue(bit_width_, values);
364
51
                DCHECK(result);
365
51
                values++;
366
51
            }
367
8
            literal_count_ -= read_this_time;
368
8
            read_num += read_this_time;
369
40
        } else {
370
40
            if (!ReadHeader()) {
371
0
                return read_num;
372
0
            }
373
40
        }
374
81
    }
375
39
    return read_num;
376
39
}
_ZN5doris10RleDecoderIhE10get_valuesEPhm
Line
Count
Source
349
5
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
350
5
    size_t read_num = 0;
351
14
    while (read_num < num_values) {
352
9
        size_t read_this_time = num_values - read_num;
353
354
9
        if (LIKELY(repeat_count_ > 0)) {
355
0
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
356
0
            std::fill(values, values + read_this_time, current_value_);
357
0
            values += read_this_time;
358
0
            repeat_count_ -= read_this_time;
359
0
            read_num += read_this_time;
360
9
        } else if (literal_count_ > 0) {
361
5
            read_this_time = std::min((size_t)literal_count_, read_this_time);
362
40
            for (int i = 0; i < read_this_time; ++i) {
363
35
                bool result = bit_reader_.GetValue(bit_width_, values);
364
35
                DCHECK(result);
365
35
                values++;
366
35
            }
367
5
            literal_count_ -= read_this_time;
368
5
            read_num += read_this_time;
369
5
        } else {
370
4
            if (!ReadHeader()) {
371
0
                return read_num;
372
0
            }
373
4
        }
374
9
    }
375
5
    return read_num;
376
5
}
_ZN5doris10RleDecoderIsE10get_valuesEPsm
Line
Count
Source
349
34
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
350
34
    size_t read_num = 0;
351
106
    while (read_num < num_values) {
352
72
        size_t read_this_time = num_values - read_num;
353
354
72
        if (LIKELY(repeat_count_ > 0)) {
355
33
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
356
33
            std::fill(values, values + read_this_time, current_value_);
357
33
            values += read_this_time;
358
33
            repeat_count_ -= read_this_time;
359
33
            read_num += read_this_time;
360
39
        } else if (literal_count_ > 0) {
361
3
            read_this_time = std::min((size_t)literal_count_, read_this_time);
362
19
            for (int i = 0; i < read_this_time; ++i) {
363
16
                bool result = bit_reader_.GetValue(bit_width_, values);
364
16
                DCHECK(result);
365
16
                values++;
366
16
            }
367
3
            literal_count_ -= read_this_time;
368
3
            read_num += read_this_time;
369
36
        } else {
370
36
            if (!ReadHeader()) {
371
0
                return read_num;
372
0
            }
373
36
        }
374
72
    }
375
34
    return read_num;
376
34
}
377
378
template <typename T>
379
size_t RleDecoder<T>::repeated_count() {
380
    if (repeat_count_ > 0) {
381
        return repeat_count_;
382
    }
383
    if (literal_count_ == 0) {
384
        ReadHeader();
385
    }
386
    return repeat_count_;
387
}
388
389
template <typename T>
390
T RleDecoder<T>::get_repeated_value(size_t count) {
391
    DCHECK_GE(repeat_count_, count);
392
    repeat_count_ -= count;
393
    return current_value_;
394
}
395
396
template <typename T>
397
5
size_t RleDecoder<T>::Skip(size_t to_skip) {
398
5
    DCHECK(bit_reader_.is_initialized());
399
400
5
    size_t set_count = 0;
401
16
    while (to_skip > 0) {
402
11
        bool result = ReadHeader();
403
11
        DCHECK(result);
404
405
11
        if (PREDICT_TRUE(repeat_count_ > 0)) {
406
7
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
407
7
            repeat_count_ -= nskip;
408
7
            to_skip -= nskip;
409
7
            if (current_value_ != 0) {
410
3
                set_count += nskip;
411
3
            }
412
7
        } else {
413
4
            DCHECK(literal_count_ > 0);
414
4
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
415
4
            literal_count_ -= nskip;
416
4
            to_skip -= nskip;
417
60
            for (; nskip > 0; nskip--) {
418
56
                T value = 0;
419
56
                bool result = bit_reader_.GetValue(bit_width_, &value);
420
56
                DCHECK(result);
421
56
                if (value != 0) {
422
28
                    set_count++;
423
28
                }
424
56
            }
425
4
        }
426
11
    }
427
5
    return set_count;
428
5
}
_ZN5doris10RleDecoderIbE4SkipEm
Line
Count
Source
397
4
size_t RleDecoder<T>::Skip(size_t to_skip) {
398
4
    DCHECK(bit_reader_.is_initialized());
399
400
4
    size_t set_count = 0;
401
14
    while (to_skip > 0) {
402
10
        bool result = ReadHeader();
403
10
        DCHECK(result);
404
405
10
        if (PREDICT_TRUE(repeat_count_ > 0)) {
406
7
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
407
7
            repeat_count_ -= nskip;
408
7
            to_skip -= nskip;
409
7
            if (current_value_ != 0) {
410
3
                set_count += nskip;
411
3
            }
412
7
        } else {
413
3
            DCHECK(literal_count_ > 0);
414
3
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
415
3
            literal_count_ -= nskip;
416
3
            to_skip -= nskip;
417
56
            for (; nskip > 0; nskip--) {
418
53
                T value = 0;
419
53
                bool result = bit_reader_.GetValue(bit_width_, &value);
420
53
                DCHECK(result);
421
53
                if (value != 0) {
422
26
                    set_count++;
423
26
                }
424
53
            }
425
3
        }
426
10
    }
427
4
    return set_count;
428
4
}
_ZN5doris10RleDecoderIhE4SkipEm
Line
Count
Source
397
1
size_t RleDecoder<T>::Skip(size_t to_skip) {
398
1
    DCHECK(bit_reader_.is_initialized());
399
400
1
    size_t set_count = 0;
401
2
    while (to_skip > 0) {
402
1
        bool result = ReadHeader();
403
1
        DCHECK(result);
404
405
1
        if (PREDICT_TRUE(repeat_count_ > 0)) {
406
0
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
407
0
            repeat_count_ -= nskip;
408
0
            to_skip -= nskip;
409
0
            if (current_value_ != 0) {
410
0
                set_count += nskip;
411
0
            }
412
1
        } else {
413
1
            DCHECK(literal_count_ > 0);
414
1
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
415
1
            literal_count_ -= nskip;
416
1
            to_skip -= nskip;
417
4
            for (; nskip > 0; nskip--) {
418
3
                T value = 0;
419
3
                bool result = bit_reader_.GetValue(bit_width_, &value);
420
3
                DCHECK(result);
421
3
                if (value != 0) {
422
2
                    set_count++;
423
2
                }
424
3
            }
425
1
        }
426
1
    }
427
1
    return set_count;
428
1
}
429
430
// This function buffers input values 8 at a time.  After seeing all 8 values,
431
// it decides whether they should be encoded as a literal or repeated run.
432
template <typename T>
433
732k
void RleEncoder<T>::Put(T value, size_t run_length) {
434
732k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
435
436
    // TODO(perf): remove the loop and use the repeat_count_
437
3.35M
    for (; run_length > 0; run_length--) {
438
2.61M
        if (PREDICT_TRUE(current_value_ == value)) {
439
2.51M
            ++repeat_count_;
440
2.51M
            if (repeat_count_ > 8) {
441
                // This is just a continuation of the current run, no need to buffer the
442
                // values.
443
                // Note that this is the fast path for long repeated runs.
444
2.29M
                continue;
445
2.29M
            }
446
2.51M
        } else {
447
107k
            if (repeat_count_ >= 8) {
448
                // We had a run that was long enough but it has ended.  Flush the
449
                // current repeated run.
450
21.4k
                DCHECK_EQ(literal_count_, 0);
451
21.4k
                FlushRepeatedRun();
452
21.4k
            }
453
107k
            repeat_count_ = 1;
454
107k
            current_value_ = value;
455
107k
        }
456
457
329k
        buffered_values_[num_buffered_values_] = value;
458
329k
        if (++num_buffered_values_ == 8) {
459
41.0k
            DCHECK_EQ(literal_count_ % 8, 0);
460
41.0k
            FlushBufferedValues(false);
461
41.0k
        }
462
329k
    }
463
732k
}
_ZN5doris10RleEncoderImE3PutEmm
Line
Count
Source
433
244k
void RleEncoder<T>::Put(T value, size_t run_length) {
434
244k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
435
436
    // TODO(perf): remove the loop and use the repeat_count_
437
488k
    for (; run_length > 0; run_length--) {
438
244k
        if (PREDICT_TRUE(current_value_ == value)) {
439
167k
            ++repeat_count_;
440
167k
            if (repeat_count_ > 8) {
441
                // This is just a continuation of the current run, no need to buffer the
442
                // values.
443
                // Note that this is the fast path for long repeated runs.
444
140k
                continue;
445
140k
            }
446
167k
        } else {
447
76.9k
            if (repeat_count_ >= 8) {
448
                // We had a run that was long enough but it has ended.  Flush the
449
                // current repeated run.
450
1.65k
                DCHECK_EQ(literal_count_, 0);
451
1.65k
                FlushRepeatedRun();
452
1.65k
            }
453
76.9k
            repeat_count_ = 1;
454
76.9k
            current_value_ = value;
455
76.9k
        }
456
457
103k
        buffered_values_[num_buffered_values_] = value;
458
103k
        if (++num_buffered_values_ == 8) {
459
12.9k
            DCHECK_EQ(literal_count_ % 8, 0);
460
12.9k
            FlushBufferedValues(false);
461
12.9k
        }
462
103k
    }
463
244k
}
_ZN5doris10RleEncoderIbE3PutEbm
Line
Count
Source
433
487k
void RleEncoder<T>::Put(T value, size_t run_length) {
434
487k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
435
436
    // TODO(perf): remove the loop and use the repeat_count_
437
2.86M
    for (; run_length > 0; run_length--) {
438
2.37M
        if (PREDICT_TRUE(current_value_ == value)) {
439
2.34M
            ++repeat_count_;
440
2.34M
            if (repeat_count_ > 8) {
441
                // This is just a continuation of the current run, no need to buffer the
442
                // values.
443
                // Note that this is the fast path for long repeated runs.
444
2.15M
                continue;
445
2.15M
            }
446
2.34M
        } else {
447
30.1k
            if (repeat_count_ >= 8) {
448
                // We had a run that was long enough but it has ended.  Flush the
449
                // current repeated run.
450
19.7k
                DCHECK_EQ(literal_count_, 0);
451
19.7k
                FlushRepeatedRun();
452
19.7k
            }
453
30.1k
            repeat_count_ = 1;
454
30.1k
            current_value_ = value;
455
30.1k
        }
456
457
225k
        buffered_values_[num_buffered_values_] = value;
458
225k
        if (++num_buffered_values_ == 8) {
459
28.0k
            DCHECK_EQ(literal_count_ % 8, 0);
460
28.0k
            FlushBufferedValues(false);
461
28.0k
        }
462
225k
    }
463
487k
}
464
465
template <typename T>
466
17.4k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
467
17.4k
    if (literal_indicator_byte_idx_ < 0) {
468
        // The literal indicator byte has not been reserved yet, get one now.
469
1.32k
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
470
1.32k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
471
1.32k
    }
472
473
    // Write all the buffered values as bit packed literals
474
148k
    for (int i = 0; i < num_buffered_values_; ++i) {
475
130k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
476
130k
    }
477
17.4k
    num_buffered_values_ = 0;
478
479
17.4k
    if (update_indicator_byte) {
480
        // At this point we need to write the indicator byte for the literal run.
481
        // We only reserve one byte, to allow for streaming writes of literal values.
482
        // The logic makes sure we flush literal runs often enough to not overrun
483
        // the 1 byte.
484
1.32k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
485
1.32k
        int32_t indicator_value = (num_groups << 1) | 1;
486
1.32k
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
487
1.32k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
488
1.32k
        literal_indicator_byte_idx_ = -1;
489
1.32k
        literal_count_ = 0;
490
1.32k
    }
491
17.4k
}
_ZN5doris10RleEncoderImE15FlushLiteralRunEb
Line
Count
Source
466
12.0k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
467
12.0k
    if (literal_indicator_byte_idx_ < 0) {
468
        // The literal indicator byte has not been reserved yet, get one now.
469
1.12k
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
470
1.12k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
471
1.12k
    }
472
473
    // Write all the buffered values as bit packed literals
474
101k
    for (int i = 0; i < num_buffered_values_; ++i) {
475
88.9k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
476
88.9k
    }
477
12.0k
    num_buffered_values_ = 0;
478
479
12.0k
    if (update_indicator_byte) {
480
        // At this point we need to write the indicator byte for the literal run.
481
        // We only reserve one byte, to allow for streaming writes of literal values.
482
        // The logic makes sure we flush literal runs often enough to not overrun
483
        // the 1 byte.
484
1.12k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
485
1.12k
        int32_t indicator_value = (num_groups << 1) | 1;
486
1.12k
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
487
1.12k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
488
1.12k
        literal_indicator_byte_idx_ = -1;
489
1.12k
        literal_count_ = 0;
490
1.12k
    }
491
12.0k
}
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb
Line
Count
Source
466
5.34k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
467
5.34k
    if (literal_indicator_byte_idx_ < 0) {
468
        // The literal indicator byte has not been reserved yet, get one now.
469
205
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
470
205
        DCHECK_GE(literal_indicator_byte_idx_, 0);
471
205
    }
472
473
    // Write all the buffered values as bit packed literals
474
47.1k
    for (int i = 0; i < num_buffered_values_; ++i) {
475
41.7k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
476
41.7k
    }
477
5.34k
    num_buffered_values_ = 0;
478
479
5.34k
    if (update_indicator_byte) {
480
        // At this point we need to write the indicator byte for the literal run.
481
        // We only reserve one byte, to allow for streaming writes of literal values.
482
        // The logic makes sure we flush literal runs often enough to not overrun
483
        // the 1 byte.
484
205
        int num_groups = BitUtil::Ceil(literal_count_, 8);
485
205
        int32_t indicator_value = (num_groups << 1) | 1;
486
205
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
487
205
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
488
205
        literal_indicator_byte_idx_ = -1;
489
205
        literal_count_ = 0;
490
205
    }
491
5.34k
}
492
493
template <typename T>
494
22.0k
void RleEncoder<T>::FlushRepeatedRun() {
495
22.0k
    DCHECK_GT(repeat_count_, 0);
496
    // The lsb of 0 indicates this is a repeated run
497
22.0k
    int32_t indicator_value = repeat_count_ << 1 | 0;
498
22.0k
    bit_writer_.PutVlqInt(indicator_value);
499
22.0k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
500
22.0k
    num_buffered_values_ = 0;
501
22.0k
    repeat_count_ = 0;
502
22.0k
}
_ZN5doris10RleEncoderImE16FlushRepeatedRunEv
Line
Count
Source
494
1.91k
void RleEncoder<T>::FlushRepeatedRun() {
495
1.91k
    DCHECK_GT(repeat_count_, 0);
496
    // The lsb of 0 indicates this is a repeated run
497
1.91k
    int32_t indicator_value = repeat_count_ << 1 | 0;
498
1.91k
    bit_writer_.PutVlqInt(indicator_value);
499
1.91k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
500
1.91k
    num_buffered_values_ = 0;
501
1.91k
    repeat_count_ = 0;
502
1.91k
}
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv
Line
Count
Source
494
20.0k
void RleEncoder<T>::FlushRepeatedRun() {
495
20.0k
    DCHECK_GT(repeat_count_, 0);
496
    // The lsb of 0 indicates this is a repeated run
497
20.0k
    int32_t indicator_value = repeat_count_ << 1 | 0;
498
20.0k
    bit_writer_.PutVlqInt(indicator_value);
499
20.0k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
500
20.0k
    num_buffered_values_ = 0;
501
20.0k
    repeat_count_ = 0;
502
20.0k
}
503
504
// Flush the values that have been buffered.  At this point we decide whether
505
// we need to switch between the run types or continue the current one.
506
template <typename T>
507
41.0k
void RleEncoder<T>::FlushBufferedValues(bool done) {
508
41.0k
    if (repeat_count_ >= 8) {
509
        // Clear the buffered values.  They are part of the repeated run now and we
510
        // don't want to flush them out as literals.
511
24.7k
        num_buffered_values_ = 0;
512
24.7k
        if (literal_count_ != 0) {
513
            // There was a current literal run.  All the values in it have been flushed
514
            // but we still need to update the indicator byte.
515
913
            DCHECK_EQ(literal_count_ % 8, 0);
516
913
            DCHECK_EQ(repeat_count_, 8);
517
913
            FlushLiteralRun(true);
518
913
        }
519
24.7k
        DCHECK_EQ(literal_count_, 0);
520
24.7k
        return;
521
24.7k
    }
522
523
16.2k
    literal_count_ += num_buffered_values_;
524
16.2k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
525
16.2k
    if (num_groups + 1 >= (1 << 6)) {
526
        // We need to start a new literal run because the indicator byte we've reserved
527
        // cannot store more values.
528
167
        DCHECK_GE(literal_indicator_byte_idx_, 0);
529
167
        FlushLiteralRun(true);
530
16.0k
    } else {
531
16.0k
        FlushLiteralRun(done);
532
16.0k
    }
533
16.2k
    repeat_count_ = 0;
534
16.2k
}
_ZN5doris10RleEncoderImE19FlushBufferedValuesEb
Line
Count
Source
507
12.9k
void RleEncoder<T>::FlushBufferedValues(bool done) {
508
12.9k
    if (repeat_count_ >= 8) {
509
        // Clear the buffered values.  They are part of the repeated run now and we
510
        // don't want to flush them out as literals.
511
1.85k
        num_buffered_values_ = 0;
512
1.85k
        if (literal_count_ != 0) {
513
            // There was a current literal run.  All the values in it have been flushed
514
            // but we still need to update the indicator byte.
515
858
            DCHECK_EQ(literal_count_ % 8, 0);
516
858
            DCHECK_EQ(repeat_count_, 8);
517
858
            FlushLiteralRun(true);
518
858
        }
519
1.85k
        DCHECK_EQ(literal_count_, 0);
520
1.85k
        return;
521
1.85k
    }
522
523
11.0k
    literal_count_ += num_buffered_values_;
524
11.0k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
525
11.0k
    if (num_groups + 1 >= (1 << 6)) {
526
        // We need to start a new literal run because the indicator byte we've reserved
527
        // cannot store more values.
528
128
        DCHECK_GE(literal_indicator_byte_idx_, 0);
529
128
        FlushLiteralRun(true);
530
10.9k
    } else {
531
10.9k
        FlushLiteralRun(done);
532
10.9k
    }
533
11.0k
    repeat_count_ = 0;
534
11.0k
}
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb
Line
Count
Source
507
28.0k
void RleEncoder<T>::FlushBufferedValues(bool done) {
508
28.0k
    if (repeat_count_ >= 8) {
509
        // Clear the buffered values.  They are part of the repeated run now and we
510
        // don't want to flush them out as literals.
511
22.9k
        num_buffered_values_ = 0;
512
22.9k
        if (literal_count_ != 0) {
513
            // There was a current literal run.  All the values in it have been flushed
514
            // but we still need to update the indicator byte.
515
55
            DCHECK_EQ(literal_count_ % 8, 0);
516
55
            DCHECK_EQ(repeat_count_, 8);
517
55
            FlushLiteralRun(true);
518
55
        }
519
22.9k
        DCHECK_EQ(literal_count_, 0);
520
22.9k
        return;
521
22.9k
    }
522
523
5.17k
    literal_count_ += num_buffered_values_;
524
5.17k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
525
5.17k
    if (num_groups + 1 >= (1 << 6)) {
526
        // We need to start a new literal run because the indicator byte we've reserved
527
        // cannot store more values.
528
39
        DCHECK_GE(literal_indicator_byte_idx_, 0);
529
39
        FlushLiteralRun(true);
530
5.13k
    } else {
531
5.13k
        FlushLiteralRun(done);
532
5.13k
    }
533
5.17k
    repeat_count_ = 0;
534
5.17k
}
535
536
template <typename T>
537
0
void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
538
0
    for (int i = 0; i < num_bytes; ++i) {
539
0
        bit_writer_.PutValue(val, 8);
540
0
    }
541
0
}
542
543
template <typename T>
544
806
int RleEncoder<T>::Flush() {
545
806
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
546
804
        bool all_repeat = literal_count_ == 0 &&
547
804
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
548
        // There is something pending, figure out if it's a repeated or literal run
549
804
        if (repeat_count_ > 0 && all_repeat) {
550
559
            FlushRepeatedRun();
551
559
        } else {
552
245
            literal_count_ += num_buffered_values_;
553
245
            FlushLiteralRun(true);
554
245
            repeat_count_ = 0;
555
245
        }
556
804
    }
557
806
    bit_writer_.Flush();
558
806
    DCHECK_EQ(num_buffered_values_, 0);
559
806
    DCHECK_EQ(literal_count_, 0);
560
806
    DCHECK_EQ(repeat_count_, 0);
561
806
    return bit_writer_.bytes_written();
562
806
}
_ZN5doris10RleEncoderImE5FlushEv
Line
Count
Source
544
394
int RleEncoder<T>::Flush() {
545
394
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
546
394
        bool all_repeat = literal_count_ == 0 &&
547
394
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
548
        // There is something pending, figure out if it's a repeated or literal run
549
394
        if (repeat_count_ > 0 && all_repeat) {
550
260
            FlushRepeatedRun();
551
260
        } else {
552
134
            literal_count_ += num_buffered_values_;
553
134
            FlushLiteralRun(true);
554
134
            repeat_count_ = 0;
555
134
        }
556
394
    }
557
394
    bit_writer_.Flush();
558
394
    DCHECK_EQ(num_buffered_values_, 0);
559
394
    DCHECK_EQ(literal_count_, 0);
560
394
    DCHECK_EQ(repeat_count_, 0);
561
394
    return bit_writer_.bytes_written();
562
394
}
_ZN5doris10RleEncoderIbE5FlushEv
Line
Count
Source
544
412
int RleEncoder<T>::Flush() {
545
412
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
546
410
        bool all_repeat = literal_count_ == 0 &&
547
410
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
548
        // There is something pending, figure out if it's a repeated or literal run
549
410
        if (repeat_count_ > 0 && all_repeat) {
550
299
            FlushRepeatedRun();
551
299
        } else {
552
111
            literal_count_ += num_buffered_values_;
553
111
            FlushLiteralRun(true);
554
111
            repeat_count_ = 0;
555
111
        }
556
410
    }
557
412
    bit_writer_.Flush();
558
412
    DCHECK_EQ(num_buffered_values_, 0);
559
412
    DCHECK_EQ(literal_count_, 0);
560
412
    DCHECK_EQ(repeat_count_, 0);
561
412
    return bit_writer_.bytes_written();
562
412
}
563
564
template <typename T>
565
6.54k
void RleEncoder<T>::Clear() {
566
6.54k
    current_value_ = 0;
567
6.54k
    repeat_count_ = 0;
568
6.54k
    num_buffered_values_ = 0;
569
6.54k
    literal_count_ = 0;
570
6.54k
    literal_indicator_byte_idx_ = -1;
571
6.54k
    bit_writer_.Clear();
572
6.54k
}
_ZN5doris10RleEncoderImE5ClearEv
Line
Count
Source
565
394
void RleEncoder<T>::Clear() {
566
394
    current_value_ = 0;
567
394
    repeat_count_ = 0;
568
394
    num_buffered_values_ = 0;
569
394
    literal_count_ = 0;
570
394
    literal_indicator_byte_idx_ = -1;
571
394
    bit_writer_.Clear();
572
394
}
_ZN5doris10RleEncoderIbE5ClearEv
Line
Count
Source
565
6.15k
void RleEncoder<T>::Clear() {
566
6.15k
    current_value_ = 0;
567
6.15k
    repeat_count_ = 0;
568
6.15k
    num_buffered_values_ = 0;
569
6.15k
    literal_count_ = 0;
570
6.15k
    literal_indicator_byte_idx_ = -1;
571
6.15k
    bit_writer_.Clear();
572
6.15k
}
573
574
// Copy from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
575
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
576
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
577
// (literal encoding).
578
//
579
// For both types of runs, there is a byte-aligned indicator which encodes the length
580
// of the run and the type of the run.
581
//
582
// This encoding has the benefit that when there aren't any long enough runs, values
583
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
584
// the run length are byte aligned. This allows for very efficient decoding
585
// implementations.
586
// The encoding is:
587
//    encoded-block := run*
588
//    run := literal-run | repeated-run
589
//    literal-run := literal-indicator < literal bytes >
590
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
591
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
592
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
593
//
594
// Each run is preceded by a varint. The varint's least significant bit is
595
// used to indicate whether the run is a literal run or a repeated run. The rest
596
// of the varint is used to determine the length of the run (eg how many times the
597
// value repeats).
598
//
599
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
600
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
601
// on a byte boundary without padding.
602
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
603
// the actual number of encoded ints. (This means that the total number of encoded values
604
// can not be determined from the encoded data, since the number of values in the last
605
// group may not be a multiple of 8). For the last group of literal runs, we pad
606
// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
607
// without the need for additional checks.
608
//
609
// There is a break-even point when it is more storage efficient to do run length
610
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
611
// for both the repeated encoding or the literal encoding.  This value can always
612
// be computed based on the bit-width.
613
// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more
614
// investigation is needed to do this efficiently, see the reverted IMPALA-6658.
615
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
616
//
617
// Examples with bit-width 1 (eg encoding booleans):
618
// ----------------------------------------
619
// 100 1s followed by 100 0s:
620
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
621
//  - (total 4 bytes)
622
//
623
// alternating 1s and 0s (200 total):
624
// 200 ints = 25 groups of 8
625
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
626
// (total 26 bytes, 1 byte overhead)
627
628
// RLE decoder with a batch-oriented interface that enables fast decoding.
629
// Users of this class must first initialize the class to point to a buffer of
630
// RLE-encoded data, passed into the constructor or Reset(). The provided
631
// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH).
632
// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
633
// see if the next run is a repeated or literal run, then calling
634
// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
635
//
636
// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
637
// Other decoding errors are signalled by functions returning false. If an
638
// error is encountered then it is not valid to read any more data until
639
// Reset() is called.
640
641
//bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1].
642
// This means that a Parquet implementation can always store the run length in a signed 32-bit integer.
643
template <typename T>
644
class RleBatchDecoder {
645
public:
646
35
    RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
647
35
        Reset(buffer, buffer_len, bit_width);
648
35
    }
649
650
    RleBatchDecoder() = default;
651
652
    // Reset the decoder to read from a new buffer.
653
    void Reset(uint8_t* buffer, int buffer_len, int bit_width);
654
655
    // Return the size of the current repeated run. Returns zero if the current run is
656
    // a literal run or if no more runs can be read from the input.
657
    int32_t NextNumRepeats();
658
659
    // Get the value of the current repeated run and consume the given number of repeats.
660
    // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
661
    // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
662
    // can be set to 0 to peek at the value without consuming repeats.
663
    T GetRepeatedValue(int32_t num_repeats_to_consume);
664
665
    // Return the size of the current literal run. Returns zero if the current run is
666
    // a repeated run or if no more runs can be read from the input.
667
    int32_t NextNumLiterals();
668
669
    // Consume 'num_literals_to_consume' literals from the current literal run,
670
    // copying the values to 'values'. 'num_literals_to_consume' must be <=
671
    // NextNumLiterals(). Returns true if the requested number of literals were
672
    // successfully read or false if an error was encountered, e.g. the input was
673
    // truncated.
674
    bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
675
676
    // Consume 'num_values_to_consume' values and copy them to 'values'.
677
    // Returns the number of consumed values or 0 if an error occurred.
678
    uint32_t GetBatch(T* values, uint32_t batch_num);
679
680
private:
681
    // Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
682
    // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
683
    // or repeated run, or leaves both at 0 if no more values can be read (either because
684
    // the end of the input was reached or an error was encountered decoding).
685
    void NextCounts();
686
687
    /// Fill the literal buffer. Invalid to call if there are already buffered literals.
688
    /// Return false if the input was truncated. This does not advance 'literal_count_'.
689
    bool FillLiteralBuffer() WARN_UNUSED_RESULT;
690
691
20
    bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; }
692
693
    /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
694
    /// 'literal_count_'. Returns the number of literals outputted.
695
    int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
696
697
    BatchedBitReader bit_reader_;
698
699
    // Number of bits needed to encode the value. Must be between 0 and 64 after
700
    // the decoder is initialized with a buffer. -1 indicates the decoder was not
701
    // initialized.
702
    int bit_width_ = -1;
703
704
    // If a repeated run, the number of repeats remaining in the current run to be read.
705
    // If the current run is a literal run, this is 0.
706
    int32_t repeat_count_ = 0;
707
708
    // If a literal run, the number of literals remaining in the current run to be read.
709
    // If the current run is a repeated run, this is 0.
710
    int32_t literal_count_ = 0;
711
712
    // If a repeated run, the current repeated value.
713
    T repeated_value_;
714
715
    // Size of buffer for literal values. Large enough to decode a full batch of 32
716
    // literals. The buffer is needed to allow clients to read in batches that are not
717
    // multiples of 32.
718
    static constexpr int LITERAL_BUFFER_LEN = 32;
719
720
    // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
721
    // position of the next literal to be read from the buffer.
722
    T literal_buffer_[LITERAL_BUFFER_LEN];
723
    int num_buffered_literals_ = 0;
724
    int literal_buffer_pos_ = 0;
725
};
726
727
template <typename T>
728
20
int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) {
729
20
    int32_t num_to_output =
730
20
            std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
731
20
    memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output);
732
20
    literal_buffer_pos_ += num_to_output;
733
20
    literal_count_ -= num_to_output;
734
20
    return num_to_output;
735
20
}
736
737
template <typename T>
738
35
void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
739
35
    bit_reader_.Reset(buffer, buffer_len);
740
35
    bit_width_ = bit_width;
741
35
    repeat_count_ = 0;
742
35
    literal_count_ = 0;
743
35
    num_buffered_literals_ = 0;
744
35
    literal_buffer_pos_ = 0;
745
35
}
746
747
template <typename T>
748
56
int32_t RleBatchDecoder<T>::NextNumRepeats() {
749
56
    if (repeat_count_ > 0) return repeat_count_;
750
54
    if (literal_count_ == 0) NextCounts();
751
54
    return repeat_count_;
752
56
}
753
754
template <typename T>
755
54
void RleBatchDecoder<T>::NextCounts() {
756
    // Read the next run's indicator int, it could be a literal or repeated run.
757
    // The int is encoded as a ULEB128-encoded value.
758
54
    uint32_t indicator_value = 0;
759
54
    if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) {
760
0
        return;
761
0
    }
762
763
    // lsb indicates if it is a literal run or repeated run
764
54
    bool is_literal = indicator_value & 1;
765
766
    // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully.
767
    // The Parquet standard does not allow longer runs - see PARQUET-1290.
768
54
    uint32_t run_len = indicator_value >> 1;
769
54
    if (is_literal) {
770
        // Use int64_t to avoid overflowing multiplication.
771
20
        int64_t literal_count = static_cast<int64_t>(run_len) * 8;
772
20
        if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return;
773
20
        literal_count_ = literal_count;
774
34
    } else {
775
34
        if (UNLIKELY(run_len == 0)) return;
776
34
        bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
777
34
        if (UNLIKELY(!result)) return;
778
34
        repeat_count_ = run_len;
779
34
    }
780
54
}
781
782
template <typename T>
783
36
T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
784
36
    repeat_count_ -= num_repeats_to_consume;
785
36
    return repeated_value_;
786
36
}
787
788
template <typename T>
789
20
int32_t RleBatchDecoder<T>::NextNumLiterals() {
790
20
    if (literal_count_ > 0) return literal_count_;
791
0
    if (repeat_count_ == 0) NextCounts();
792
0
    return literal_count_;
793
20
}
794
795
template <typename T>
796
20
bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) {
797
20
    int32_t num_consumed = 0;
798
    // Copy any buffered literals left over from previous calls.
799
20
    if (HaveBufferedLiterals()) {
800
0
        num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
801
0
    }
802
803
20
    int32_t num_remaining = num_literals_to_consume - num_consumed;
804
    // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
805
    // Need to round to a batch of 32 if the caller is consuming only part of the current
806
    // run avoid ending on a non-byte boundary.
807
20
    int32_t num_to_bypass =
808
20
            std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
809
20
    if (num_to_bypass > 0) {
810
0
        int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed);
811
        // If we couldn't read the expected number, that means the input was truncated.
812
0
        if (num_read < num_to_bypass) return false;
813
0
        literal_count_ -= num_to_bypass;
814
0
        num_consumed += num_to_bypass;
815
0
        num_remaining = num_literals_to_consume - num_consumed;
816
0
    }
817
818
20
    if (num_remaining > 0) {
819
        // We weren't able to copy all the literals requested directly from the input.
820
        // Buffer literals and copy over the requested number.
821
20
        if (UNLIKELY(!FillLiteralBuffer())) return false;
822
20
        OutputBufferedLiterals(num_remaining, values + num_consumed);
823
20
    }
824
20
    return true;
825
20
}
826
827
template <typename T>
828
20
bool RleBatchDecoder<T>::FillLiteralBuffer() {
829
20
    int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
830
20
    num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
831
    // If we couldn't read the expected number, that means the input was truncated.
832
20
    if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
833
20
    literal_buffer_pos_ = 0;
834
20
    return true;
835
20
}
836
837
template <typename T>
838
37
uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) {
839
37
    uint32_t num_consumed = 0;
840
93
    while (num_consumed < batch_num) {
841
        // Add RLE encoded values by repeating the current value this number of times.
842
56
        uint32_t num_repeats = NextNumRepeats();
843
56
        if (num_repeats > 0) {
844
36
            int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed);
845
36
            T repeated_value = GetRepeatedValue(num_repeats_to_set);
846
292
            for (int i = 0; i < num_repeats_to_set; ++i) {
847
256
                values[num_consumed + i] = repeated_value;
848
256
            }
849
36
            num_consumed += num_repeats_to_set;
850
36
            continue;
851
36
        }
852
853
        // Add remaining literal values, if any.
854
20
        uint32_t num_literals = NextNumLiterals();
855
20
        if (num_literals == 0) {
856
0
            break;
857
0
        }
858
20
        uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed);
859
20
        if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
860
0
            return 0;
861
0
        }
862
20
        num_consumed += num_literals_to_set;
863
20
    }
864
37
    return num_consumed;
865
37
}
866
867
} // namespace doris