Coverage Report

Created: 2025-07-23 22:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/util/rle_encoding.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#pragma once
18
19
#include <glog/logging.h>
20
21
#include <limits> // IWYU pragma: keep
22
23
#include "common/cast_set.h"
24
#include "util/bit_stream_utils.inline.h"
25
#include "util/bit_util.h"
26
27
namespace doris {
28
#include "common/compile_check_begin.h"
29
30
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
31
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
32
// (literal encoding).
33
// For both types of runs, there is a byte-aligned indicator which encodes the length
34
// of the run and the type of the run.
35
// This encoding has the benefit that when there aren't any long enough runs, values
36
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
37
// the run length are byte aligned. This allows for very efficient decoding
38
// implementations.
39
// The encoding is:
40
//    encoded-block := run*
41
//    run := literal-run | repeated-run
42
//    literal-run := literal-indicator < literal bytes >
43
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
44
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
45
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
46
//
47
// Each run is preceded by a varint. The varint's least significant bit is
48
// used to indicate whether the run is a literal run or a repeated run. The rest
49
// of the varint is used to determine the length of the run (eg how many times the
50
// value repeats).
51
//
52
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
53
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
54
// on a byte boundary without padding.
55
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
56
// the actual number of encoded ints. (This means that the total number of encoded values
57
// can not be determined from the encoded data, since the number of values in the last
58
// group may not be a multiple of 8).
59
// There is a break-even point when it is more storage efficient to do run length
60
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
61
// for both the repeated encoding or the literal encoding.  This value can always
62
// be computed based on the bit-width.
63
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
64
//
65
// Examples with bit-width 1 (eg encoding booleans):
66
// ----------------------------------------
67
// 100 1s followed by 100 0s:
68
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
69
//  - (total 4 bytes)
70
//
71
// alternating 1s and 0s (200 total):
72
// 200 ints = 25 groups of 8
73
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
74
// (total 26 bytes, 1 byte overhead)
75
//
76
77
// Decoder class for RLE encoded data.
78
//
79
// NOTE: the encoded format does not have any length prefix or any other way of
80
// indicating that the encoded sequence ends at a certain point, so the Decoder
81
// methods may return some extra bits at the end before the read methods start
82
// to return 0/false.
83
template <typename T>
84
class RleDecoder {
85
public:
86
    // Create a decoder object. buffer/buffer_len is the decoded data.
87
    // bit_width is the width of each value (before encoding).
88
    RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
89
922
            : bit_reader_(buffer, buffer_len),
90
922
              bit_width_(bit_width),
91
922
              current_value_(0),
92
922
              repeat_count_(0),
93
922
              literal_count_(0),
94
922
              rewind_state_(CANT_REWIND) {
95
922
        DCHECK_GE(bit_width_, 1);
96
922
        DCHECK_LE(bit_width_, 64);
97
922
    }
_ZN5doris10RleDecoderImEC2EPKhii
Line
Count
Source
89
394
            : bit_reader_(buffer, buffer_len),
90
394
              bit_width_(bit_width),
91
394
              current_value_(0),
92
394
              repeat_count_(0),
93
394
              literal_count_(0),
94
394
              rewind_state_(CANT_REWIND) {
95
394
        DCHECK_GE(bit_width_, 1);
96
        DCHECK_LE(bit_width_, 64);
97
394
    }
_ZN5doris10RleDecoderIbEC2EPKhii
Line
Count
Source
89
409
            : bit_reader_(buffer, buffer_len),
90
409
              bit_width_(bit_width),
91
409
              current_value_(0),
92
409
              repeat_count_(0),
93
409
              literal_count_(0),
94
409
              rewind_state_(CANT_REWIND) {
95
409
        DCHECK_GE(bit_width_, 1);
96
        DCHECK_LE(bit_width_, 64);
97
409
    }
_ZN5doris10RleDecoderIhEC2EPKhii
Line
Count
Source
89
5
            : bit_reader_(buffer, buffer_len),
90
5
              bit_width_(bit_width),
91
5
              current_value_(0),
92
5
              repeat_count_(0),
93
5
              literal_count_(0),
94
5
              rewind_state_(CANT_REWIND) {
95
5
        DCHECK_GE(bit_width_, 1);
96
        DCHECK_LE(bit_width_, 64);
97
5
    }
_ZN5doris10RleDecoderIsEC2EPKhii
Line
Count
Source
89
114
            : bit_reader_(buffer, buffer_len),
90
114
              bit_width_(bit_width),
91
114
              current_value_(0),
92
114
              repeat_count_(0),
93
114
              literal_count_(0),
94
114
              rewind_state_(CANT_REWIND) {
95
114
        DCHECK_GE(bit_width_, 1);
96
        DCHECK_LE(bit_width_, 64);
97
114
    }
98
99
27.6k
    RleDecoder() {}
_ZN5doris10RleDecoderIbEC2Ev
Line
Count
Source
99
27.3k
    RleDecoder() {}
_ZN5doris10RleDecoderIhEC2Ev
Line
Count
Source
99
6
    RleDecoder() {}
_ZN5doris10RleDecoderIsEC2Ev
Line
Count
Source
99
225
    RleDecoder() {}
100
101
    // Skip n values, and returns the number of non-zero entries skipped.
102
    size_t Skip(size_t to_skip);
103
104
    // Gets the next value.  Returns false if there are no more.
105
    bool Get(T* val);
106
107
    // Seek to the previous value.
108
    void RewindOne();
109
110
    // Gets the next run of the same 'val'. Returns 0 if there is no
111
    // more data to be decoded. Will return a run of at most 'max_run'
112
    // values. If there are more values than this, the next call to
113
    // GetNextRun will return more from the same run.
114
    size_t GetNextRun(T* val, size_t max_run);
115
116
    size_t get_values(T* values, size_t num_values);
117
118
    // Get the count of current repeated value
119
    size_t repeated_count();
120
121
    // Get current repeated value, make sure that count equals repeated_count()
122
    T get_repeated_value(size_t count);
123
124
0
    const BitReader& bit_reader() const { return bit_reader_; }
125
126
private:
127
    bool ReadHeader();
128
129
    enum RewindState { REWIND_LITERAL, REWIND_RUN, CANT_REWIND };
130
131
    BitReader bit_reader_;
132
    int bit_width_;
133
    uint64_t current_value_;
134
    uint32_t repeat_count_;
135
    uint32_t literal_count_;
136
    RewindState rewind_state_;
137
};
138
139
// Class to incrementally build the rle data.
140
// The encoding has two modes: encoding repeated runs and literal runs.
141
// If the run is sufficiently short, it is more efficient to encode as a literal run.
142
// This class does so by buffering 8 values at a time.  If they are not all the same
143
// they are added to the literal run.  If they are the same, they are added to the
144
// repeated run.  When we switch modes, the previous run is flushed out.
145
template <typename T>
146
class RleEncoder {
147
public:
148
    // buffer: buffer to write bits to.
149
    // bit_width: max number of bits for value.
150
    // TODO: consider adding a min_repeated_run_length so the caller can control
151
    // when values should be encoded as repeated runs.  Currently this is derived
152
    // based on the bit_width, which can determine a storage optimal choice.
153
    explicit RleEncoder(faststring* buffer, int bit_width)
154
3.89k
            : bit_width_(bit_width), bit_writer_(buffer) {
155
3.89k
        DCHECK_GE(bit_width_, 1);
156
3.89k
        DCHECK_LE(bit_width_, 64);
157
3.89k
        Clear();
158
3.89k
    }
_ZN5doris10RleEncoderImEC2EPNS_10faststringEi
Line
Count
Source
154
394
            : bit_width_(bit_width), bit_writer_(buffer) {
155
394
        DCHECK_GE(bit_width_, 1);
156
        DCHECK_LE(bit_width_, 64);
157
394
        Clear();
158
394
    }
_ZN5doris10RleEncoderIbEC2EPNS_10faststringEi
Line
Count
Source
154
3.49k
            : bit_width_(bit_width), bit_writer_(buffer) {
155
3.49k
        DCHECK_GE(bit_width_, 1);
156
        DCHECK_LE(bit_width_, 64);
157
3.49k
        Clear();
158
3.49k
    }
159
160
    // Reserve 'num_bytes' bytes for a plain encoded header, set each
161
    // byte with 'val': this is used for the RLE-encoded data blocks in
162
    // order to be able to able to store the initial ordinal position
163
    // and number of elements. This is a part of RleEncoder in order to
164
    // maintain the correct offset in 'buffer'.
165
    void Reserve(int num_bytes, uint8_t val);
166
167
    // Encode value. This value must be representable with bit_width_ bits.
168
    void Put(T value, size_t run_length = 1);
169
170
    // Flushes any pending values to the underlying buffer.
171
    // Returns the total number of bytes written
172
    int Flush();
173
174
    // Resets all the state in the encoder.
175
    void Clear();
176
177
302
    int32_t len() const { return bit_writer_.bytes_written(); }
178
179
private:
180
    // Flushes any buffered values.  If this is part of a repeated run, this is largely
181
    // a no-op.
182
    // If it is part of a literal run, this will call FlushLiteralRun, which writes
183
    // out the buffered literal values.
184
    // If 'done' is true, the current run would be written even if it would normally
185
    // have been buffered more.  This should only be called at the end, when the
186
    // encoder has received all values even if it would normally continue to be
187
    // buffered.
188
    void FlushBufferedValues(bool done);
189
190
    // Flushes literal values to the underlying buffer.  If update_indicator_byte,
191
    // then the current literal run is complete and the indicator byte is updated.
192
    void FlushLiteralRun(bool update_indicator_byte);
193
194
    // Flushes a repeated run to the underlying buffer.
195
    void FlushRepeatedRun();
196
197
    // Number of bits needed to encode the value.
198
    const int bit_width_;
199
200
    // Underlying buffer.
201
    BitWriter bit_writer_;
202
203
    // We need to buffer at most 8 values for literals.  This happens when the
204
    // bit_width is 1 (so 8 values fit in one byte).
205
    // TODO: generalize this to other bit widths
206
    uint64_t buffered_values_[8];
207
208
    // Number of values in buffered_values_
209
    int num_buffered_values_;
210
211
    // The current (also last) value that was written and the count of how
212
    // many times in a row that value has been seen.  This is maintained even
213
    // if we are in a literal run.  If the repeat_count_ get high enough, we switch
214
    // to encoding repeated runs.
215
    uint64_t current_value_;
216
    int repeat_count_;
217
218
    // Number of literals in the current run.  This does not include the literals
219
    // that might be in buffered_values_.  Only after we've got a group big enough
220
    // can we decide if they should part of the literal_count_ or repeat_count_
221
    int literal_count_;
222
223
    // Index of a byte in the underlying buffer that stores the indicator byte.
224
    // This is reserved as soon as we need a literal run but the value is written
225
    // when the literal run is complete. We maintain an index rather than a pointer
226
    // into the underlying buffer because the pointer value may become invalid if
227
    // the underlying buffer is resized.
228
    int literal_indicator_byte_idx_;
229
};
230
231
template <typename T>
232
302k
bool RleDecoder<T>::ReadHeader() {
233
302k
    DCHECK(bit_reader_.is_initialized());
234
302k
    if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] {
235
        // Read the next run's indicator int, it could be a literal or repeated run
236
        // The int is encoded as a vlq-encoded value.
237
23.7k
        uint32_t indicator_value = 0;
238
23.7k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
239
23.7k
        if (!result) [[unlikely]] {
240
276
            return false;
241
276
        }
242
243
        // lsb indicates if it is a literal run or repeated run
244
23.4k
        bool is_literal = indicator_value & 1;
245
23.4k
        if (is_literal) {
246
1.34k
            literal_count_ = (indicator_value >> 1) * 8;
247
1.34k
            DCHECK_GT(literal_count_, 0);
248
22.1k
        } else {
249
22.1k
            repeat_count_ = indicator_value >> 1;
250
22.1k
            DCHECK_GT(repeat_count_, 0);
251
22.1k
            bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
252
22.1k
                                                     reinterpret_cast<T*>(&current_value_));
253
22.1k
            DCHECK(result1);
254
22.1k
        }
255
23.4k
    }
256
301k
    return true;
257
302k
}
_ZN5doris10RleDecoderIsE10ReadHeaderEv
Line
Count
Source
232
266
bool RleDecoder<T>::ReadHeader() {
233
266
    DCHECK(bit_reader_.is_initialized());
234
266
    if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] {
235
        // Read the next run's indicator int, it could be a literal or repeated run
236
        // The int is encoded as a vlq-encoded value.
237
114
        uint32_t indicator_value = 0;
238
114
        bool result = bit_reader_.GetVlqInt(&indicator_value);
239
114
        if (!result) [[unlikely]] {
240
0
            return false;
241
0
        }
242
243
        // lsb indicates if it is a literal run or repeated run
244
114
        bool is_literal = indicator_value & 1;
245
114
        if (is_literal) {
246
7
            literal_count_ = (indicator_value >> 1) * 8;
247
7
            DCHECK_GT(literal_count_, 0);
248
107
        } else {
249
107
            repeat_count_ = indicator_value >> 1;
250
107
            DCHECK_GT(repeat_count_, 0);
251
107
            bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
252
107
                                                     reinterpret_cast<T*>(&current_value_));
253
107
            DCHECK(result1);
254
107
        }
255
114
    }
256
266
    return true;
257
266
}
_ZN5doris10RleDecoderImE10ReadHeaderEv
Line
Count
Source
232
244k
bool RleDecoder<T>::ReadHeader() {
233
244k
    DCHECK(bit_reader_.is_initialized());
234
244k
    if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] {
235
        // Read the next run's indicator int, it could be a literal or repeated run
236
        // The int is encoded as a vlq-encoded value.
237
3.03k
        uint32_t indicator_value = 0;
238
3.03k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
239
3.03k
        if (!result) [[unlikely]] {
240
0
            return false;
241
0
        }
242
243
        // lsb indicates if it is a literal run or repeated run
244
3.03k
        bool is_literal = indicator_value & 1;
245
3.03k
        if (is_literal) {
246
1.12k
            literal_count_ = (indicator_value >> 1) * 8;
247
1.12k
            DCHECK_GT(literal_count_, 0);
248
1.91k
        } else {
249
1.91k
            repeat_count_ = indicator_value >> 1;
250
1.91k
            DCHECK_GT(repeat_count_, 0);
251
1.91k
            bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
252
1.91k
                                                     reinterpret_cast<T*>(&current_value_));
253
1.91k
            DCHECK(result1);
254
1.91k
        }
255
3.03k
    }
256
244k
    return true;
257
244k
}
_ZN5doris10RleDecoderIbE10ReadHeaderEv
Line
Count
Source
232
57.5k
bool RleDecoder<T>::ReadHeader() {
233
57.5k
    DCHECK(bit_reader_.is_initialized());
234
57.5k
    if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] {
235
        // Read the next run's indicator int, it could be a literal or repeated run
236
        // The int is encoded as a vlq-encoded value.
237
20.5k
        uint32_t indicator_value = 0;
238
20.5k
        bool result = bit_reader_.GetVlqInt(&indicator_value);
239
20.5k
        if (!result) [[unlikely]] {
240
276
            return false;
241
276
        }
242
243
        // lsb indicates if it is a literal run or repeated run
244
20.2k
        bool is_literal = indicator_value & 1;
245
20.2k
        if (is_literal) {
246
209
            literal_count_ = (indicator_value >> 1) * 8;
247
209
            DCHECK_GT(literal_count_, 0);
248
20.0k
        } else {
249
20.0k
            repeat_count_ = indicator_value >> 1;
250
20.0k
            DCHECK_GT(repeat_count_, 0);
251
20.0k
            bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
252
20.0k
                                                     reinterpret_cast<T*>(&current_value_));
253
20.0k
            DCHECK(result1);
254
20.0k
        }
255
20.2k
    }
256
57.2k
    return true;
257
57.5k
}
_ZN5doris10RleDecoderIhE10ReadHeaderEv
Line
Count
Source
232
5
bool RleDecoder<T>::ReadHeader() {
233
5
    DCHECK(bit_reader_.is_initialized());
234
5
    if (literal_count_ == 0 && repeat_count_ == 0) [[unlikely]] {
235
        // Read the next run's indicator int, it could be a literal or repeated run
236
        // The int is encoded as a vlq-encoded value.
237
5
        uint32_t indicator_value = 0;
238
5
        bool result = bit_reader_.GetVlqInt(&indicator_value);
239
5
        if (!result) [[unlikely]] {
240
0
            return false;
241
0
        }
242
243
        // lsb indicates if it is a literal run or repeated run
244
5
        bool is_literal = indicator_value & 1;
245
5
        if (is_literal) {
246
5
            literal_count_ = (indicator_value >> 1) * 8;
247
5
            DCHECK_GT(literal_count_, 0);
248
5
        } else {
249
0
            repeat_count_ = indicator_value >> 1;
250
0
            DCHECK_GT(repeat_count_, 0);
251
0
            bool result1 = bit_reader_.GetAligned<T>(BitUtil::Ceil(bit_width_, 8),
252
0
                                                     reinterpret_cast<T*>(&current_value_));
253
0
            DCHECK(result1);
254
0
        }
255
5
    }
256
5
    return true;
257
5
}
258
259
template <typename T>
260
247k
bool RleDecoder<T>::Get(T* val) {
261
247k
    DCHECK(bit_reader_.is_initialized());
262
247k
    if (!ReadHeader()) [[unlikely]] {
263
0
        return false;
264
0
    }
265
266
247k
    if (repeat_count_ > 0) [[likely]] {
267
158k
        *val = cast_set<T>(current_value_);
268
158k
        --repeat_count_;
269
158k
        rewind_state_ = REWIND_RUN;
270
158k
    } else {
271
89.0k
        DCHECK(literal_count_ > 0);
272
89.0k
        bool result = bit_reader_.GetValue(bit_width_, val);
273
89.0k
        DCHECK(result);
274
89.0k
        --literal_count_;
275
89.0k
        rewind_state_ = REWIND_LITERAL;
276
89.0k
    }
277
278
247k
    return true;
279
247k
}
_ZN5doris10RleDecoderIsE3GetEPs
Line
Count
Source
260
6
bool RleDecoder<T>::Get(T* val) {
261
6
    DCHECK(bit_reader_.is_initialized());
262
6
    if (!ReadHeader()) [[unlikely]] {
263
0
        return false;
264
0
    }
265
266
6
    if (repeat_count_ > 0) [[likely]] {
267
6
        *val = cast_set<T>(current_value_);
268
6
        --repeat_count_;
269
6
        rewind_state_ = REWIND_RUN;
270
6
    } else {
271
0
        DCHECK(literal_count_ > 0);
272
0
        bool result = bit_reader_.GetValue(bit_width_, val);
273
0
        DCHECK(result);
274
0
        --literal_count_;
275
0
        rewind_state_ = REWIND_LITERAL;
276
0
    }
277
278
6
    return true;
279
6
}
_ZN5doris10RleDecoderImE3GetEPm
Line
Count
Source
260
244k
bool RleDecoder<T>::Get(T* val) {
261
244k
    DCHECK(bit_reader_.is_initialized());
262
244k
    if (!ReadHeader()) [[unlikely]] {
263
0
        return false;
264
0
    }
265
266
244k
    if (repeat_count_ > 0) [[likely]] {
267
155k
        *val = cast_set<T>(current_value_);
268
155k
        --repeat_count_;
269
155k
        rewind_state_ = REWIND_RUN;
270
155k
    } else {
271
88.9k
        DCHECK(literal_count_ > 0);
272
88.9k
        bool result = bit_reader_.GetValue(bit_width_, val);
273
88.9k
        DCHECK(result);
274
88.9k
        --literal_count_;
275
88.9k
        rewind_state_ = REWIND_LITERAL;
276
88.9k
    }
277
278
244k
    return true;
279
244k
}
_ZN5doris10RleDecoderIbE3GetEPb
Line
Count
Source
260
3.17k
bool RleDecoder<T>::Get(T* val) {
261
3.17k
    DCHECK(bit_reader_.is_initialized());
262
3.17k
    if (!ReadHeader()) [[unlikely]] {
263
0
        return false;
264
0
    }
265
266
3.17k
    if (repeat_count_ > 0) [[likely]] {
267
3.08k
        *val = cast_set<T>(current_value_);
268
3.08k
        --repeat_count_;
269
3.08k
        rewind_state_ = REWIND_RUN;
270
3.08k
    } else {
271
93
        DCHECK(literal_count_ > 0);
272
93
        bool result = bit_reader_.GetValue(bit_width_, val);
273
93
        DCHECK(result);
274
93
        --literal_count_;
275
93
        rewind_state_ = REWIND_LITERAL;
276
93
    }
277
278
3.17k
    return true;
279
3.17k
}
280
281
template <typename T>
282
2
void RleDecoder<T>::RewindOne() {
283
2
    DCHECK(bit_reader_.is_initialized());
284
285
2
    switch (rewind_state_) {
286
0
    case CANT_REWIND:
287
0
        throw Exception(Status::FatalError("Can't rewind more than once after each read!"));
288
0
        break;
289
2
    case REWIND_RUN:
290
2
        ++repeat_count_;
291
2
        break;
292
0
    case REWIND_LITERAL: {
293
0
        bit_reader_.Rewind(bit_width_);
294
0
        ++literal_count_;
295
0
        break;
296
0
    }
297
2
    }
298
299
2
    rewind_state_ = CANT_REWIND;
300
2
}
301
302
template <typename T>
303
34.5k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
304
34.5k
    DCHECK(bit_reader_.is_initialized());
305
34.5k
    DCHECK_GT(max_run, 0);
306
34.5k
    size_t ret = 0;
307
34.5k
    size_t rem = max_run;
308
54.5k
    while (ReadHeader()) {
309
54.2k
        if (repeat_count_ > 0) [[likely]] {
310
43.7k
            if (ret > 0 && *val != current_value_) [[unlikely]] {
311
19.6k
                return ret;
312
19.6k
            }
313
24.1k
            *val = cast_set<T>(current_value_);
314
24.1k
            if (repeat_count_ >= rem) {
315
                // The next run is longer than the amount of remaining data
316
                // that the caller wants to read. Only consume it partially.
317
4.31k
                repeat_count_ -= rem;
318
4.31k
                ret += rem;
319
4.31k
                return ret;
320
4.31k
            }
321
19.8k
            ret += repeat_count_;
322
19.8k
            rem -= repeat_count_;
323
19.8k
            repeat_count_ = 0;
324
19.8k
        } else {
325
10.5k
            DCHECK(literal_count_ > 0);
326
10.5k
            if (ret == 0) {
327
10.4k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
328
10.4k
                DCHECK(has_more);
329
10.4k
                literal_count_--;
330
10.4k
                ret++;
331
10.4k
                rem--;
332
10.4k
            }
333
334
42.0k
            while (literal_count_ > 0) {
335
41.8k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
336
41.8k
                DCHECK(result);
337
41.8k
                if (current_value_ != *val || rem == 0) {
338
10.3k
                    bit_reader_.Rewind(bit_width_);
339
10.3k
                    return ret;
340
10.3k
                }
341
31.5k
                ret++;
342
31.5k
                rem--;
343
31.5k
                literal_count_--;
344
31.5k
            }
345
10.5k
        }
346
54.2k
    }
347
276
    return ret;
348
34.5k
}
_ZN5doris10RleDecoderIsE10GetNextRunEPsm
Line
Count
Source
303
224
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
304
224
    DCHECK(bit_reader_.is_initialized());
305
224
    DCHECK_GT(max_run, 0);
306
224
    size_t ret = 0;
307
224
    size_t rem = max_run;
308
224
    while (ReadHeader()) {
309
224
        if (repeat_count_ > 0) [[likely]] {
310
199
            if (ret > 0 && *val != current_value_) [[unlikely]] {
311
0
                return ret;
312
0
            }
313
199
            *val = cast_set<T>(current_value_);
314
199
            if (repeat_count_ >= rem) {
315
                // The next run is longer than the amount of remaining data
316
                // that the caller wants to read. Only consume it partially.
317
199
                repeat_count_ -= rem;
318
199
                ret += rem;
319
199
                return ret;
320
199
            }
321
0
            ret += repeat_count_;
322
0
            rem -= repeat_count_;
323
0
            repeat_count_ = 0;
324
25
        } else {
325
25
            DCHECK(literal_count_ > 0);
326
25
            if (ret == 0) {
327
25
                bool has_more = bit_reader_.GetValue(bit_width_, val);
328
25
                DCHECK(has_more);
329
25
                literal_count_--;
330
25
                ret++;
331
25
                rem--;
332
25
            }
333
334
29
            while (literal_count_ > 0) {
335
29
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
336
29
                DCHECK(result);
337
29
                if (current_value_ != *val || rem == 0) {
338
25
                    bit_reader_.Rewind(bit_width_);
339
25
                    return ret;
340
25
                }
341
4
                ret++;
342
4
                rem--;
343
4
                literal_count_--;
344
4
            }
345
25
        }
346
224
    }
347
0
    return ret;
348
224
}
_ZN5doris10RleDecoderIbE10GetNextRunEPbm
Line
Count
Source
303
34.3k
size_t RleDecoder<T>::GetNextRun(T* val, size_t max_run) {
304
34.3k
    DCHECK(bit_reader_.is_initialized());
305
34.3k
    DCHECK_GT(max_run, 0);
306
34.3k
    size_t ret = 0;
307
34.3k
    size_t rem = max_run;
308
54.3k
    while (ReadHeader()) {
309
54.0k
        if (repeat_count_ > 0) [[likely]] {
310
43.5k
            if (ret > 0 && *val != current_value_) [[unlikely]] {
311
19.6k
                return ret;
312
19.6k
            }
313
23.9k
            *val = cast_set<T>(current_value_);
314
23.9k
            if (repeat_count_ >= rem) {
315
                // The next run is longer than the amount of remaining data
316
                // that the caller wants to read. Only consume it partially.
317
4.11k
                repeat_count_ -= rem;
318
4.11k
                ret += rem;
319
4.11k
                return ret;
320
4.11k
            }
321
19.8k
            ret += repeat_count_;
322
19.8k
            rem -= repeat_count_;
323
19.8k
            repeat_count_ = 0;
324
19.8k
        } else {
325
10.5k
            DCHECK(literal_count_ > 0);
326
10.5k
            if (ret == 0) {
327
10.4k
                bool has_more = bit_reader_.GetValue(bit_width_, val);
328
10.4k
                DCHECK(has_more);
329
10.4k
                literal_count_--;
330
10.4k
                ret++;
331
10.4k
                rem--;
332
10.4k
            }
333
334
42.0k
            while (literal_count_ > 0) {
335
41.8k
                bool result = bit_reader_.GetValue(bit_width_, &current_value_);
336
41.8k
                DCHECK(result);
337
41.8k
                if (current_value_ != *val || rem == 0) {
338
10.3k
                    bit_reader_.Rewind(bit_width_);
339
10.3k
                    return ret;
340
10.3k
                }
341
31.5k
                ret++;
342
31.5k
                rem--;
343
31.5k
                literal_count_--;
344
31.5k
            }
345
10.5k
        }
346
54.0k
    }
347
276
    return ret;
348
34.3k
}
349
350
template <typename T>
351
39
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
352
39
    size_t read_num = 0;
353
120
    while (read_num < num_values) {
354
81
        size_t read_this_time = num_values - read_num;
355
356
81
        if (LIKELY(repeat_count_ > 0)) {
357
33
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
358
33
            std::fill(values, values + read_this_time, current_value_);
359
33
            values += read_this_time;
360
33
            repeat_count_ -= read_this_time;
361
33
            read_num += read_this_time;
362
48
        } else if (literal_count_ > 0) {
363
8
            read_this_time = std::min((size_t)literal_count_, read_this_time);
364
59
            for (int i = 0; i < read_this_time; ++i) {
365
51
                bool result = bit_reader_.GetValue(bit_width_, values);
366
51
                DCHECK(result);
367
51
                values++;
368
51
            }
369
8
            literal_count_ -= read_this_time;
370
8
            read_num += read_this_time;
371
40
        } else {
372
40
            if (!ReadHeader()) {
373
0
                return read_num;
374
0
            }
375
40
        }
376
81
    }
377
39
    return read_num;
378
39
}
_ZN5doris10RleDecoderIhE10get_valuesEPhm
Line
Count
Source
351
5
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
352
5
    size_t read_num = 0;
353
14
    while (read_num < num_values) {
354
9
        size_t read_this_time = num_values - read_num;
355
356
9
        if (LIKELY(repeat_count_ > 0)) {
357
0
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
358
0
            std::fill(values, values + read_this_time, current_value_);
359
0
            values += read_this_time;
360
0
            repeat_count_ -= read_this_time;
361
0
            read_num += read_this_time;
362
9
        } else if (literal_count_ > 0) {
363
5
            read_this_time = std::min((size_t)literal_count_, read_this_time);
364
40
            for (int i = 0; i < read_this_time; ++i) {
365
35
                bool result = bit_reader_.GetValue(bit_width_, values);
366
35
                DCHECK(result);
367
35
                values++;
368
35
            }
369
5
            literal_count_ -= read_this_time;
370
5
            read_num += read_this_time;
371
5
        } else {
372
4
            if (!ReadHeader()) {
373
0
                return read_num;
374
0
            }
375
4
        }
376
9
    }
377
5
    return read_num;
378
5
}
_ZN5doris10RleDecoderIsE10get_valuesEPsm
Line
Count
Source
351
34
size_t RleDecoder<T>::get_values(T* values, size_t num_values) {
352
34
    size_t read_num = 0;
353
106
    while (read_num < num_values) {
354
72
        size_t read_this_time = num_values - read_num;
355
356
72
        if (LIKELY(repeat_count_ > 0)) {
357
33
            read_this_time = std::min((size_t)repeat_count_, read_this_time);
358
33
            std::fill(values, values + read_this_time, current_value_);
359
33
            values += read_this_time;
360
33
            repeat_count_ -= read_this_time;
361
33
            read_num += read_this_time;
362
39
        } else if (literal_count_ > 0) {
363
3
            read_this_time = std::min((size_t)literal_count_, read_this_time);
364
19
            for (int i = 0; i < read_this_time; ++i) {
365
16
                bool result = bit_reader_.GetValue(bit_width_, values);
366
16
                DCHECK(result);
367
16
                values++;
368
16
            }
369
3
            literal_count_ -= read_this_time;
370
3
            read_num += read_this_time;
371
36
        } else {
372
36
            if (!ReadHeader()) {
373
0
                return read_num;
374
0
            }
375
36
        }
376
72
    }
377
34
    return read_num;
378
34
}
379
380
template <typename T>
381
size_t RleDecoder<T>::repeated_count() {
382
    if (repeat_count_ > 0) {
383
        return repeat_count_;
384
    }
385
    if (literal_count_ == 0) {
386
        ReadHeader();
387
    }
388
    return repeat_count_;
389
}
390
391
template <typename T>
392
T RleDecoder<T>::get_repeated_value(size_t count) {
393
    DCHECK_GE(repeat_count_, count);
394
    repeat_count_ -= count;
395
    return current_value_;
396
}
397
398
template <typename T>
399
5
size_t RleDecoder<T>::Skip(size_t to_skip) {
400
5
    DCHECK(bit_reader_.is_initialized());
401
402
5
    size_t set_count = 0;
403
16
    while (to_skip > 0) {
404
11
        bool result = ReadHeader();
405
11
        DCHECK(result);
406
407
11
        if (repeat_count_ > 0) [[likely]] {
408
7
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
409
7
            repeat_count_ -= nskip;
410
7
            to_skip -= nskip;
411
7
            if (current_value_ != 0) {
412
3
                set_count += nskip;
413
3
            }
414
7
        } else {
415
4
            DCHECK(literal_count_ > 0);
416
4
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
417
4
            literal_count_ -= nskip;
418
4
            to_skip -= nskip;
419
60
            for (; nskip > 0; nskip--) {
420
56
                T value = 0;
421
56
                bool result1 = bit_reader_.GetValue(bit_width_, &value);
422
56
                DCHECK(result1);
423
56
                if (value != 0) {
424
28
                    set_count++;
425
28
                }
426
56
            }
427
4
        }
428
11
    }
429
5
    return set_count;
430
5
}
_ZN5doris10RleDecoderIbE4SkipEm
Line
Count
Source
399
4
size_t RleDecoder<T>::Skip(size_t to_skip) {
400
4
    DCHECK(bit_reader_.is_initialized());
401
402
4
    size_t set_count = 0;
403
14
    while (to_skip > 0) {
404
10
        bool result = ReadHeader();
405
10
        DCHECK(result);
406
407
10
        if (repeat_count_ > 0) [[likely]] {
408
7
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
409
7
            repeat_count_ -= nskip;
410
7
            to_skip -= nskip;
411
7
            if (current_value_ != 0) {
412
3
                set_count += nskip;
413
3
            }
414
7
        } else {
415
3
            DCHECK(literal_count_ > 0);
416
3
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
417
3
            literal_count_ -= nskip;
418
3
            to_skip -= nskip;
419
56
            for (; nskip > 0; nskip--) {
420
53
                T value = 0;
421
53
                bool result1 = bit_reader_.GetValue(bit_width_, &value);
422
53
                DCHECK(result1);
423
53
                if (value != 0) {
424
26
                    set_count++;
425
26
                }
426
53
            }
427
3
        }
428
10
    }
429
4
    return set_count;
430
4
}
_ZN5doris10RleDecoderIhE4SkipEm
Line
Count
Source
399
1
size_t RleDecoder<T>::Skip(size_t to_skip) {
400
1
    DCHECK(bit_reader_.is_initialized());
401
402
1
    size_t set_count = 0;
403
2
    while (to_skip > 0) {
404
1
        bool result = ReadHeader();
405
1
        DCHECK(result);
406
407
1
        if (repeat_count_ > 0) [[likely]] {
408
0
            size_t nskip = (repeat_count_ < to_skip) ? repeat_count_ : to_skip;
409
0
            repeat_count_ -= nskip;
410
0
            to_skip -= nskip;
411
0
            if (current_value_ != 0) {
412
0
                set_count += nskip;
413
0
            }
414
1
        } else {
415
1
            DCHECK(literal_count_ > 0);
416
1
            size_t nskip = (literal_count_ < to_skip) ? literal_count_ : to_skip;
417
1
            literal_count_ -= nskip;
418
1
            to_skip -= nskip;
419
4
            for (; nskip > 0; nskip--) {
420
3
                T value = 0;
421
3
                bool result1 = bit_reader_.GetValue(bit_width_, &value);
422
3
                DCHECK(result1);
423
3
                if (value != 0) {
424
2
                    set_count++;
425
2
                }
426
3
            }
427
1
        }
428
1
    }
429
1
    return set_count;
430
1
}
431
432
// This function buffers input values 8 at a time.  After seeing all 8 values,
433
// it decides whether they should be encoded as a literal or repeated run.
434
template <typename T>
435
735k
void RleEncoder<T>::Put(T value, size_t run_length) {
436
735k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
437
438
    // TODO(perf): remove the loop and use the repeat_count_
439
3.39M
    for (; run_length > 0; run_length--) {
440
2.65M
        if (current_value_ == value) [[likely]] {
441
2.55M
            ++repeat_count_;
442
2.55M
            if (repeat_count_ > 8) {
443
                // This is just a continuation of the current run, no need to buffer the
444
                // values.
445
                // Note that this is the fast path for long repeated runs.
446
2.30M
                continue;
447
2.30M
            }
448
2.55M
        } else {
449
107k
            if (repeat_count_ >= 8) {
450
                // We had a run that was long enough but it has ended.  Flush the
451
                // current repeated run.
452
21.4k
                DCHECK_EQ(literal_count_, 0);
453
21.4k
                FlushRepeatedRun();
454
21.4k
            }
455
107k
            repeat_count_ = 1;
456
107k
            current_value_ = value;
457
107k
        }
458
459
351k
        buffered_values_[num_buffered_values_] = value;
460
351k
        if (++num_buffered_values_ == 8) {
461
43.0k
            DCHECK_EQ(literal_count_ % 8, 0);
462
43.0k
            FlushBufferedValues(false);
463
43.0k
        }
464
351k
    }
465
735k
}
_ZN5doris10RleEncoderImE3PutEmm
Line
Count
Source
435
244k
void RleEncoder<T>::Put(T value, size_t run_length) {
436
244k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
437
438
    // TODO(perf): remove the loop and use the repeat_count_
439
488k
    for (; run_length > 0; run_length--) {
440
244k
        if (current_value_ == value) [[likely]] {
441
167k
            ++repeat_count_;
442
167k
            if (repeat_count_ > 8) {
443
                // This is just a continuation of the current run, no need to buffer the
444
                // values.
445
                // Note that this is the fast path for long repeated runs.
446
140k
                continue;
447
140k
            }
448
167k
        } else {
449
76.9k
            if (repeat_count_ >= 8) {
450
                // We had a run that was long enough but it has ended.  Flush the
451
                // current repeated run.
452
1.65k
                DCHECK_EQ(literal_count_, 0);
453
1.65k
                FlushRepeatedRun();
454
1.65k
            }
455
76.9k
            repeat_count_ = 1;
456
76.9k
            current_value_ = value;
457
76.9k
        }
458
459
103k
        buffered_values_[num_buffered_values_] = value;
460
103k
        if (++num_buffered_values_ == 8) {
461
            DCHECK_EQ(literal_count_ % 8, 0);
462
12.9k
            FlushBufferedValues(false);
463
12.9k
        }
464
103k
    }
465
244k
}
_ZN5doris10RleEncoderIbE3PutEbm
Line
Count
Source
435
491k
void RleEncoder<T>::Put(T value, size_t run_length) {
436
491k
    DCHECK(bit_width_ == 64 || value < (1LL << bit_width_));
437
438
    // TODO(perf): remove the loop and use the repeat_count_
439
2.90M
    for (; run_length > 0; run_length--) {
440
2.41M
        if (current_value_ == value) [[likely]] {
441
2.38M
            ++repeat_count_;
442
2.38M
            if (repeat_count_ > 8) {
443
                // This is just a continuation of the current run, no need to buffer the
444
                // values.
445
                // Note that this is the fast path for long repeated runs.
446
2.16M
                continue;
447
2.16M
            }
448
2.38M
        } else {
449
30.1k
            if (repeat_count_ >= 8) {
450
                // We had a run that was long enough but it has ended.  Flush the
451
                // current repeated run.
452
19.7k
                DCHECK_EQ(literal_count_, 0);
453
19.7k
                FlushRepeatedRun();
454
19.7k
            }
455
30.1k
            repeat_count_ = 1;
456
30.1k
            current_value_ = value;
457
30.1k
        }
458
459
247k
        buffered_values_[num_buffered_values_] = value;
460
247k
        if (++num_buffered_values_ == 8) {
461
            DCHECK_EQ(literal_count_ % 8, 0);
462
30.1k
            FlushBufferedValues(false);
463
30.1k
        }
464
247k
    }
465
491k
}
466
467
template <typename T>
468
17.4k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
469
17.4k
    if (literal_indicator_byte_idx_ < 0) {
470
        // The literal indicator byte has not been reserved yet, get one now.
471
1.33k
        literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1));
472
1.33k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
473
1.33k
    }
474
475
    // Write all the buffered values as bit packed literals
476
148k
    for (int i = 0; i < num_buffered_values_; ++i) {
477
130k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
478
130k
    }
479
17.4k
    num_buffered_values_ = 0;
480
481
17.4k
    if (update_indicator_byte) {
482
        // At this point we need to write the indicator byte for the literal run.
483
        // We only reserve one byte, to allow for streaming writes of literal values.
484
        // The logic makes sure we flush literal runs often enough to not overrun
485
        // the 1 byte.
486
1.33k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
487
1.33k
        int32_t indicator_value = (num_groups << 1) | 1;
488
1.33k
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
489
1.33k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] =
490
1.33k
                cast_set<uint8_t>(indicator_value);
491
1.33k
        literal_indicator_byte_idx_ = -1;
492
1.33k
        literal_count_ = 0;
493
1.33k
    }
494
17.4k
}
_ZN5doris10RleEncoderImE15FlushLiteralRunEb
Line
Count
Source
468
12.0k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
469
12.0k
    if (literal_indicator_byte_idx_ < 0) {
470
        // The literal indicator byte has not been reserved yet, get one now.
471
1.12k
        literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1));
472
1.12k
        DCHECK_GE(literal_indicator_byte_idx_, 0);
473
1.12k
    }
474
475
    // Write all the buffered values as bit packed literals
476
101k
    for (int i = 0; i < num_buffered_values_; ++i) {
477
88.9k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
478
88.9k
    }
479
12.0k
    num_buffered_values_ = 0;
480
481
12.0k
    if (update_indicator_byte) {
482
        // At this point we need to write the indicator byte for the literal run.
483
        // We only reserve one byte, to allow for streaming writes of literal values.
484
        // The logic makes sure we flush literal runs often enough to not overrun
485
        // the 1 byte.
486
1.12k
        int num_groups = BitUtil::Ceil(literal_count_, 8);
487
1.12k
        int32_t indicator_value = (num_groups << 1) | 1;
488
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
489
1.12k
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] =
490
1.12k
                cast_set<uint8_t>(indicator_value);
491
1.12k
        literal_indicator_byte_idx_ = -1;
492
1.12k
        literal_count_ = 0;
493
1.12k
    }
494
12.0k
}
_ZN5doris10RleEncoderIbE15FlushLiteralRunEb
Line
Count
Source
468
5.36k
void RleEncoder<T>::FlushLiteralRun(bool update_indicator_byte) {
469
5.36k
    if (literal_indicator_byte_idx_ < 0) {
470
        // The literal indicator byte has not been reserved yet, get one now.
471
211
        literal_indicator_byte_idx_ = cast_set<int>(bit_writer_.GetByteIndexAndAdvance(1));
472
211
        DCHECK_GE(literal_indicator_byte_idx_, 0);
473
211
    }
474
475
    // Write all the buffered values as bit packed literals
476
47.2k
    for (int i = 0; i < num_buffered_values_; ++i) {
477
41.8k
        bit_writer_.PutValue(buffered_values_[i], bit_width_);
478
41.8k
    }
479
5.36k
    num_buffered_values_ = 0;
480
481
5.36k
    if (update_indicator_byte) {
482
        // At this point we need to write the indicator byte for the literal run.
483
        // We only reserve one byte, to allow for streaming writes of literal values.
484
        // The logic makes sure we flush literal runs often enough to not overrun
485
        // the 1 byte.
486
211
        int num_groups = BitUtil::Ceil(literal_count_, 8);
487
211
        int32_t indicator_value = (num_groups << 1) | 1;
488
        DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
489
211
        bit_writer_.buffer()->data()[literal_indicator_byte_idx_] =
490
211
                cast_set<uint8_t>(indicator_value);
491
211
        literal_indicator_byte_idx_ = -1;
492
211
        literal_count_ = 0;
493
211
    }
494
5.36k
}
495
496
template <typename T>
497
21.9k
void RleEncoder<T>::FlushRepeatedRun() {
498
21.9k
    DCHECK_GT(repeat_count_, 0);
499
    // The lsb of 0 indicates this is a repeated run
500
21.9k
    int32_t indicator_value = repeat_count_ << 1 | 0;
501
21.9k
    bit_writer_.PutVlqInt(indicator_value);
502
21.9k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
503
21.9k
    num_buffered_values_ = 0;
504
21.9k
    repeat_count_ = 0;
505
21.9k
}
_ZN5doris10RleEncoderImE16FlushRepeatedRunEv
Line
Count
Source
497
1.91k
void RleEncoder<T>::FlushRepeatedRun() {
498
1.91k
    DCHECK_GT(repeat_count_, 0);
499
    // The lsb of 0 indicates this is a repeated run
500
1.91k
    int32_t indicator_value = repeat_count_ << 1 | 0;
501
1.91k
    bit_writer_.PutVlqInt(indicator_value);
502
1.91k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
503
1.91k
    num_buffered_values_ = 0;
504
1.91k
    repeat_count_ = 0;
505
1.91k
}
_ZN5doris10RleEncoderIbE16FlushRepeatedRunEv
Line
Count
Source
497
20.0k
void RleEncoder<T>::FlushRepeatedRun() {
498
20.0k
    DCHECK_GT(repeat_count_, 0);
499
    // The lsb of 0 indicates this is a repeated run
500
20.0k
    int32_t indicator_value = repeat_count_ << 1 | 0;
501
20.0k
    bit_writer_.PutVlqInt(indicator_value);
502
20.0k
    bit_writer_.PutAligned(current_value_, BitUtil::Ceil(bit_width_, 8));
503
20.0k
    num_buffered_values_ = 0;
504
20.0k
    repeat_count_ = 0;
505
20.0k
}
506
507
// Flush the values that have been buffered.  At this point we decide whether
508
// we need to switch between the run types or continue the current one.
509
template <typename T>
510
43.0k
void RleEncoder<T>::FlushBufferedValues(bool done) {
511
43.0k
    if (repeat_count_ >= 8) {
512
        // Clear the buffered values.  They are part of the repeated run now and we
513
        // don't want to flush them out as literals.
514
26.8k
        num_buffered_values_ = 0;
515
26.8k
        if (literal_count_ != 0) {
516
            // There was a current literal run.  All the values in it have been flushed
517
            // but we still need to update the indicator byte.
518
920
            DCHECK_EQ(literal_count_ % 8, 0);
519
920
            DCHECK_EQ(repeat_count_, 8);
520
920
            FlushLiteralRun(true);
521
920
        }
522
26.8k
        DCHECK_EQ(literal_count_, 0);
523
26.8k
        return;
524
26.8k
    }
525
526
16.2k
    literal_count_ += num_buffered_values_;
527
16.2k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
528
16.2k
    if (num_groups + 1 >= (1 << 6)) {
529
        // We need to start a new literal run because the indicator byte we've reserved
530
        // cannot store more values.
531
167
        DCHECK_GE(literal_indicator_byte_idx_, 0);
532
167
        FlushLiteralRun(true);
533
16.1k
    } else {
534
16.1k
        FlushLiteralRun(done);
535
16.1k
    }
536
16.2k
    repeat_count_ = 0;
537
16.2k
}
_ZN5doris10RleEncoderImE19FlushBufferedValuesEb
Line
Count
Source
510
12.9k
void RleEncoder<T>::FlushBufferedValues(bool done) {
511
12.9k
    if (repeat_count_ >= 8) {
512
        // Clear the buffered values.  They are part of the repeated run now and we
513
        // don't want to flush them out as literals.
514
1.85k
        num_buffered_values_ = 0;
515
1.85k
        if (literal_count_ != 0) {
516
            // There was a current literal run.  All the values in it have been flushed
517
            // but we still need to update the indicator byte.
518
858
            DCHECK_EQ(literal_count_ % 8, 0);
519
858
            DCHECK_EQ(repeat_count_, 8);
520
858
            FlushLiteralRun(true);
521
858
        }
522
1.85k
        DCHECK_EQ(literal_count_, 0);
523
1.85k
        return;
524
1.85k
    }
525
526
11.0k
    literal_count_ += num_buffered_values_;
527
11.0k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
528
11.0k
    if (num_groups + 1 >= (1 << 6)) {
529
        // We need to start a new literal run because the indicator byte we've reserved
530
        // cannot store more values.
531
128
        DCHECK_GE(literal_indicator_byte_idx_, 0);
532
128
        FlushLiteralRun(true);
533
10.9k
    } else {
534
10.9k
        FlushLiteralRun(done);
535
10.9k
    }
536
11.0k
    repeat_count_ = 0;
537
11.0k
}
_ZN5doris10RleEncoderIbE19FlushBufferedValuesEb
Line
Count
Source
510
30.1k
void RleEncoder<T>::FlushBufferedValues(bool done) {
511
30.1k
    if (repeat_count_ >= 8) {
512
        // Clear the buffered values.  They are part of the repeated run now and we
513
        // don't want to flush them out as literals.
514
24.9k
        num_buffered_values_ = 0;
515
24.9k
        if (literal_count_ != 0) {
516
            // There was a current literal run.  All the values in it have been flushed
517
            // but we still need to update the indicator byte.
518
62
            DCHECK_EQ(literal_count_ % 8, 0);
519
62
            DCHECK_EQ(repeat_count_, 8);
520
62
            FlushLiteralRun(true);
521
62
        }
522
24.9k
        DCHECK_EQ(literal_count_, 0);
523
24.9k
        return;
524
24.9k
    }
525
526
5.19k
    literal_count_ += num_buffered_values_;
527
5.19k
    int num_groups = BitUtil::Ceil(literal_count_, 8);
528
5.19k
    if (num_groups + 1 >= (1 << 6)) {
529
        // We need to start a new literal run because the indicator byte we've reserved
530
        // cannot store more values.
531
39
        DCHECK_GE(literal_indicator_byte_idx_, 0);
532
39
        FlushLiteralRun(true);
533
5.15k
    } else {
534
5.15k
        FlushLiteralRun(done);
535
5.15k
    }
536
5.19k
    repeat_count_ = 0;
537
5.19k
}
538
539
template <typename T>
540
0
void RleEncoder<T>::Reserve(int num_bytes, uint8_t val) {
541
0
    for (int i = 0; i < num_bytes; ++i) {
542
0
        bit_writer_.PutValue(val, 8);
543
0
    }
544
0
}
545
546
template <typename T>
547
806
int RleEncoder<T>::Flush() {
548
806
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
549
804
        bool all_repeat = literal_count_ == 0 &&
550
804
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
551
        // There is something pending, figure out if it's a repeated or literal run
552
804
        if (repeat_count_ > 0 && all_repeat) {
553
560
            FlushRepeatedRun();
554
560
        } else {
555
244
            literal_count_ += num_buffered_values_;
556
244
            FlushLiteralRun(true);
557
244
            repeat_count_ = 0;
558
244
        }
559
804
    }
560
806
    bit_writer_.Flush();
561
806
    DCHECK_EQ(num_buffered_values_, 0);
562
806
    DCHECK_EQ(literal_count_, 0);
563
806
    DCHECK_EQ(repeat_count_, 0);
564
806
    return bit_writer_.bytes_written();
565
806
}
_ZN5doris10RleEncoderImE5FlushEv
Line
Count
Source
547
394
int RleEncoder<T>::Flush() {
548
394
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
549
394
        bool all_repeat = literal_count_ == 0 &&
550
394
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
551
        // There is something pending, figure out if it's a repeated or literal run
552
394
        if (repeat_count_ > 0 && all_repeat) {
553
260
            FlushRepeatedRun();
554
260
        } else {
555
134
            literal_count_ += num_buffered_values_;
556
134
            FlushLiteralRun(true);
557
134
            repeat_count_ = 0;
558
134
        }
559
394
    }
560
394
    bit_writer_.Flush();
561
394
    DCHECK_EQ(num_buffered_values_, 0);
562
394
    DCHECK_EQ(literal_count_, 0);
563
    DCHECK_EQ(repeat_count_, 0);
564
394
    return bit_writer_.bytes_written();
565
394
}
_ZN5doris10RleEncoderIbE5FlushEv
Line
Count
Source
547
412
int RleEncoder<T>::Flush() {
548
412
    if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
549
410
        bool all_repeat = literal_count_ == 0 &&
550
410
                          (repeat_count_ == num_buffered_values_ || num_buffered_values_ == 0);
551
        // There is something pending, figure out if it's a repeated or literal run
552
410
        if (repeat_count_ > 0 && all_repeat) {
553
300
            FlushRepeatedRun();
554
300
        } else {
555
110
            literal_count_ += num_buffered_values_;
556
110
            FlushLiteralRun(true);
557
110
            repeat_count_ = 0;
558
110
        }
559
410
    }
560
412
    bit_writer_.Flush();
561
412
    DCHECK_EQ(num_buffered_values_, 0);
562
412
    DCHECK_EQ(literal_count_, 0);
563
    DCHECK_EQ(repeat_count_, 0);
564
412
    return bit_writer_.bytes_written();
565
412
}
566
567
template <typename T>
568
10.4k
void RleEncoder<T>::Clear() {
569
10.4k
    current_value_ = 0;
570
10.4k
    repeat_count_ = 0;
571
10.4k
    num_buffered_values_ = 0;
572
10.4k
    literal_count_ = 0;
573
10.4k
    literal_indicator_byte_idx_ = -1;
574
10.4k
    bit_writer_.Clear();
575
10.4k
}
_ZN5doris10RleEncoderImE5ClearEv
Line
Count
Source
568
394
void RleEncoder<T>::Clear() {
569
394
    current_value_ = 0;
570
394
    repeat_count_ = 0;
571
394
    num_buffered_values_ = 0;
572
394
    literal_count_ = 0;
573
394
    literal_indicator_byte_idx_ = -1;
574
394
    bit_writer_.Clear();
575
394
}
_ZN5doris10RleEncoderIbE5ClearEv
Line
Count
Source
568
10.0k
void RleEncoder<T>::Clear() {
569
10.0k
    current_value_ = 0;
570
10.0k
    repeat_count_ = 0;
571
10.0k
    num_buffered_values_ = 0;
572
10.0k
    literal_count_ = 0;
573
10.0k
    literal_indicator_byte_idx_ = -1;
574
10.0k
    bit_writer_.Clear();
575
10.0k
}
576
577
// Copy from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
578
// Utility classes to do run length encoding (RLE) for fixed bit width values.  If runs
579
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
580
// (literal encoding).
581
//
582
// For both types of runs, there is a byte-aligned indicator which encodes the length
583
// of the run and the type of the run.
584
//
585
// This encoding has the benefit that when there aren't any long enough runs, values
586
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
587
// the run length are byte aligned. This allows for very efficient decoding
588
// implementations.
589
// The encoding is:
590
//    encoded-block := run*
591
//    run := literal-run | repeated-run
592
//    literal-run := literal-indicator < literal bytes >
593
//    repeated-run := repeated-indicator < repeated value. padded to byte boundary >
594
//    literal-indicator := varint_encode( number_of_groups << 1 | 1)
595
//    repeated-indicator := varint_encode( number_of_repetitions << 1 )
596
//
597
// Each run is preceded by a varint. The varint's least significant bit is
598
// used to indicate whether the run is a literal run or a repeated run. The rest
599
// of the varint is used to determine the length of the run (eg how many times the
600
// value repeats).
601
//
602
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
603
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
604
// on a byte boundary without padding.
605
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
606
// the actual number of encoded ints. (This means that the total number of encoded values
607
// can not be determined from the encoded data, since the number of values in the last
608
// group may not be a multiple of 8). For the last group of literal runs, we pad
609
// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
610
// without the need for additional checks.
611
//
612
// There is a break-even point when it is more storage efficient to do run length
613
// encoding.  For 1 bit-width values, that point is 8 values.  They require 2 bytes
614
// for both the repeated encoding or the literal encoding.  This value can always
615
// be computed based on the bit-width.
616
// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more
617
// investigation is needed to do this efficiently, see the reverted IMPALA-6658.
618
// TODO: think about how to use this for strings.  The bit packing isn't quite the same.
619
//
620
// Examples with bit-width 1 (eg encoding booleans):
621
// ----------------------------------------
622
// 100 1s followed by 100 0s:
623
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
624
//  - (total 4 bytes)
625
//
626
// alternating 1s and 0s (200 total):
627
// 200 ints = 25 groups of 8
628
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
629
// (total 26 bytes, 1 byte overhead)
630
631
// RLE decoder with a batch-oriented interface that enables fast decoding.
632
// Users of this class must first initialize the class to point to a buffer of
633
// RLE-encoded data, passed into the constructor or Reset(). The provided
634
// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH).
635
// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
636
// see if the next run is a repeated or literal run, then calling
637
// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
638
//
639
// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
640
// Other decoding errors are signalled by functions returning false. If an
641
// error is encountered then it is not valid to read any more data until
642
// Reset() is called.
643
644
//bit-packed-run-len and rle-run-len must be in the range [1, 2^31 - 1].
645
// This means that a Parquet implementation can always store the run length in a signed 32-bit integer.
646
template <typename T>
647
class RleBatchDecoder {
648
public:
649
35
    RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
650
35
        Reset(buffer, buffer_len, bit_width);
651
35
    }
652
653
    RleBatchDecoder() = default;
654
655
    // Reset the decoder to read from a new buffer.
656
    void Reset(uint8_t* buffer, int buffer_len, int bit_width);
657
658
    // Return the size of the current repeated run. Returns zero if the current run is
659
    // a literal run or if no more runs can be read from the input.
660
    int32_t NextNumRepeats();
661
662
    // Get the value of the current repeated run and consume the given number of repeats.
663
    // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
664
    // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
665
    // can be set to 0 to peek at the value without consuming repeats.
666
    T GetRepeatedValue(int32_t num_repeats_to_consume);
667
668
    // Return the size of the current literal run. Returns zero if the current run is
669
    // a repeated run or if no more runs can be read from the input.
670
    int32_t NextNumLiterals();
671
672
    // Consume 'num_literals_to_consume' literals from the current literal run,
673
    // copying the values to 'values'. 'num_literals_to_consume' must be <=
674
    // NextNumLiterals(). Returns true if the requested number of literals were
675
    // successfully read or false if an error was encountered, e.g. the input was
676
    // truncated.
677
    bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
678
679
    // Consume 'num_values_to_consume' values and copy them to 'values'.
680
    // Returns the number of consumed values or 0 if an error occurred.
681
    uint32_t GetBatch(T* values, uint32_t batch_num);
682
683
private:
684
    // Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
685
    // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
686
    // or repeated run, or leaves both at 0 if no more values can be read (either because
687
    // the end of the input was reached or an error was encountered decoding).
688
    void NextCounts();
689
690
    /// Fill the literal buffer. Invalid to call if there are already buffered literals.
691
    /// Return false if the input was truncated. This does not advance 'literal_count_'.
692
    bool FillLiteralBuffer() WARN_UNUSED_RESULT;
693
694
20
    bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; }
695
696
    /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
697
    /// 'literal_count_'. Returns the number of literals outputted.
698
    int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
699
700
    BatchedBitReader bit_reader_;
701
702
    // Number of bits needed to encode the value. Must be between 0 and 64 after
703
    // the decoder is initialized with a buffer. -1 indicates the decoder was not
704
    // initialized.
705
    int bit_width_ = -1;
706
707
    // If a repeated run, the number of repeats remaining in the current run to be read.
708
    // If the current run is a literal run, this is 0.
709
    int32_t repeat_count_ = 0;
710
711
    // If a literal run, the number of literals remaining in the current run to be read.
712
    // If the current run is a repeated run, this is 0.
713
    int32_t literal_count_ = 0;
714
715
    // If a repeated run, the current repeated value.
716
    T repeated_value_;
717
718
    // Size of buffer for literal values. Large enough to decode a full batch of 32
719
    // literals. The buffer is needed to allow clients to read in batches that are not
720
    // multiples of 32.
721
    static constexpr int LITERAL_BUFFER_LEN = 32;
722
723
    // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
724
    // position of the next literal to be read from the buffer.
725
    T literal_buffer_[LITERAL_BUFFER_LEN];
726
    int num_buffered_literals_ = 0;
727
    int literal_buffer_pos_ = 0;
728
};
729
730
template <typename T>
731
20
int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) {
732
20
    int32_t num_to_output =
733
20
            std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
734
20
    memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output);
735
20
    literal_buffer_pos_ += num_to_output;
736
20
    literal_count_ -= num_to_output;
737
20
    return num_to_output;
738
20
}
739
740
template <typename T>
741
35
void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
742
35
    bit_reader_.Reset(buffer, buffer_len);
743
35
    bit_width_ = bit_width;
744
35
    repeat_count_ = 0;
745
35
    literal_count_ = 0;
746
35
    num_buffered_literals_ = 0;
747
35
    literal_buffer_pos_ = 0;
748
35
}
749
750
template <typename T>
751
56
int32_t RleBatchDecoder<T>::NextNumRepeats() {
752
56
    if (repeat_count_ > 0) return repeat_count_;
753
54
    if (literal_count_ == 0) NextCounts();
754
54
    return repeat_count_;
755
56
}
756
757
template <typename T>
758
54
void RleBatchDecoder<T>::NextCounts() {
759
    // Read the next run's indicator int, it could be a literal or repeated run.
760
    // The int is encoded as a ULEB128-encoded value.
761
54
    uint32_t indicator_value = 0;
762
54
    if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator_value))) {
763
0
        return;
764
0
    }
765
766
    // lsb indicates if it is a literal run or repeated run
767
54
    bool is_literal = indicator_value & 1;
768
769
    // Don't try to handle run lengths that don't fit in an int32_t - just fail gracefully.
770
    // The Parquet standard does not allow longer runs - see PARQUET-1290.
771
54
    uint32_t run_len = indicator_value >> 1;
772
54
    if (is_literal) {
773
        // Use int64_t to avoid overflowing multiplication.
774
20
        int64_t literal_count = static_cast<int64_t>(run_len) * 8;
775
20
        if (UNLIKELY(literal_count > std::numeric_limits<int32_t>::max())) return;
776
20
        literal_count_ = cast_set<int32_t>(literal_count);
777
34
    } else {
778
34
        if (UNLIKELY(run_len == 0)) return;
779
34
        bool result = bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
780
34
        if (UNLIKELY(!result)) return;
781
34
        repeat_count_ = run_len;
782
34
    }
783
54
}
784
785
template <typename T>
786
36
T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
787
36
    repeat_count_ -= num_repeats_to_consume;
788
36
    return repeated_value_;
789
36
}
790
791
template <typename T>
792
20
int32_t RleBatchDecoder<T>::NextNumLiterals() {
793
20
    if (literal_count_ > 0) return literal_count_;
794
0
    if (repeat_count_ == 0) NextCounts();
795
0
    return literal_count_;
796
20
}
797
798
template <typename T>
799
20
bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) {
800
20
    int32_t num_consumed = 0;
801
    // Copy any buffered literals left over from previous calls.
802
20
    if (HaveBufferedLiterals()) {
803
0
        num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
804
0
    }
805
806
20
    int32_t num_remaining = num_literals_to_consume - num_consumed;
807
    // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
808
    // Need to round to a batch of 32 if the caller is consuming only part of the current
809
    // run avoid ending on a non-byte boundary.
810
20
    int32_t num_to_bypass =
811
20
            std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
812
20
    if (num_to_bypass > 0) {
813
0
        int num_read = bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed);
814
        // If we couldn't read the expected number, that means the input was truncated.
815
0
        if (num_read < num_to_bypass) return false;
816
0
        literal_count_ -= num_to_bypass;
817
0
        num_consumed += num_to_bypass;
818
0
        num_remaining = num_literals_to_consume - num_consumed;
819
0
    }
820
821
20
    if (num_remaining > 0) {
822
        // We weren't able to copy all the literals requested directly from the input.
823
        // Buffer literals and copy over the requested number.
824
20
        if (UNLIKELY(!FillLiteralBuffer())) return false;
825
20
        OutputBufferedLiterals(num_remaining, values + num_consumed);
826
20
    }
827
20
    return true;
828
20
}
829
830
template <typename T>
831
20
bool RleBatchDecoder<T>::FillLiteralBuffer() {
832
20
    int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
833
20
    num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
834
    // If we couldn't read the expected number, that means the input was truncated.
835
20
    if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
836
20
    literal_buffer_pos_ = 0;
837
20
    return true;
838
20
}
839
840
template <typename T>
841
37
uint32_t RleBatchDecoder<T>::GetBatch(T* values, uint32_t batch_num) {
842
37
    uint32_t num_consumed = 0;
843
93
    while (num_consumed < batch_num) {
844
        // Add RLE encoded values by repeating the current value this number of times.
845
56
        uint32_t num_repeats = NextNumRepeats();
846
56
        if (num_repeats > 0) {
847
36
            int32_t num_repeats_to_set = std::min(num_repeats, batch_num - num_consumed);
848
36
            T repeated_value = GetRepeatedValue(num_repeats_to_set);
849
292
            for (int i = 0; i < num_repeats_to_set; ++i) {
850
256
                values[num_consumed + i] = repeated_value;
851
256
            }
852
36
            num_consumed += num_repeats_to_set;
853
36
            continue;
854
36
        }
855
856
        // Add remaining literal values, if any.
857
20
        uint32_t num_literals = NextNumLiterals();
858
20
        if (num_literals == 0) {
859
0
            break;
860
0
        }
861
20
        uint32_t num_literals_to_set = std::min(num_literals, batch_num - num_consumed);
862
20
        if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) {
863
0
            return 0;
864
0
        }
865
20
        num_consumed += num_literals_to_set;
866
20
    }
867
37
    return num_consumed;
868
37
}
869
#include "common/compile_check_end.h"
870
} // namespace doris