Coverage Report

Created: 2025-07-25 20:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/util/quantile_state.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "util/quantile_state.h"
18
19
#include <string.h>
20
21
#include <cmath>
22
#include <ostream>
23
#include <utility>
24
25
#include "common/logging.h"
26
#include "util/coding.h"
27
#include "util/slice.h"
28
#include "util/tdigest.h"
29
#include "vec/common/unaligned.h"
30
31
namespace doris {
32
#include "common/compile_check_begin.h"
33
128k
// Creates an empty state that uses the minimum allowed compression.
QuantileState::QuantileState()
        : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {}
34
35
0
// Creates an empty state with a caller-supplied compression. The value is stored as
// given; this constructor performs no range check on it.
QuantileState::QuantileState(float compression)
        : _type(EMPTY), _compression(compression) {}
36
37
0
// Creates a state from its serialized form.
//
// The members are first set to the same defaults as the default constructor, so that
// deserialize() starts from an EMPTY state and a failed parse leaves a well-defined
// empty object instead of unset members.
QuantileState::QuantileState(const Slice& slice)
        : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {
    if (!deserialize(slice)) {
        // A slice that cannot be parsed yields an empty state.
        _type = EMPTY;
    }
}
42
43
375k
size_t QuantileState::get_serialized_size() {
44
375k
    size_t size = 1 + sizeof(float); // type(QuantileStateType) + compression(float)
45
375k
    switch (_type) {
46
200k
    case EMPTY:
47
200k
        break;
48
80.0k
    case SINGLE:
49
80.0k
        size += sizeof(double);
50
80.0k
        break;
51
95.3k
    case EXPLICIT:
52
95.3k
        size += sizeof(uint16_t) + sizeof(double) * _explicit_data.size();
53
95.3k
        break;
54
4
    case TDIGEST:
55
4
        size += _tdigest_ptr->serialized_size();
56
4
        break;
57
375k
    }
58
375k
    return size;
59
375k
}
60
61
0
void QuantileState::set_compression(float compression) {
62
0
    DCHECK(compression >= QUANTILE_STATE_COMPRESSION_MIN &&
63
0
           compression <= QUANTILE_STATE_COMPRESSION_MAX);
64
0
    this->_compression = compression;
65
0
}
66
67
83.3k
bool QuantileState::is_valid(const Slice& slice) {
68
83.3k
    if (slice.size < 1) {
69
0
        return false;
70
0
    }
71
83.3k
    const uint8_t* ptr = (uint8_t*)slice.data;
72
83.3k
    const uint8_t* end = (uint8_t*)slice.data + slice.size;
73
83.3k
    float compress_value = unaligned_load<float>(ptr);
74
83.3k
    if (compress_value < QUANTILE_STATE_COMPRESSION_MIN ||
75
83.3k
        compress_value > QUANTILE_STATE_COMPRESSION_MAX) {
76
0
        return false;
77
0
    }
78
83.3k
    ptr += sizeof(float);
79
80
83.3k
    auto type = (QuantileStateType)*ptr++;
81
83.3k
    switch (type) {
82
40.0k
    case EMPTY:
83
40.0k
        break;
84
20.0k
    case SINGLE: {
85
20.0k
        if ((ptr + sizeof(double)) > end) {
86
0
            return false;
87
0
        }
88
20.0k
        ptr += sizeof(double);
89
20.0k
        break;
90
20.0k
    }
91
23.2k
    case EXPLICIT: {
92
23.2k
        if ((ptr + sizeof(uint16_t)) > end) {
93
0
            return false;
94
0
        }
95
23.2k
        uint16_t num_explicits = decode_fixed16_le(ptr);
96
23.2k
        ptr += sizeof(uint16_t);
97
23.2k
        ptr += num_explicits * sizeof(double);
98
23.2k
        break;
99
23.2k
    }
100
1
    case TDIGEST: {
101
1
        if ((ptr + sizeof(uint32_t)) > end) {
102
0
            return false;
103
0
        }
104
1
        uint32_t tdigest_serialized_length = decode_fixed32_le(ptr);
105
1
        ptr += tdigest_serialized_length;
106
1
        break;
107
1
    }
108
0
    default:
109
0
        return false;
110
83.3k
    }
111
83.3k
    return ptr == end;
112
83.3k
}
113
114
9
double QuantileState::get_explicit_value_by_percentile(float percentile) const {
115
9
    DCHECK(_type == EXPLICIT);
116
9
    size_t n = _explicit_data.size();
117
9
    std::vector<double> sorted_data(_explicit_data.begin(), _explicit_data.end());
118
9
    std::sort(sorted_data.begin(), sorted_data.end());
119
120
9
    double index = double(n - 1) * percentile;
121
9
    int intIdx = (int)index;
122
9
    if (intIdx == n - 1) {
123
3
        return sorted_data[intIdx];
124
3
    }
125
6
    return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index);
126
9
}
127
128
12
double QuantileState::get_value_by_percentile(float percentile) const {
129
12
    DCHECK(percentile >= 0 && percentile <= 1);
130
12
    switch (_type) {
131
0
    case EMPTY: {
132
0
        return NAN;
133
0
    }
134
3
    case SINGLE: {
135
3
        return _single_data;
136
0
    }
137
9
    case EXPLICIT: {
138
9
        return get_explicit_value_by_percentile(percentile);
139
0
    }
140
0
    case TDIGEST: {
141
0
        return _tdigest_ptr->quantile(percentile);
142
0
    }
143
0
    default:
144
0
        break;
145
12
    }
146
0
    return NAN;
147
12
}
148
149
83.3k
bool QuantileState::deserialize(const Slice& slice) {
150
83.3k
    DCHECK(_type == EMPTY);
151
152
    // in case of insert error data caused be crashed
153
83.3k
    if (slice.data == nullptr || slice.size <= 0) {
154
0
        return false;
155
0
    }
156
    // check input is valid
157
83.3k
    if (!is_valid(slice)) {
158
0
        LOG(WARNING) << "QuantileState deserialize failed: slice is invalid";
159
0
        return false;
160
0
    }
161
162
83.3k
    const uint8_t* ptr = (uint8_t*)slice.data;
163
83.3k
    _compression = unaligned_load<float>(ptr);
164
83.3k
    ptr += sizeof(float);
165
    // first byte : type
166
83.3k
    _type = (QuantileStateType)*ptr++;
167
83.3k
    switch (_type) {
168
40.0k
    case EMPTY:
169
        // 1: empty
170
40.0k
        break;
171
20.0k
    case SINGLE: {
172
        // 2: single_data value
173
20.0k
        _single_data = unaligned_load<double>(ptr);
174
20.0k
        ptr += sizeof(double);
175
20.0k
        break;
176
0
    }
177
23.2k
    case EXPLICIT: {
178
        // 3: number of explicit values
179
        // make sure that num_explicit is positive
180
23.2k
        uint16_t num_explicits = decode_fixed16_le(ptr);
181
23.2k
        ptr += sizeof(uint16_t);
182
23.2k
        _explicit_data.reserve(std::min(num_explicits * 2, QUANTILE_STATE_EXPLICIT_NUM));
183
23.2k
        _explicit_data.resize(num_explicits);
184
23.2k
        memcpy(&_explicit_data[0], ptr, num_explicits * sizeof(double));
185
23.2k
        ptr += num_explicits * sizeof(double);
186
23.2k
        break;
187
0
    }
188
1
    case TDIGEST: {
189
        // 4: Tdigest object value
190
1
        _tdigest_ptr = std::make_shared<TDigest>(0);
191
1
        _tdigest_ptr->unserialize(ptr);
192
1
        break;
193
0
    }
194
0
    default:
195
        // revert type to EMPTY
196
0
        _type = EMPTY;
197
0
        return false;
198
83.3k
    }
199
83.3k
    return true;
200
83.3k
}
201
202
88.8k
size_t QuantileState::serialize(uint8_t* dst) const {
203
88.8k
    uint8_t* ptr = dst;
204
88.8k
    unaligned_store<float>(ptr, _compression);
205
88.8k
    ptr += sizeof(float);
206
88.8k
    switch (_type) {
207
40.0k
    case EMPTY: {
208
40.0k
        *ptr++ = EMPTY;
209
40.0k
        break;
210
0
    }
211
20.0k
    case SINGLE: {
212
20.0k
        *ptr++ = SINGLE;
213
20.0k
        unaligned_store<double>(ptr, _single_data);
214
20.0k
        ptr += sizeof(double);
215
20.0k
        break;
216
0
    }
217
28.7k
    case EXPLICIT: {
218
28.7k
        *ptr++ = EXPLICIT;
219
28.7k
        auto size = (uint16_t)_explicit_data.size();
220
28.7k
        unaligned_store<uint16_t>(ptr, size);
221
28.7k
        ptr += sizeof(uint16_t);
222
28.7k
        memcpy(ptr, &_explicit_data[0], size * sizeof(double));
223
28.7k
        ptr += size * sizeof(double);
224
28.7k
        break;
225
0
    }
226
1
    case TDIGEST: {
227
1
        *ptr++ = TDIGEST;
228
1
        size_t tdigest_size = _tdigest_ptr->serialize(ptr);
229
1
        ptr += tdigest_size;
230
1
        break;
231
0
    }
232
0
    default:
233
0
        break;
234
88.8k
    }
235
88.8k
    return ptr - dst;
236
88.8k
}
237
238
1
void QuantileState::merge(const QuantileState& other) {
239
1
    switch (other._type) {
240
0
    case EMPTY:
241
0
        break;
242
0
    case SINGLE: {
243
0
        add_value(other._single_data);
244
0
        break;
245
0
    }
246
1
    case EXPLICIT: {
247
1
        switch (_type) {
248
0
        case EMPTY:
249
0
            _type = EXPLICIT;
250
0
            _explicit_data = other._explicit_data;
251
0
            break;
252
0
        case SINGLE:
253
0
            _type = EXPLICIT;
254
0
            _explicit_data = other._explicit_data;
255
0
            add_value(_single_data);
256
0
            break;
257
1
        case EXPLICIT:
258
1
            if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) {
259
0
                _type = TDIGEST;
260
0
                _tdigest_ptr = std::make_shared<TDigest>(_compression);
261
0
                for (int i = 0; i < _explicit_data.size(); i++) {
262
0
                    _tdigest_ptr->add((float)_explicit_data[i]);
263
0
                }
264
0
                for (int i = 0; i < other._explicit_data.size(); i++) {
265
0
                    _tdigest_ptr->add((float)other._explicit_data[i]);
266
0
                }
267
1
            } else {
268
1
                _explicit_data.insert(_explicit_data.end(), other._explicit_data.begin(),
269
1
                                      other._explicit_data.end());
270
1
            }
271
1
            break;
272
0
        case TDIGEST:
273
0
            for (int i = 0; i < other._explicit_data.size(); i++) {
274
0
                _tdigest_ptr->add((float)other._explicit_data[i]);
275
0
            }
276
0
            break;
277
0
        default:
278
0
            break;
279
1
        }
280
1
        break;
281
1
    }
282
1
    case TDIGEST: {
283
0
        switch (_type) {
284
0
        case EMPTY:
285
0
            _type = TDIGEST;
286
0
            _tdigest_ptr = other._tdigest_ptr;
287
0
            break;
288
0
        case SINGLE:
289
0
            _type = TDIGEST;
290
0
            _tdigest_ptr = other._tdigest_ptr;
291
0
            _tdigest_ptr->add((float)_single_data);
292
0
            break;
293
0
        case EXPLICIT:
294
0
            _type = TDIGEST;
295
0
            _tdigest_ptr = other._tdigest_ptr;
296
0
            for (int i = 0; i < _explicit_data.size(); i++) {
297
0
                _tdigest_ptr->add((float)_explicit_data[i]);
298
0
            }
299
0
            break;
300
0
        case TDIGEST:
301
0
            _tdigest_ptr->merge(other._tdigest_ptr.get());
302
0
            break;
303
0
        default:
304
0
            break;
305
0
        }
306
0
        break;
307
0
    }
308
0
    default:
309
0
        return;
310
1
    }
311
1
}
312
313
1.56M
void QuantileState::add_value(const double& value) {
314
1.56M
    switch (_type) {
315
25.3k
    case EMPTY:
316
25.3k
        _single_data = value;
317
25.3k
        _type = SINGLE;
318
25.3k
        break;
319
23.2k
    case SINGLE:
320
23.2k
        _explicit_data.emplace_back(_single_data);
321
23.2k
        _explicit_data.emplace_back(value);
322
23.2k
        _type = EXPLICIT;
323
23.2k
        break;
324
1.51M
    case EXPLICIT:
325
1.51M
        if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
326
1
            _tdigest_ptr = std::make_shared<TDigest>(_compression);
327
2.04k
            for (int i = 0; i < _explicit_data.size(); i++) {
328
2.04k
                _tdigest_ptr->add((float)_explicit_data[i]);
329
2.04k
            }
330
1
            _explicit_data.clear();
331
1
            _explicit_data.shrink_to_fit();
332
1
            _type = TDIGEST;
333
334
1.51M
        } else {
335
1.51M
            _explicit_data.emplace_back(value);
336
1.51M
        }
337
1.51M
        break;
338
1
    case TDIGEST:
339
1
        _tdigest_ptr->add((float)value);
340
1
        break;
341
1.56M
    }
342
1.56M
}
343
344
0
void QuantileState::clear() {
345
0
    _type = EMPTY;
346
0
    _tdigest_ptr.reset();
347
0
    _explicit_data.clear();
348
0
    _explicit_data.shrink_to_fit();
349
0
}
350
351
} // namespace doris