Coverage Report

Created: 2025-04-27 02:50

/root/doris/be/src/util/rle_encoding.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <glog/logging.h>
20
21
#include <limits> // IWYU pragma: keep
22
23
#include "gutil/port.h"
24
#include "util/bit_stream_utils.inline.h"
25
#include "util/bit_util.h"
26
27
namespace doris {
28
29
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
30
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
31
// (literal encoding).
32
// For both types of runs, there is a byte-aligned indicator which encodes the length
33
// of the run and the type of the run.
34
// This encoding has the benefit that when there aren't any long enough runs, values
35
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
36
// the run length are byte aligned. This allows for very efficient decoding
37
// implementations.
38
// The encoding is:
39
//    encoded-block := run*
40
//    run := literal-run | repeated-run
41
//    literal-run := literal-indicator < literal bytes >
42
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
43
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
44
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
45
//
46
// Each run is preceded by a varint. The varint's least significant bit is
47
// used to indicate whether the run is a literal run or a repeated run. The rest
48
// of the varint is used to determine the length of the run (eg how many times the
49
// value repeats).
50
//
51
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
52
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
53
// on a byte boundary without padding.
54
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
55
// the actual number of encoded ints. (This means that the total number of encoded values
56
// can not be determined from the encoded data, since the number of values in the last
57
// group may not be a multiple of 8).
58
// There is a break-even point when it is more storage efficient to do run length
59
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
60
// for both the repeated encoding or the literal encoding.  This value can always
61
// be computed based on the bit-width.
62
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
63
//
64
// Examples with bit-width 1 (eg encoding booleans):
65
// ----------------------------------------
66
// 100 1s followed by 100 0s:
67
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
68
//  - (total 4 bytes)
69
//
70
// alternating 1s and 0s (200 total):
71
// 200 ints = 25 groups of 8
72
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
73
// (total 26 bytes, 1 byte overhead)
74
//
75
76
// Decoder class for RLE encoded data.
77
//
78
// NOTE: the encoded format does not have any length prefix or any other way of
79
// indicating that the encoded sequence ends at a certain point, so the Decoder
80
// methods may return some extra bits at the end before the read methods start
81
// to return 0/false.
82
template <typename T>
83
class RleDecoder {
84
public:
85
    // Create a decoder object. buffer/buffer_len is the decoded data.
86
    // bit_width is the width of each value (before encoding).
87
    RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
88
            : bit_reader_(buffer, buffer_len),
89
              bit_width_(bit_width),
90
              current_value_(0),
91
              repeat_count_(0),
92
              literal_count_(0),
93
1.70k
              rewind_state_(CANT_REWIND) {
94
1.70k
        DCHECK_GE(bit_width_, 1);
95
1.70k
        DCHECK_LE(bit_width_, 64);
96
1.70k
    }
_ZN5doris10RleDecoderImEC2EPKhii
Line
Count
Source
93
788
              rewind_state_(CANT_REWIND) {
94
788
        DCHECK_GE(bit_width_, 1);
95
788
        DCHECK_LE(bit_width_, 64);
96
788
    }
_ZN5doris10RleDecoderIbEC2EPKhii
Line
Count
Source
93
818
              rewind_state_(CANT_REWIND) {
94
818
        DCHECK_GE(bit_width_, 1);
95
818
        DCHECK_LE(bit_width_, 64);
96
818
    }
_ZN5doris10RleDecoderIhEC2EPKhii
Line
Count
Source
93
10
              rewind_state_(CANT_REWIND) {
94
10
        DCHECK_GE(bit_width_, 1);
95
10
        DCHECK_LE(bit_width_, 64);
96
10
    }
_ZN5doris10RleDecoderIsEC2EPKhii
Line
Count
Source
93
90
              rewind_state_(CANT_REWIND) {
94
90
        DCHECK_GE(bit_width_, 1);
95
90
        DCHECK_LE(bit_width_, 64);
96
90
    }
97
98
51.2k
    RleDecoder() {}
_ZN5doris10RleDecoderIbEC2Ev
Line
Count
Source
98
51.0k
    RleDecoder() {}
_ZN5doris10RleDecoderIhEC2Ev
Line
Count
Source
98
12
    RleDecoder() {}
_ZN5doris10RleDecoderIsEC2Ev
Line
Count
Source
98
174
    RleDecoder() {}
99
100
    // Skip n values, and returns the number of non-zero entries skipped.
101
    size_t Skip(size_t to_skip);
102
103
    // Gets the next value.  Returns false if there are no more.
104
    bool Get(T* val);
105
106
    // Seek to the previous value.
107
    void RewindOne();
108
109
    // Gets the next run of the same 'val'. Returns 0 if there is no
110
    // more data to be decoded. Will return a run of at most 'max_run'
111
    // values. If there are more values than this, the next call to
112
    // GetNextRun will return more from the same run.
113
    size_t GetNextRun(T* val, size_t max_run);
114
115
    size_t get_values(T* values, size_t num_values);
116
117
    // Get the count of current repeated value
118
    size_t repeated_count();
119
120
    // Get current repeated value, make sure that count equals repeated_count()
121
    T get_repeated_value(size_t count);
122
123
0
    const BitReader& bit_reader() const { return bit_reader_; }
124
125
private:
126
    bool ReadHeader();
127
128
    enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND };
129
130
    BitReader bit_reader_;
131
    int bit_width_;
132
    uint64_t current_value_;
133
    uint32_t repeat_count_;
134
    uint32_t literal_count_;
135
    RewindState rewind_state_;
136
};
137
138
// Class to incrementally build the rle data.
139
// The encoding has two modes: encoding repeated runs and literal runs.
140
// If the run is sufficiently short, it is more efficient to encode as a literal run.
141
// This class does so by buffering 8 values at a time.  If they are not all the same
142
// they are added to the literal run.  If they are the same, they are added to the
143
// repeated run.  When we switch modes, the previous run is flushed out.
144
template <typename T>
145
class RleEncoder {
146
public:
147
    // buffer: buffer to write bits to.
148
    // bit_width: max number of bits for value.
149
    // TODO: consider adding a min_repeated_run_length so the caller can control
150
    // when values should be encoded as repeated runs.  Currently this is derived
151
    // based on the bit_width, which can determine a storage optimal choice.
152
    explicit RleEncoder(faststring* buffer, int bit_width)
153
7.33k
            : bit_width_(bit_width), bit_writer_(buffer) {
154
7.33k
        DCHECK_GE(bit_width_, 1);
155
7.33k
        DCHECK_LE(bit_width_, 64);
156
7.33k
        Clear();
157
7.33k
    }
_ZN5doris10RleEncoderImEC2EPNS_10faststringEi
Line
Count
Source
153
788
            : bit_width_(bit_width), bit_writer_(buffer) {
154
788
        DCHECK_GE(bit_width_, 1);
155
788
        DCHECK_LE(bit_width_, 64);
156
788
        Clear();
157
788
    }
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi
Line
Count
Source
153
6.54k
            : bit_width_(bit_width), bit_writer_(buffer) {
154
6.54k
        DCHECK_GE(bit_width_, 1);
155
6.54k
        DCHECK_LE(bit_width_, 64);
156
6.54k
        Clear();
157
6.54k
    }
158
159
    // Reserve 'num_bytes' bytes for a plain encoded header, set each
160
    // byte with 'val': this is used for the RLE-encoded data blocks in
161
    // order to be able to able to store the initial ordinal position
162
    // and number of elements. This is a part of RleEncoder in order to
163
    // maintain the correct offset in 'buffer'.
164
    void Reserve(int num_bytes, uint8_t val);
165
166
    // Encode value. This value must be representable with bit_width_ bits.
167
    void Put(T value, size_t run_length = 1);
168
169
    // Flushes any pending values to the underlying buffer.
170
    // Returns the total number of bytes written
171
    int Flush();
172
173
    // Resets all the state in the encoder.
174
    void Clear();
175
176
604
    int32_t len() const { return bit_writer_.bytes_written(); }
177
178
private:
179
    // Flushes any buffered values.  If this is part of a repeated run, this is largely
180
    // a no-op.
181
    // If it is part of a literal run, this will call FlushLiteralRun, which writes
182
    // out the buffered literal values.
183
    // If 'done' is true, the current run would be written even if it would normally
184
    // have been buffered more.  This should only be called at the end, when the
185
    // encoder has received all values even if it would normally continue to be
186
    // buffered.
187
    void FlushBufferedValues(bool done);
188
189
    // Flushes literal values to the underlying buffer.  If update_indicator_byte,
190
    // then the current literal run is complete and the indicator byte is updated.
191
    void FlushLiteralRun(bool update_indicator_byte);
192
193
    // Flushes a repeated run to the underlying buffer.
194
    void FlushRepeatedRun();
195
196
    // Number of bits needed to encode the value.
197
    const int bit_width_;
198
199
    // Underlying buffer.
200
    BitWriter bit_writer_;
201
202
    // We need to buffer at most 8 values for literals.  This happens when the
203
    // bit_width is 1 (so 8 values fit in one byte).
204
    // TODO: generalize this to other bit widths
205
    uint64_t buffered_values_[8];
206
207
    // Number of values in buffered_values_
208
    int num_buffered_values_;
209
210
    // The current (also last) value that was written and the count of how
211
    // many times in a row that value has been seen.  This is maintained even
212
    // if we are in a literal run.  If the repeat_count_ get high enough, we switch
213
    // to encoding repeated runs.
214
    uint64_t current_value_;
215
    int repeat_count_;
216
217
    // Number of literals in the current run.  This does not include the literals
218
    // that might be in buffered_values_.  Only after we've got a group big enough
219
    // can we decide if they should part of the literal_count_ or repeat_count_
220
    int literal_count_;
221
222
    // Index of a byte in the underlying buffer that stores the indicator byte.
223
    // This is reserved as soon as we need a literal run but the value is written
224
    // when the literal run is complete. We maintain an index rather than a pointer
225
    // into the underlying buffer because the pointer value may become invalid if
226
    // the underlying buffer is resized.
227
    int literal_indicator_byte_idx_;
228
};
229
230
template <typename T>
231
602k
bool RleDecoder<T>::ReadHeader() {
232
602k
    DCHECK(bit_reader_.is_initialized());
233
602k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
47.3k
        uint32_t indicator_value = 0;
237
47.3k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
47.3k
        if (PREDICT_FALSE(!result)) {
239
552
            return false;
240
552
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
46.7k
        bool is_literal = indicator_value & 1;
244
46.7k
        if (is_literal) {
245
2.65k
            literal_count_ = (indicator_value >> 1) * 8;
246
2.65k
            DCHECK_GT(literal_count_, 0);
247
44.0k
        } else {
248
44.0k
            repeat_count_ = indicator_value >> 1;
249
44.0k
            DCHECK_GT(repeat_count_, 0);
250
44.0k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
44.0k
                                                    reinterpret_cast<T*>(&current_value_));
252
44.0k
            DCHECK(result);
253
44.0k
        }
254
46.7k
    }
255
602k
    return true;
256
602k
}
_ZN5doris10RleDecoderImE10ReadHeaderEv
Line
Count
Source
231
488k
bool RleDecoder<T>::ReadHeader() {
232
488k
    DCHECK(bit_reader_.is_initialized());
233
488k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
6.07k
        uint32_t indicator_value = 0;
237
6.07k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
6.07k
        if (PREDICT_FALSE(!result)) {
239
0
            return false;
240
0
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
6.07k
        bool is_literal = indicator_value & 1;
244
6.07k
        if (is_literal) {
245
2.24k
            literal_count_ = (indicator_value >> 1) * 8;
246
2.24k
            DCHECK_GT(literal_count_, 0);
247
3.83k
        } else {
248
3.83k
            repeat_count_ = indicator_value >> 1;
249
3.83k
            DCHECK_GT(repeat_count_, 0);
250
3.83k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
3.83k
                                                    reinterpret_cast<T*>(&current_value_));
252
3.83k
            DCHECK(result);
253
3.83k
        }
254
6.07k
    }
255
488k
    return true;
256
488k
}
_ZN5doris10RleDecoderIbE10ReadHeaderEv
Line
Count
Source
231
114k
bool RleDecoder<T>::ReadHeader() {
232
114k
    DCHECK(bit_reader_.is_initialized());
233
114k
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
41.1k
        uint32_t indicator_value = 0;
237
41.1k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
41.1k
        if (PREDICT_FALSE(!result)) {
239
552
            return false;
240
552
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
40.5k
        bool is_literal = indicator_value & 1;
244
40.5k
        if (is_literal) {
245
401
            literal_count_ = (indicator_value >> 1) * 8;
246
401
            DCHECK_GT(literal_count_, 0);
247
40.1k
        } else {
248
40.1k
            repeat_count_ = indicator_value >> 1;
249
40.1k
            DCHECK_GT(repeat_count_, 0);
250
40.1k
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
40.1k
                                                    reinterpret_cast<T*>(&current_value_));
252
40.1k
            DCHECK(result);
253
40.1k
        }
254
40.5k
    }
255
113k
    return true;
256
114k
}
_ZN5doris10RleDecoderIsE10ReadHeaderEv
Line
Count
Source
231
98
bool RleDecoder<T>::ReadHeader() {
232
98
    DCHECK(bit_reader_.is_initialized());
233
98
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
90
        uint32_t indicator_value = 0;
237
90
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
90
        if (PREDICT_FALSE(!result)) {
239
0
            return false;
240
0
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
90
        bool is_literal = indicator_value & 1;
244
90
        if (is_literal) {
245
6
            literal_count_ = (indicator_value >> 1) * 8;
246
6
            DCHECK_GT(literal_count_, 0);
247
84
        } else {
248
84
            repeat_count_ = indicator_value >> 1;
249
84
            DCHECK_GT(repeat_count_, 0);
250
84
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
84
                                                    reinterpret_cast<T*>(&current_value_));
252
84
            DCHECK(result);
253
84
        }
254
90
    }
255
98
    return true;
256
98
}
_ZN5doris10RleDecoderIhE10ReadHeaderEv
Line
Count
Source
231
10
bool RleDecoder<T>::ReadHeader() {
232
10
    DCHECK(bit_reader_.is_initialized());
233
10
    if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
234
        // Read the next run's indicator int, it could be a literal or repeated run
235
        // The int is encoded as a vlq-encoded value.
236
10
        uint32_t indicator_value = 0;
237
10
        bool result = bit_reader_.GetVlqInt(&indicator_value);
238
10
        if (PREDICT_FALSE(!result)) {
239
0
            return false;
240
0
        }
241
242
        // lsb indicates if it is a literal run or repeated run
243
10
        bool is_literal = indicator_value & 1;
244
10
        if (is_literal) {
245
10
            literal_count_ = (indicator_value >> 1) * 8;
246
10
            DCHECK_GT(literal_count_, 0);
247
10
        } else {
248
0
            repeat_count_ = indicator_value >> 1;
249
0
            DCHECK_GT(repeat_count_, 0);
250
0
            bool result = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
251
0
                                                    reinterpret_cast<T*>(&current_value_));
252
0
            DCHECK(result);
253
0
        }
254
10
    }
255
10
    return true;
256
10
}
257
258
template <typename T>
259
494k
bool RleDecoder<T>::Get(T* val) {
260
494k
    DCHECK(bit_reader_.is_initialized());
261
494k
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
494k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
316k
        *val = current_value_;
267
316k
        --repeat_count_;
268
316k
        rewind_state_ = REWIND_RUN;
269
316k
    } else {
270
178k
        DCHECK(literal_count_ > 0);
271
178k
        bool result = bit_reader_.GetValue(bit_width_, val);
272
178k
        DCHECK(result);
273
178k
        --literal_count_;
274
178k
        rewind_state_ = REWIND_LITERAL;
275
178k
    }
276
277
494k
    return true;
278
494k
}
_ZN5doris10RleDecoderImE3GetEPm
Line
Count
Source
259
488k
bool RleDecoder<T>::Get(T* val) {
260
488k
    DCHECK(bit_reader_.is_initialized());
261
488k
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
488k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
310k
        *val = current_value_;
267
310k
        --repeat_count_;
268
310k
        rewind_state_ = REWIND_RUN;
269
310k
    } else {
270
177k
        DCHECK(literal_count_ > 0);
271
177k
        bool result = bit_reader_.GetValue(bit_width_, val);
272
177k
        DCHECK(result);
273
177k
        --literal_count_;
274
177k
        rewind_state_ = REWIND_LITERAL;
275
177k
    }
276
277
488k
    return true;
278
488k
}
_ZN5doris10RleDecoderIbE3GetEPb
Line
Count
Source
259
6.35k
bool RleDecoder<T>::Get(T* val) {
260
6.35k
    DCHECK(bit_reader_.is_initialized());
261
6.35k
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
6.35k
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
6.17k
        *val = current_value_;
267
6.17k
        --repeat_count_;
268
6.17k
        rewind_state_ = REWIND_RUN;
269
6.17k
    } else {
270
186
        DCHECK(literal_count_ > 0);
271
186
        bool result = bit_reader_.GetValue(bit_width_, val);
272
186
        DCHECK(result);
273
186
        --literal_count_;
274
186
        rewind_state_ = REWIND_LITERAL;
275
186
    }
276
277
6.35k
    return true;
278
6.35k
}
_ZN5doris10RleDecoderIsE3GetEPs
Line
Count
Source
259
12
bool RleDecoder<T>::Get(T* val) {
260
12
    DCHECK(bit_reader_.is_initialized());
261
12
    if (PREDICT_FALSE(!ReadHeader())) {
262
0
        return false;
263
0
    }
264
265
12
    if (PREDICT_TRUE(repeat_count_ > 0)) {
266
12
        *val = current_value_;
267
12
        --repeat_count_;
268
12
        rewind_state_ = REWIND_RUN;
269
12
    } else {
270
0
        DCHECK(literal_count_ > 0);
271
0
        bool result = bit_reader_.GetValue(bit_width_, val);
272
0
        DCHECK(result);
273
0
        --literal_count_;
274
0
        rewind_state_ = REWIND_LITERAL;
275
0
    }
276
277
12
    return true;
278
12
}
279
280
template <typename T>
281
4
void RleDecoder<T>::RewindOne() {
282
4
    DCHECK(bit_reader_.is_initialized());
283
284
4
    switch (rewind_state_) {
285
0
    case CANT_REWIND:
286
0
        throw Exception(Status::FatalError("Can't rewind more than once after each read!"));
287
0
        break;
288
4
    case REWIND_RUN:
289
4
        ++repeat_count_;
290
4
        break;
291
0
    case REWIND_LITERAL: {
292
0
        bit_reader_.Rewind(bit_width_);
293
0
        ++literal_count_;
294
0
        break;
295
0
    }
296
4
    }
297
298
4
    rewind_state_ = CANT_REWIND;
299
4
}
300
301
template <typename T>
302
67.9k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
303
67.9k
    DCHECK(bit_reader_.is_initialized());
304
67.9k
    DCHECK_GT(max_run, 0);
305
67.9k
    size_t ret = 0;
306
67.9k
    size_t rem = max_run;
307
107k
    while (ReadHeader()) {
308
107k
        if (PREDICT_TRUE(repeat_count_ > 0)) {
309
86.4k
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
310
39.2k
                return ret;
311
39.2k
            }
312
47.2k
            *val = current_value_;
313
47.2k
            if (repeat_count_ >= rem) {
314
                // The next run is longer than the amount of remaining data
315
                // that the caller wants to read. Only consume it partially.
316
7.51k
                repeat_count_ -= rem;
317
7.51k
                ret += rem;
318
7.51k
                return ret;
319
7.51k
            }
320
39.6k
            ret += repeat_count_;
321
39.6k
            rem -= repeat_count_;
322
39.6k
            repeat_count_ = 0;
323
39.6k
        } else {
324
20.9k
            DCHECK(literal_count_ > 0);
325
20.9k
            if (ret == 0) {
326
20.7k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
327
20.7k
                DCHECK(has_more);
328
20.7k
                literal_count_--;
329
20.7k
                ret++;
330
20.7k
                rem--;
331
20.7k
            }
332
333
83.8k
            while (literal_count_ > 0) {
334
83.4k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
335
83.4k
                DCHECK(result);
336
83.4k
                if (current_value_ != *val || rem == 0) {
337
20.6k
                    bit_reader_.Rewind(bit_width_);
338
20.6k
                    return ret;
339
20.6k
                }
340
62.8k
                ret++;
341
62.8k
                rem--;
342
62.8k
                literal_count_--;
343
62.8k
            }
344
20.9k
        }
345
107k
    }
346
552
    return ret;
347
67.9k
}
_ZN5doris10RleDecoderIbE10GetNextRunEPbm
Line
Count
Source
302
67.8k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
303
67.8k
    DCHECK(bit_reader_.is_initialized());
304
67.8k
    DCHECK_GT(max_run, 0);
305
67.8k
    size_t ret = 0;
306
67.8k
    size_t rem = max_run;
307
107k
    while (ReadHeader()) {
308
107k
        if (PREDICT_TRUE(repeat_count_ > 0)) {
309
86.4k
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
310
39.2k
                return ret;
311
39.2k
            }
312
47.1k
            *val = current_value_;
313
47.1k
            if (repeat_count_ >= rem) {
314
                // The next run is longer than the amount of remaining data
315
                // that the caller wants to read. Only consume it partially.
316
7.50k
                repeat_count_ -= rem;
317
7.50k
                ret += rem;
318
7.50k
                return ret;
319
7.50k
            }
320
39.6k
            ret += repeat_count_;
321
39.6k
            rem -= repeat_count_;
322
39.6k
            repeat_count_ = 0;
323
39.6k
        } else {
324
20.9k
            DCHECK(literal_count_ > 0);
325
20.9k
            if (ret == 0) {
326
20.7k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
327
20.7k
                DCHECK(has_more);
328
20.7k
                literal_count_--;
329
20.7k
                ret++;
330
20.7k
                rem--;
331
20.7k
            }
332
333
83.8k
            while (literal_count_ > 0) {
334
83.4k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
335
83.4k
                DCHECK(result);
336
83.4k
                if (current_value_ != *val || rem == 0) {
337
20.6k
                    bit_reader_.Rewind(bit_width_);
338
20.6k
                    return ret;
339
20.6k
                }
340
62.8k
                ret++;
341
62.8k
                rem--;
342
62.8k
                literal_count_--;
343
62.8k
            }
344
20.9k
        }
345
107k
    }
346
552
    return ret;
347
67.8k
}
_ZN5doris10RleDecoderIsE10GetNextRunEPsm
Line
Count
Source
302
14
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
303
14
    DCHECK(bit_reader_.is_initialized());
304
14
    DCHECK_GT(max_run, 0);
305
14
    size_t ret = 0;
306
14
    size_t rem = max_run;
307
14
    while (ReadHeader()) {
308
14
        if (PREDICT_TRUE(repeat_count_ > 0)) {
309
14
            if (PREDICT_FALSE(ret > 0 && *val != current_value_)) {
310
0
                return ret;
311
0
            }
312
14
            *val = current_value_;
313
14
            if (repeat_count_ >= rem) {
314
                // The next run is longer than the amount of remaining data
315
                // that the caller wants to read. Only consume it partially.
316
14
                repeat_count_ -= rem;
317
14
                ret += rem;
318
14
                return ret;
319
14
            }
320
0
            ret += repeat_count_;
321
0
            rem -= repeat_count_;
322
0
            repeat_count_ = 0;
323
0
        } else {
324
0
            DCHECK(literal_count_ > 0);
325
0
            if (ret == 0) {
326
0
                bool has_more = bit_reader_.GetValue(bit_width_, val);
327
0
                DCHECK(has_more);
328
0
                literal_count_--;
329
0
                ret++;
330
0
                rem--;
331
0
            }
332
333
0
            while (literal_count_ > 0) {
334
0
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
335
0
                DCHECK(result);
336
0
                if (current_value_ != *val || rem == 0) {
337
0
                    bit_reader_.Rewind(bit_width_);
338
0
                    return ret;
339
0
                }
340
0
                ret++;
341
0
                rem--;
342
0
                literal_count_--;
343
0
            }
344
0
        }
345
14
    }
346
0
    return ret;
347
14
}
348
349
template <typename T>
350
78
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
351
78
    size_t read_num = 0;
352
240
    while (read_num < num_values) {
353
162
        size_t read_this_time = num_values - read_num;
354
355
162
        if (LIKELY(repeat_count_ > 0)) {
356
66
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
357
66
            std::fill(values, values + read_this_time, current_value_);
358
66
            values += read_this_time;
359
66
            repeat_count_ -= read_this_time;
360
66
            read_num += read_this_time;
361
96
        } else if (literal_count_ > 0) {
362
16
            read_this_time = std::min((size_t)literal_count_, read_this_time);
363
118
            for (int i = 0; i < read_this_time; ++i) {
364
102
                bool result = bit_reader_.GetValue(bit_width_, values);
365
102
                DCHECK(result);
366
102
                values++;
367
102
            }
368
16
            literal_count_ -= read_this_time;
369
16
            read_num += read_this_time;
370
80
        } else {
371
80
            if (!ReadHeader()) {
372
0
                return read_num;
373
0
            }
374
80
        }
375
162
    }
376
78
    return read_num;
377
78
}
_ZN5doris10RleDecoderIhE10get_valuesEPhm
Line
Count
Source
350
10
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
351
10
    size_t read_num = 0;
352
28
    while (read_num < num_values) {
353
18
        size_t read_this_time = num_values - read_num;
354
355
18
        if (LIKELY(repeat_count_ > 0)) {
356
0
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
357
0
            std::fill(values, values + read_this_time, current_value_);
358
0
            values += read_this_time;
359
0
            repeat_count_ -= read_this_time;
360
0
            read_num += read_this_time;
361
18
        } else if (literal_count_ > 0) {
362
10
            read_this_time = std::min((size_t)literal_count_, read_this_time);
363
80
            for (int i = 0; i < read_this_time; ++i) {
364
70
                bool result = bit_reader_.GetValue(bit_width_, values);
365
70
                DCHECK(result);
366
70
                values++;
367
70
            }
368
10
            literal_count_ -= read_this_time;
369
10
            read_num += read_this_time;
370
10
        } else {
371
8
            if (!ReadHeader()) {
372
0
                return read_num;
373
0
            }
374
8
        }
375
18
    }
376
10
    return read_num;
377
10
}
_ZN5doris10RleDecoderIsE10get_valuesEPsm
Line
Count
Source
350
68
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
351
68
    size_t read_num = 0;
352
212
    while (read_num < num_values) {
353
144
        size_t read_this_time = num_values - read_num;
354
355
144
        if (LIKELY(repeat_count_ > 0)) {
356
66
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
357
66
            std::fill(values, values + read_this_time, current_value_);
358
66
            values += read_this_time;
359
66
            repeat_count_ -= read_this_time;
360
66
            read_num += read_this_time;
361
78
        } else if (literal_count_ > 0) {
362
6
            read_this_time = std::min((size_t)literal_count_, read_this_time);
363
38
            for (int i = 0; i < read_this_time; ++i) {
364
32
                bool result = bit_reader_.GetValue(bit_width_, values);
365
32
                DCHECK(result);
366
32
                values++;
367
32
            }
368
6
            literal_count_ -= read_this_time;
369
6
            read_num += read_this_time;
370
72
        } else {
371
72
            if (!ReadHeader()) {
372
0
                return read_num;
373
0
            }
374
72
        }
375
144
    }
376
68
    return read_num;
377
68
}
378
379
template <typename T>
380
size_t RleDecoder<T>::repeated_count() {
381
    if (repeat_count_ > 0) {
382
        return repeat_count_;
383
    }
384
    if (literal_count_ == 0) {
385
        ReadHeader();
386
    }
387
    return repeat_count_;
388
}
389
390
template <typename T>
391
T RleDecoder<T>::get_repeated_value(size_t count) {
392
    DCHECK_GE(repeat_count_, count);
393
    repeat_count_ -= count;
394
    return current_value_;
395
}
396
397
template <typename T>
398
10
size_t RleDecoder<T>::Skip(size_t to_skip) {
399
10
    DCHECK(bit_reader_.is_initialized());
400
401
10
    size_t set_count = 0;
402
32
    while (to_skip > 0) {
403
22
        bool result = ReadHeader();
404
22
        DCHECK(result);
405
406
22
        if (PREDICT_TRUE(repeat_count_ > 0)) {
407
14
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
408
14
            repeat_count_ -= nskip;
409
14
            to_skip -= nskip;
410
14
            if (current_value_ != 0) {
411
6
                set_count += nskip;
412
6
            }
413
14
        } else {
414
8
            DCHECK(literal_count_ > 0);
415
8
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
416
8
            literal_count_ -= nskip;
417
8
            to_skip -= nskip;
418
120
            for (; nskip > 0; nskip--) {
419
112
                T value = 0;
420
112
                bool result = bit_reader_.GetValue(bit_width_, &value);
421
112
                DCHECK(result);
422
112
                if (value != 0) {
423
56
                    set_count++;
424
56
                }
425
112
            }
426
8
        }
427
22
    }
428
10
    return set_count;
429
10
}
_ZN5doris10RleDecoderIbE4SkipEm
Line
Count
Source
398
8
size_t RleDecoder<T>::Skip(size_t to_skip) {
399
8
    DCHECK(bit_reader_.is_initialized());
400
401
8
    size_t set_count = 0;
402
28
    while (to_skip > 0) {
403
20
        bool result = ReadHeader();
404
20
        DCHECK(result);
405
406
20
        if (PREDICT_TRUE(repeat_count_ > 0)) {
407
14
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
408
14
            repeat_count_ -= nskip;
409
14
            to_skip -= nskip;
410
14
            if (current_value_ != 0) {
411
6
                set_count += nskip;
412
6
            }
413
14
        } else {
414
6
            DCHECK(literal_count_ > 0);
415
6
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
416
6
            literal_count_ -= nskip;
417
6
            to_skip -= nskip;
418
112
            for (; nskip > 0; nskip--) {
419
106
                T value = 0;
420
106
                bool result = bit_reader_.GetValue(bit_width_, &value);
421
106
                DCHECK(result);
422
106
                if (value != 0) {
423
52
                    set_count++;
424
52
                }
425
106
            }
426
6
        }
427
20
    }
428
8
    return set_count;
429
8
}
_ZN5doris10RleDecoderIhE4SkipEm
Line
Count
Source
398
2
size_t RleDecoder<T>::Skip(size_t to_skip) {
399
2
    DCHECK(bit_reader_.is_initialized());
400
401
2
    size_t set_count = 0;
402
4
    while (to_skip > 0) {
403
2
        bool result = ReadHeader();
404
2
        DCHECK(result);
405
406
2
        if (PREDICT_TRUE(repeat_count_ > 0)) {
407
0
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
408
0
            repeat_count_ -= nskip;
409
0
            to_skip -= nskip;
410
0
            if (current_value_ != 0) {
411
0
                set_count += nskip;
412
0
            }
413
2
        } else {
414
2
            DCHECK(literal_count_ > 0);
415
2
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
416
2
            literal_count_ -= nskip;
417
2
            to_skip -= nskip;
418
8
            for (; nskip > 0; nskip--) {
419
6
                T value = 0;
420
6
                bool result = bit_reader_.GetValue(bit_width_, &value);
421
6
                DCHECK(result);
422
6
                if (value != 0) {
423
4
                    set_count++;
424
4
                }
425
6
            }
426
2
        }
427
2
    }
428
2
    return set_count;
429
2
}
430
431
// This function buffers input values 8 at a time.  After seeing all 8 values,
432
// it decides whether they should be encoded as a literal or repeated run.
433
template <typename T>
434
1.46M
void RleEncoder<T>::Put(T value, size_t run_length) {
435
1.46M
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
436
437
    // TODO(perf): remove the loop and use the repeat_count_
438
6.70M
    for (; run_length > 0; run_length--) {
439
5.23M
        if (PREDICT_TRUE(current_value_ == value)) {
440
5.02M
            ++repeat_count_;
441
5.02M
            if (repeat_count_ > 8) {
442
                // This is just a continuation of the current run, no need to buffer the
443
                // values.
444
                // Note that this is the fast path for long repeated runs.
445
4.57M
                continue;
446
4.57M
            }
447
5.02M
        } else {
448
214k
            if (repeat_count_ >= 8) {
449
                // We had a run that was long enough but it has ended.  Flush the
450
                // current repeated run.
451
42.8k
                DCHECK_EQ(literal_count_, 0);
452
42.8k
                FlushRepeatedRun();
453
42.8k
            }
454
214k
            repeat_count_ = 1;
455
214k
            current_value_ = value;
456
214k
        }
457
458
658k
        buffered_values_[num_buffered_values_] = value;
459
658k
        if (++num_buffered_values_ == 8) {
460
82.0k
            DCHECK_EQ(literal_count_ % 8, 0);
461
82.0k
            FlushBufferedValues(false);
462
82.0k
        }
463
658k
    }
464
1.46M
}
_ZN5doris10RleEncoderImE3PutEmm
Line
Count
Source
434
488k
void RleEncoder<T>::Put(T value, size_t run_length) {
435
488k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
436
437
    // TODO(perf): remove the loop and use the repeat_count_
438
977k
    for (; run_length > 0; run_length--) {
439
488k
        if (PREDICT_TRUE(current_value_ == value)) {
440
334k
            ++repeat_count_;
441
334k
            if (repeat_count_ > 8) {
442
                // This is just a continuation of the current run, no need to buffer the
443
                // values.
444
                // Note that this is the fast path for long repeated runs.
445
280k
                continue;
446
280k
            }
447
334k
        } else {
448
153k
            if (repeat_count_ >= 8) {
449
                // We had a run that was long enough but it has ended.  Flush the
450
                // current repeated run.
451
3.31k
                DCHECK_EQ(literal_count_, 0);
452
3.31k
                FlushRepeatedRun();
453
3.31k
            }
454
153k
            repeat_count_ = 1;
455
153k
            current_value_ = value;
456
153k
        }
457
458
207k
        buffered_values_[num_buffered_values_] = value;
459
207k
        if (++num_buffered_values_ == 8) {
460
25.8k
            DCHECK_EQ(literal_count_ % 8, 0);
461
25.8k
            FlushBufferedValues(false);
462
25.8k
        }
463
207k
    }
464
488k
}
_ZN5doris10RleEncoderIbE3PutEbm
Line
Count
Source
434
975k
void RleEncoder<T>::Put(T value, size_t run_length) {
435
975k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
436
437
    // TODO(perf): remove the loop and use the repeat_count_
438
5.72M
    for (; run_length > 0; run_length--) {
439
4.74M
        if (PREDICT_TRUE(current_value_ == value)) {
440
4.68M
            ++repeat_count_;
441
4.68M
            if (repeat_count_ > 8) {
442
                // This is just a continuation of the current run, no need to buffer the
443
                // values.
444
                // Note that this is the fast path for long repeated runs.
445
4.29M
                continue;
446
4.29M
            }
447
4.68M
        } else {
448
60.1k
            if (repeat_count_ >= 8) {
449
                // We had a run that was long enough but it has ended.  Flush the
450
                // current repeated run.
451
39.5k
                DCHECK_EQ(literal_count_, 0);
452
39.5k
                FlushRepeatedRun();
453
39.5k
            }
454
60.1k
            repeat_count_ = 1;
455
60.1k
            current_value_ = value;
456
60.1k
        }
457
458
450k
        buffered_values_[num_buffered_values_] = value;
459
450k
        if (++num_buffered_values_ == 8) {
460
56.1k
            DCHECK_EQ(literal_count_ % 8, 0);
461
56.1k
            FlushBufferedValues(false);
462
56.1k
        }
463
450k
    }
464
975k
}
465
466
template <typename T>
467
34.8k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
468
34.8k
    if (literal_indicator_byte_idx_ < 0) {
469
        // The literal indicator byte has not been reserved yet, get one now.
470
2.64k
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
471
2.64k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
472
2.64k
    }
473
474
    // Write all the buffered values as bit packed literals
475
296k
    for (int i = 0; i < num_buffered_values_; ++i) {
476
261k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
477
261k
    }
478
34.8k
    num_buffered_values_ = 0;
479
480
34.8k
    if (update_indicator_byte) {
481
        // At this point we need to write the indicator byte for the literal run.
482
        // We only reserve one byte, to allow for streaming writes of literal values.
483
        // The logic makes sure we flush literal runs often enough to not overrun
484
        // the 1 byte.
485
2.64k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
486
2.64k
        int32_t indicator_value = (num_groups << 1) | 1;
487
2.64k
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
488
2.64k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
489
2.64k
        literal_indicator_byte_idx_ = -1;
490
2.64k
        literal_count_ = 0;
491
2.64k
    }
492
34.8k
}
_ZN5doris10RleEncoderImE15FlushLiteralRunEb
Line
Count
Source
467
24.1k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
468
24.1k
    if (literal_indicator_byte_idx_ < 0) {
469
        // The literal indicator byte has not been reserved yet, get one now.
470
2.24k
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
471
2.24k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
472
2.24k
    }
473
474
    // Write all the buffered values as bit packed literals
475
202k
    for (int i = 0; i < num_buffered_values_; ++i) {
476
177k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
477
177k
    }
478
24.1k
    num_buffered_values_ = 0;
479
480
24.1k
    if (update_indicator_byte) {
481
        // At this point we need to write the indicator byte for the literal run.
482
        // We only reserve one byte, to allow for streaming writes of literal values.
483
        // The logic makes sure we flush literal runs often enough to not overrun
484
        // the 1 byte.
485
2.24k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
486
2.24k
        int32_t indicator_value = (num_groups << 1) | 1;
487
2.24k
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
488
2.24k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
489
2.24k
        literal_indicator_byte_idx_ = -1;
490
2.24k
        literal_count_ = 0;
491
2.24k
    }
492
24.1k
}
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb
Line
Count
Source
467
10.6k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
468
10.6k
    if (literal_indicator_byte_idx_ < 0) {
469
        // The literal indicator byte has not been reserved yet, get one now.
470
401
        literal_indicator_byte_idx_ = bit_writer_.GetByteIndexAndAdvance(1);
471
401
        DCHECK_GE(literal_indicator_byte_idx_, 0);
472
401
    }
473
474
    // Write all the buffered values as bit packed literals
475
94.1k
    for (int i = 0; i < num_buffered_values_; ++i) {
476
83.4k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
477
83.4k
    }
478
10.6k
    num_buffered_values_ = 0;
479
480
10.6k
    if (update_indicator_byte) {
481
        // At this point we need to write the indicator byte for the literal run.
482
        // We only reserve one byte, to allow for streaming writes of literal values.
483
        // The logic makes sure we flush literal runs often enough to not overrun
484
        // the 1 byte.
485
401
        int num_groups = BitUtil::Ceil(literal_count_, 8);
486
401
        int32_t indicator_value = (num_groups << 1) | 1;
487
401
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
488
401
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] = indicator_value;
489
401
        literal_indicator_byte_idx_ = -1;
490
401
        literal_count_ = 0;
491
401
    }
492
10.6k
}
493
494
template <typename T>
495
44.0k
void RleEncoder<T>::FlushRepeatedRun() {
496
44.0k
    DCHECK_GT(repeat_count_, 0);
497
    // The lsb of 0 indicates this is a repeated run
498
44.0k
    int32_t indicator_value = repeat_count_ << 1 | 0;
499
44.0k
    bit_writer_.PutVlqInt(indicator_value);
500
44.0k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
501
44.0k
    num_buffered_values_ = 0;
502
44.0k
    repeat_count_ = 0;
503
44.0k
}
_ZN5doris10RleEncoderImE16FlushRepeatedRunEv
Line
Count
Source
495
3.83k
void RleEncoder<T>::FlushRepeatedRun() {
496
3.83k
    DCHECK_GT(repeat_count_, 0);
497
    // The lsb of 0 indicates this is a repeated run
498
3.83k
    int32_t indicator_value = repeat_count_ << 1 | 0;
499
3.83k
    bit_writer_.PutVlqInt(indicator_value);
500
3.83k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
501
3.83k
    num_buffered_values_ = 0;
502
3.83k
    repeat_count_ = 0;
503
3.83k
}
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv
Line
Count
Source
495
40.1k
void RleEncoder<T>::FlushRepeatedRun() {
496
40.1k
    DCHECK_GT(repeat_count_, 0);
497
    // The lsb of 0 indicates this is a repeated run
498
40.1k
    int32_t indicator_value = repeat_count_ << 1 | 0;
499
40.1k
    bit_writer_.PutVlqInt(indicator_value);
500
40.1k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
501
40.1k
    num_buffered_values_ = 0;
502
40.1k
    repeat_count_ = 0;
503
40.1k
}
504
505
// Flush the values that have been buffered.  At this point we decide whether
506
// we need to switch between the run types or continue the current one.
507
template <typename T>
508
82.0k
void RleEncoder<T>::FlushBufferedValues(bool done) {
509
82.0k
    if (repeat_count_ >= 8) {
510
        // Clear the buffered values.  They are part of the repeated run now and we
511
        // don't want to flush them out as literals.
512
49.5k
        num_buffered_values_ = 0;
513
49.5k
        if (literal_count_ != 0) {
514
            // There was a current literal run.  All the values in it have been flushed
515
            // but we still need to update the indicator byte.
516
1.82k
            DCHECK_EQ(literal_count_ % 8, 0);
517
1.82k
            DCHECK_EQ(repeat_count_, 8);
518
1.82k
            FlushLiteralRun(true);
519
1.82k
        }
520
49.5k
        DCHECK_EQ(literal_count_, 0);
521
49.5k
        return;
522
49.5k
    }
523
524
32.5k
    literal_count_ += num_buffered_values_;
525
32.5k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
526
32.5k
    if (num_groups + 1 >= (1 << 6)) {
527
        // We need to start a new literal run because the indicator byte we've reserved
528
        // cannot store more values.
529
334
        DCHECK_GE(literal_indicator_byte_idx_, 0);
530
334
        FlushLiteralRun(true);
531
32.1k
    } else {
532
32.1k
        FlushLiteralRun(done);
533
32.1k
    }
534
32.5k
    repeat_count_ = 0;
535
32.5k
}
_ZN5doris10RleEncoderImE19FlushBufferedValuesEb
Line
Count
Source
508
25.8k
void RleEncoder<T>::FlushBufferedValues(bool done) {
509
25.8k
    if (repeat_count_ >= 8) {
510
        // Clear the buffered values.  They are part of the repeated run now and we
511
        // don't want to flush them out as literals.
512
3.70k
        num_buffered_values_ = 0;
513
3.70k
        if (literal_count_ != 0) {
514
            // There was a current literal run.  All the values in it have been flushed
515
            // but we still need to update the indicator byte.
516
1.71k
            DCHECK_EQ(literal_count_ % 8, 0);
517
1.71k
            DCHECK_EQ(repeat_count_, 8);
518
1.71k
            FlushLiteralRun(true);
519
1.71k
        }
520
3.70k
        DCHECK_EQ(literal_count_, 0);
521
3.70k
        return;
522
3.70k
    }
523
524
22.1k
    literal_count_ += num_buffered_values_;
525
22.1k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
526
22.1k
    if (num_groups + 1 >= (1 << 6)) {
527
        // We need to start a new literal run because the indicator byte we've reserved
528
        // cannot store more values.
529
256
        DCHECK_GE(literal_indicator_byte_idx_, 0);
530
256
        FlushLiteralRun(true);
531
21.9k
    } else {
532
21.9k
        FlushLiteralRun(done);
533
21.9k
    }
534
22.1k
    repeat_count_ = 0;
535
22.1k
}
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb
Line
Count
Source
508
56.1k
void RleEncoder<T>::FlushBufferedValues(bool done) {
509
56.1k
    if (repeat_count_ >= 8) {
510
        // Clear the buffered values.  They are part of the repeated run now and we
511
        // don't want to flush them out as literals.
512
45.8k
        num_buffered_values_ = 0;
513
45.8k
        if (literal_count_ != 0) {
514
            // There was a current literal run.  All the values in it have been flushed
515
            // but we still need to update the indicator byte.
516
108
            DCHECK_EQ(literal_count_ % 8, 0);
517
108
            DCHECK_EQ(repeat_count_, 8);
518
108
            FlushLiteralRun(true);
519
108
        }
520
45.8k
        DCHECK_EQ(literal_count_, 0);
521
45.8k
        return;
522
45.8k
    }
523
524
10.3k
    literal_count_ += num_buffered_values_;
525
10.3k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
526
10.3k
    if (num_groups + 1 >= (1 << 6)) {
527
        // We need to start a new literal run because the indicator byte we've reserved
528
        // cannot store more values.
529
78
        DCHECK_GE(literal_indicator_byte_idx_, 0);
530
78
        FlushLiteralRun(true);
531
10.2k
    } else {
532
10.2k
        FlushLiteralRun(done);
533
10.2k
    }
534
10.3k
    repeat_count_ = 0;
535
10.3k
}
536
537
template <typename T>
538
0
void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
539
0
    for (int i = 0; i < num_bytes; ++i) {
540
0
        bit_writer_.PutValue(val, 8);
541
0
    }
542
0
}
543
544
template <typename T>
545
1.60k
int RleEncoder<T>::Flush() {
546
1.60k
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
547
1.60k
        bool all_repeat = literal_count_ == 0 &&
548
1.60k
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
549
        // There is something pending, figure out if it's a repeated or literal run
550
1.60k
        if (repeat_count_ > 0 && all_repeat) {
551
1.12k
            FlushRepeatedRun();
552
1.12k
        } else {
553
483
            literal_count_ += num_buffered_values_;
554
483
            FlushLiteralRun(true);
555
483
            repeat_count_ = 0;
556
483
        }
557
1.60k
    }
558
1.60k
    bit_writer_.Flush();
559
1.60k
    DCHECK_EQ(num_buffered_values_, 0);
560
1.60k
    DCHECK_EQ(literal_count_, 0);
561
1.60k
    DCHECK_EQ(repeat_count_, 0);
562
1.60k
    return bit_writer_.bytes_written();
563
1.60k
}
_ZN5doris10RleEncoderImE5FlushEv
Line
Count
Source
545
788
int RleEncoder<T>::Flush() {
546
788
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
547
788
        bool all_repeat = literal_count_ == 0 &&
548
788
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
549
        // There is something pending, figure out if it's a repeated or literal run
550
788
        if (repeat_count_ > 0 && all_repeat) {
551
520
            FlushRepeatedRun();
552
520
        } else {
553
268
            literal_count_ += num_buffered_values_;
554
268
            FlushLiteralRun(true);
555
268
            repeat_count_ = 0;
556
268
        }
557
788
    }
558
788
    bit_writer_.Flush();
559
788
    DCHECK_EQ(num_buffered_values_, 0);
560
788
    DCHECK_EQ(literal_count_, 0);
561
788
    DCHECK_EQ(repeat_count_, 0);
562
788
    return bit_writer_.bytes_written();
563
788
}
_ZN5doris10RleEncoderIbE5FlushEv
Line
Count
Source
545
820
int RleEncoder<T>::Flush() {
546
820
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
547
816
        bool all_repeat = literal_count_ == 0 &&
548
816
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
549
        // There is something pending, figure out if it's a repeated or literal run
550
816
        if (repeat_count_ > 0 && all_repeat) {
551
601
            FlushRepeatedRun();
552
601
        } else {
553
215
            literal_count_ += num_buffered_values_;
554
215
            FlushLiteralRun(true);
555
215
            repeat_count_ = 0;
556
215
        }
557
816
    }
558
820
    bit_writer_.Flush();
559
820
    DCHECK_EQ(num_buffered_values_, 0);
560
820
    DCHECK_EQ(literal_count_, 0);
561
820
    DCHECK_EQ(repeat_count_, 0);
562
820
    return bit_writer_.bytes_written();
563
820
}
564
565
template <typename T>
566
13.0k
void RleEncoder<T>::Clear() {
567
13.0k
    current_value_ = 0;
568
13.0k
    repeat_count_ = 0;
569
13.0k
    num_buffered_values_ = 0;
570
13.0k
    literal_count_ = 0;
571
13.0k
    literal_indicator_byte_idx_ = -1;
572
13.0k
    bit_writer_.Clear();
573
13.0k
}
_ZN5doris10RleEncoderImE5ClearEv
Line
Count
Source
566
788
void RleEncoder<T>::Clear() {
567
788
    current_value_ = 0;
568
788
    repeat_count_ = 0;
569
788
    num_buffered_values_ = 0;
570
788
    literal_count_ = 0;
571
788
    literal_indicator_byte_idx_ = -1;
572
788
    bit_writer_.Clear();
573
788
}
_ZN5doris10RleEncoderIbE5ClearEv
Line
Count
Source
566
12.2k
void RleEncoder<T>::Clear() {
567
12.2k
    current_value_ = 0;
568
12.2k
    repeat_count_ = 0;
569
12.2k
    num_buffered_values_ = 0;
570
12.2k
    literal_count_ = 0;
571
12.2k
    literal_indicator_byte_idx_ = -1;
572
12.2k
    bit_writer_.Clear();
573
12.2k
}
574
575
// Copy from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
576
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
577
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
578
// (literal encoding).
579
//
580
// For both types of runs, there is a byte-aligned indicator which encodes the length
581
// of the run and the type of the run.
582
//
583
// This encoding has the benefit that when there aren't any long enough runs, values
584
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
585
// the run length are byte aligned. This allows for very efficient decoding
586
// implementations.
587
// The encoding is:
588
//    encoded-block := run*
589
//    run := literal-run | repeated-run
590
//    literal-run := literal-indicator < literal bytes >
591
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
592
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
593
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
594
//
595
// Each run is preceded by a varint. The varint's least significant bit is
596
// used to indicate whether the run is a literal run or a repeated run. The rest
597
// of the varint is used to determine the length of the run (eg how many times the
598
// value repeats).
599
//
600
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
601
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
602
// on a byte boundary without padding.
603
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
604
// the actual number of encoded ints. (This means that the total number of encoded values
605
// can not be determined from the encoded data, since the number of values in the last
606
// group may not be a multiple of 8). For the last group of literal runs, we pad
607
// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
608
// without the need for additional checks.
609
//
610
// There is a break-even point when it is more storage efficient to do run length
611
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
612
// for both the repeated encoding or the literal encoding.  This value can always
613
// be computed based on the bit-width.
614
// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more
615
// investigation is needed to do this efficiently, see the reverted IMPALA-6658.
616
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
617
//
618
// Examples with bit-width 1 (eg encoding booleans):
619
// ----------------------------------------
620
// 100 1s followed by 100 0s:
621
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
622
//  - (total 4 bytes)
623
//
624
// alternating 1s and 0s (200 total):
625
// 200 ints = 25 groups of 8
626
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
627
// (total 26 bytes, 1 byte overhead)
628
629
// RLE decoder with a batch-oriented interface that enables fast decoding.
630
// Users of this class must first initialize the class to point to a buffer of
631
// RLE-encoded data, passed into the constructor or Reset(). The provided
632
// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH).
633
// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
634
// see if the next run is a repeated or literal run, then calling
635
// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
636
//
637
// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
638
// Other decoding errors are signalled by functions returning false. If an
639
// error is encountered then it is not valid to read any more data until
640
// Reset() is called.
641
642
//bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1].
643
// This means that a Parquet implementation can always store the run length in a signed 32-bit integer.
644
template <typename T>
645
class RleBatchDecoder {
646
public:
647
70
    RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
648
70
        Reset(buffer, buffer_len, bit_width);
649
70
    }
650
651
    RleBatchDecoder() = default;
652
653
    // Reset the decoder to read from a new buffer.
654
    void Reset(uint8_t* buffer, int buffer_len, int bit_width);
655
656
    // Return the size of the current repeated run. Returns zero if the current run is
657
    // a literal run or if no more runs can be read from the input.
658
    int32_t NextNumRepeats();
659
660
    // Get the value of the current repeated run and consume the given number of repeats.
661
    // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
662
    // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
663
    // can be set to 0 to peek at the value without consuming repeats.
664
    T GetRepeatedValue(int32_t num_repeats_to_consume);
665
666
    // Return the size of the current literal run. Returns zero if the current run is
667
    // a repeated run or if no more runs can be read from the input.
668
    int32_t NextNumLiterals();
669
670
    // Consume 'num_literals_to_consume' literals from the current literal run,
671
    // copying the values to 'values'. 'num_literals_to_consume' must be <=
672
    // NextNumLiterals(). Returns true if the requested number of literals were
673
    // successfully read or false if an error was encountered, e.g. the input was
674
    // truncated.
675
    bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
676
677
    // Consume 'num_values_to_consume' values and copy them to 'values'.
678
    // Returns the number of consumed values or 0 if an error occurred.
679
    uint32_t GetBatch(T* values, uint32_t batch_num);
680
681
private:
682
    // Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
683
    // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
684
    // or repeated run, or leaves both at 0 if no more values can be read (either because
685
    // the end of the input was reached or an error was encountered decoding).
686
    void NextCounts();
687
688
    /// Fill the literal buffer. Invalid to call if there are already buffered literals.
689
    /// Return false if the input was truncated. This does not advance 'literal_count_'.
690
    bool FillLiteralBuffer() WARN_UNUSED_RESULT;
691
692
40
    bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; }
693
694
    /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
695
    /// 'literal_count_'. Returns the number of literals outputted.
696
    int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
697
698
    BatchedBitReader bit_reader_;
699
700
    // Number of bits needed to encode the value. Must be between 0 and 64 after
701
    // the decoder is initialized with a buffer. -1 indicates the decoder was not
702
    // initialized.
703
    int bit_width_ = -1;
704
705
    // If a repeated run, the number of repeats remaining in the current run to be read.
706
    // If the current run is a literal run, this is 0.
707
    int32_t repeat_count_ = 0;
708
709
    // If a literal run, the number of literals remaining in the current run to be read.
710
    // If the current run is a repeated run, this is 0.
711
    int32_t literal_count_ = 0;
712
713
    // If a repeated run, the current repeated value.
714
    T repeated_value_;
715
716
    // Size of buffer for literal values. Large enough to decode a full batch of 32
717
    // literals. The buffer is needed to allow clients to read in batches that are not
718
    // multiples of 32.
719
    static constexpr int LITERAL_BUFFER_LEN = 32;
720
721
    // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
722
    // position of the next literal to be read from the buffer.
723
    T literal_buffer_[LITERAL_BUFFER_LEN];
724
    int num_buffered_literals_ = 0;
725
    int literal_buffer_pos_ = 0;
726
};
727
728
template <typename T>
729
40
int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) {
730
40
    int32_t num_to_output =
731
40
            std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
732
40
    memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output);
733
40
    literal_buffer_pos_ += num_to_output;
734
40
    literal_count_ -= num_to_output;
735
40
    return num_to_output;
736
40
}
737
738
template <typename T>
739
70
void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
740
70
    bit_reader_.Reset(buffer, buffer_len);
741
70
    bit_width_ = bit_width;
742
70
    repeat_count_ = 0;
743
70
    literal_count_ = 0;
744
70
    num_buffered_literals_ = 0;
745
70
    literal_buffer_pos_ = 0;
746
70
}
747
748
template <typename T>
749
112
int32_t RleBatchDecoder<T>::NextNumRepeats() {
750
112
    if (repeat_count_ > 0) return repeat_count_;
751
108
    if (literal_count_ == 0) NextCounts();
752
108
    return repeat_count_;
753
112
}
754
755
template <typename T>
756
108
void RleBatchDecoder<T>::NextCounts() {
757
    // Read the next run's indicator int, it could be a literal or repeated run.
758
    // The int is encoded as a ULEB128-encoded value.
759
108
    uint32_t indicator_value = 0;
760
108
    if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) {
761
0
        return;
762
0
    }
763
764
    // lsb indicates if it is a literal run or repeated run
765
108
    bool is_literal = indicator_value & 1;
766
767
    // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully.
768
    // The Parquet standard does not allow longer runs - see PARQUET-1290.
769
108
    uint32_t run_len = indicator_value >> 1;
770
108
    if (is_literal) {
771
        // Use int64_t to avoid overflowing multiplication.
772
40
        int64_t literal_count = static_cast<int64_t>(run_len) * 8;
773
40
        if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return;
774
40
        literal_count_ = literal_count;
775
68
    } else {
776
68
        if (UNLIKELY(run_len == 0)) return;
777
68
        bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
778
68
        if (UNLIKELY(!result)) return;
779
68
        repeat_count_ = run_len;
780
68
    }
781
108
}
782
783
template <typename T>
784
72
T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
785
72
    repeat_count_ -= num_repeats_to_consume;
786
72
    return repeated_value_;
787
72
}
788
789
template <typename T>
790
40
int32_t RleBatchDecoder<T>::NextNumLiterals() {
791
40
    if (literal_count_ > 0) return literal_count_;
792
0
    if (repeat_count_ == 0) NextCounts();
793
0
    return literal_count_;
794
40
}
795
796
template <typename T>
797
40
bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) {
798
40
    int32_t num_consumed = 0;
799
    // Copy any buffered literals left over from previous calls.
800
40
    if (HaveBufferedLiterals()) {
801
0
        num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
802
0
    }
803
804
40
    int32_t num_remaining = num_literals_to_consume - num_consumed;
805
    // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
806
    // Need to round to a batch of 32 if the caller is consuming only part of the current
807
    // run avoid ending on a non-byte boundary.
808
40
    int32_t num_to_bypass =
809
40
            std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
810
40
    if (num_to_bypass > 0) {
811
0
        int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed);
812
        // If we couldn't read the expected number, that means the input was truncated.
813
0
        if (num_read < num_to_bypass) return false;
814
0
        literal_count_ -= num_to_bypass;
815
0
        num_consumed += num_to_bypass;
816
0
        num_remaining = num_literals_to_consume - num_consumed;
817
0
    }
818
819
40
    if (num_remaining > 0) {
820
        // We weren't able to copy all the literals requested directly from the input.
821
        // Buffer literals and copy over the requested number.
822
40
        if (UNLIKELY(!FillLiteralBuffer())) return false;
823
40
        OutputBufferedLiterals(num_remaining, values + num_consumed);
824
40
    }
825
40
    return true;
826
40
}
827
828
template <typename T>
829
40
bool RleBatchDecoder<T>::FillLiteralBuffer() {
830
40
    int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
831
40
    num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
832
    // If we couldn't read the expected number, that means the input was truncated.
833
40
    if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
834
40
    literal_buffer_pos_ = 0;
835
40
    return true;
836
40
}
837
838
template <typename T>
839
74
uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) {
840
74
    uint32_t num_consumed = 0;
841
186
    while (num_consumed < batch_num) {
842
        // Add RLE encoded values by repeating the current value this number of times.
843
112
        uint32_t num_repeats = NextNumRepeats();
844
112
        if (num_repeats > 0) {
845
72
            int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed);
846
72
            T repeated_value = GetRepeatedValue(num_repeats_to_set);
847
584
            for (int i = 0; i < num_repeats_to_set; ++i) {
848
512
                values[num_consumed + i] = repeated_value;
849
512
            }
850
72
            num_consumed += num_repeats_to_set;
851
72
            continue;
852
72
        }
853
854
        // Add remaining literal values, if any.
855
40
        uint32_t num_literals = NextNumLiterals();
856
40
        if (num_literals == 0) {
857
0
            break;
858
0
        }
859
40
        uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed);
860
40
        if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
861
0
            return 0;
862
0
        }
863
40
        num_consumed += num_literals_to_set;
864
40
    }
865
74
    return num_consumed;
866
74
}
867
868
} // namespace doris