Coverage Report

Created: 2025-07-26 17:51

/root/doris/be/src/util/quantile_state.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "util/quantile_state.h"
18
19
#include <string.h>
20
21
#include <cmath>
22
#include <ostream>
23
#include <utility>
24
25
#include "common/logging.h"
26
#include "util/coding.h"
27
#include "util/slice.h"
28
#include "util/tdigest.h"
29
30
namespace doris {
31
32
122k
// Default constructor: starts as an EMPTY state using the minimum compression setting.
QuantileState::QuantileState()
        : _type(EMPTY),
          _compression(QUANTILE_STATE_COMPRESSION_MIN) {}
33
34
0
// Starts as an EMPTY state with a caller-chosen compression. The value is stored
// as given; no range check is performed here.
QuantileState::QuantileState(float compression)
        : _type(EMPTY),
          _compression(compression) {}
35
36
0
// Builds a state from its serialized form. If `slice` cannot be deserialized the
// object is left as an EMPTY state.
//
// The members are initialized up front because deserialize() inspects `_type`
// before writing anything, and because a failed deserialize() would otherwise
// leave `_compression` without a value (unless the header supplies default member
// initializers, which is not visible from this file).
QuantileState::QuantileState(const Slice& slice)
        : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {
    if (!deserialize(slice)) {
        _type = EMPTY;
    }
}
41
42
360k
size_t QuantileState::get_serialized_size() {
43
360k
    size_t size = 1 + sizeof(float); // type(QuantileStateType) + compression(float)
44
360k
    switch (_type) {
  Branch (44:13): [True: 0, False: 360k]
45
200k
    case EMPTY:
  Branch (45:5): [True: 200k, False: 160k]
46
200k
        break;
47
80.0k
    case SINGLE:
  Branch (47:5): [True: 80.0k, False: 280k]
48
80.0k
        size += sizeof(double);
49
80.0k
        break;
50
79.9k
    case EXPLICIT:
  Branch (50:5): [True: 79.9k, False: 280k]
51
79.9k
        size += sizeof(uint16_t) + sizeof(double) * _explicit_data.size();
52
79.9k
        break;
53
4
    case TDIGEST:
  Branch (53:5): [True: 4, False: 359k]
54
4
        size += _tdigest_ptr->serialized_size();
55
4
        break;
56
360k
    }
57
360k
    return size;
58
360k
}
59
60
0
void QuantileState::set_compression(float compression) {
61
0
    DCHECK(compression >= QUANTILE_STATE_COMPRESSION_MIN &&
62
0
           compression <= QUANTILE_STATE_COMPRESSION_MAX);
63
0
    this->_compression = compression;
64
0
}
65
66
80.0k
bool QuantileState::is_valid(const Slice& slice) {
67
80.0k
    if (slice.size < 1) {
  Branch (67:9): [True: 0, False: 80.0k]
68
0
        return false;
69
0
    }
70
80.0k
    const uint8_t* ptr = (uint8_t*)slice.data;
71
80.0k
    const uint8_t* end = (uint8_t*)slice.data + slice.size;
72
80.0k
    float compress_value = *reinterpret_cast<const float*>(ptr);
73
80.0k
    if (compress_value < QUANTILE_STATE_COMPRESSION_MIN ||
  Branch (73:9): [True: 0, False: 80.0k]
74
80.0k
        compress_value > QUANTILE_STATE_COMPRESSION_MAX) {
  Branch (74:9): [True: 0, False: 80.0k]
75
0
        return false;
76
0
    }
77
80.0k
    ptr += sizeof(float);
78
79
80.0k
    auto type = (QuantileStateType)*ptr++;
80
80.0k
    switch (type) {
81
40.0k
    case EMPTY:
  Branch (81:5): [True: 40.0k, False: 40.0k]
82
40.0k
        break;
83
20.0k
    case SINGLE: {
  Branch (83:5): [True: 20.0k, False: 60.0k]
84
20.0k
        if ((ptr + sizeof(double)) > end) {
  Branch (84:13): [True: 0, False: 20.0k]
85
0
            return false;
86
0
        }
87
20.0k
        ptr += sizeof(double);
88
20.0k
        break;
89
20.0k
    }
90
19.9k
    case EXPLICIT: {
  Branch (90:5): [True: 19.9k, False: 60.0k]
91
19.9k
        if ((ptr + sizeof(uint16_t)) > end) {
  Branch (91:13): [True: 0, False: 19.9k]
92
0
            return false;
93
0
        }
94
19.9k
        uint16_t num_explicits = decode_fixed16_le(ptr);
95
19.9k
        ptr += sizeof(uint16_t);
96
19.9k
        ptr += num_explicits * sizeof(double);
97
19.9k
        break;
98
19.9k
    }
99
1
    case TDIGEST: {
  Branch (99:5): [True: 1, False: 79.9k]
100
1
        if ((ptr + sizeof(uint32_t)) > end) {
  Branch (100:13): [True: 0, False: 1]
101
0
            return false;
102
0
        }
103
1
        uint32_t tdigest_serialized_length = decode_fixed32_le(ptr);
104
1
        ptr += tdigest_serialized_length;
105
1
        break;
106
1
    }
107
0
    default:
  Branch (107:5): [True: 0, False: 80.0k]
108
0
        return false;
109
80.0k
    }
110
80.0k
    return ptr == end;
111
80.0k
}
112
113
9
double QuantileState::get_explicit_value_by_percentile(float percentile) const {
114
9
    DCHECK(_type == EXPLICIT);
115
9
    int n = _explicit_data.size();
116
9
    std::vector<double> sorted_data(_explicit_data.begin(), _explicit_data.end());
117
9
    std::sort(sorted_data.begin(), sorted_data.end());
118
119
9
    double index = (n - 1) * percentile;
120
9
    int intIdx = (int)index;
121
9
    if (intIdx == n - 1) {
  Branch (121:9): [True: 3, False: 6]
122
3
        return sorted_data[intIdx];
123
3
    }
124
6
    return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index);
125
9
}
126
127
9
double QuantileState::get_value_by_percentile(float percentile) const {
128
9
    DCHECK(percentile >= 0 && percentile <= 1);
129
9
    switch (_type) {
130
0
    case EMPTY: {
  Branch (130:5): [True: 0, False: 9]
131
0
        return NAN;
132
0
    }
133
0
    case SINGLE: {
  Branch (133:5): [True: 0, False: 9]
134
0
        return _single_data;
135
0
    }
136
9
    case EXPLICIT: {
  Branch (136:5): [True: 9, False: 0]
137
9
        return get_explicit_value_by_percentile(percentile);
138
0
    }
139
0
    case TDIGEST: {
  Branch (139:5): [True: 0, False: 9]
140
0
        return _tdigest_ptr->quantile(percentile);
141
0
    }
142
0
    default:
  Branch (142:5): [True: 0, False: 9]
143
0
        break;
144
9
    }
145
0
    return NAN;
146
9
}
147
148
80.0k
bool QuantileState::deserialize(const Slice& slice) {
149
80.0k
    DCHECK(_type == EMPTY);
150
151
    // in case of insert error data caused be crashed
152
80.0k
    if (slice.data == nullptr || slice.size <= 0) {
  Branch (152:9): [True: 0, False: 80.0k]
  Branch (152:34): [True: 0, False: 80.0k]
153
0
        return false;
154
0
    }
155
    // check input is valid
156
80.0k
    if (!is_valid(slice)) {
  Branch (156:9): [True: 0, False: 80.0k]
157
0
        LOG(WARNING) << "QuantileState deserialize failed: slice is invalid";
158
0
        return false;
159
0
    }
160
161
80.0k
    const uint8_t* ptr = (uint8_t*)slice.data;
162
80.0k
    _compression = *reinterpret_cast<const float*>(ptr);
163
80.0k
    ptr += sizeof(float);
164
    // first byte : type
165
80.0k
    _type = (QuantileStateType)*ptr++;
166
80.0k
    switch (_type) {
167
40.0k
    case EMPTY:
  Branch (167:5): [True: 40.0k, False: 40.0k]
168
        // 1: empty
169
40.0k
        break;
170
20.0k
    case SINGLE: {
  Branch (170:5): [True: 20.0k, False: 60.0k]
171
        // 2: single_data value
172
20.0k
        _single_data = *reinterpret_cast<const double*>(ptr);
173
20.0k
        ptr += sizeof(double);
174
20.0k
        break;
175
0
    }
176
19.9k
    case EXPLICIT: {
  Branch (176:5): [True: 19.9k, False: 60.0k]
177
        // 3: number of explicit values
178
        // make sure that num_explicit is positive
179
19.9k
        uint16_t num_explicits = decode_fixed16_le(ptr);
180
19.9k
        ptr += sizeof(uint16_t);
181
19.9k
        _explicit_data.reserve(std::min(num_explicits * 2, QUANTILE_STATE_EXPLICIT_NUM));
182
19.9k
        _explicit_data.resize(num_explicits);
183
19.9k
        memcpy(&_explicit_data[0], ptr, num_explicits * sizeof(double));
184
19.9k
        ptr += num_explicits * sizeof(double);
185
19.9k
        break;
186
0
    }
187
1
    case TDIGEST: {
  Branch (187:5): [True: 1, False: 79.9k]
188
        // 4: Tdigest object value
189
1
        _tdigest_ptr = std::make_shared<TDigest>(0);
190
1
        _tdigest_ptr->unserialize(ptr);
191
1
        break;
192
0
    }
193
0
    default:
  Branch (193:5): [True: 0, False: 80.0k]
194
        // revert type to EMPTY
195
0
        _type = EMPTY;
196
0
        return false;
197
80.0k
    }
198
80.0k
    return true;
199
80.0k
}
200
201
80.0k
size_t QuantileState::serialize(uint8_t* dst) const {
202
80.0k
    uint8_t* ptr = dst;
203
80.0k
    *reinterpret_cast<float*>(ptr) = _compression;
204
80.0k
    ptr += sizeof(float);
205
80.0k
    switch (_type) {
206
40.0k
    case EMPTY: {
  Branch (206:5): [True: 40.0k, False: 40.0k]
207
40.0k
        *ptr++ = EMPTY;
208
40.0k
        break;
209
0
    }
210
20.0k
    case SINGLE: {
  Branch (210:5): [True: 20.0k, False: 60.0k]
211
20.0k
        *ptr++ = SINGLE;
212
20.0k
        *reinterpret_cast<double*>(ptr) = _single_data;
213
20.0k
        ptr += sizeof(double);
214
20.0k
        break;
215
0
    }
216
19.9k
    case EXPLICIT: {
  Branch (216:5): [True: 19.9k, False: 60.0k]
217
19.9k
        *ptr++ = EXPLICIT;
218
19.9k
        uint16_t size = _explicit_data.size();
219
19.9k
        *reinterpret_cast<uint16_t*>(ptr) = size;
220
19.9k
        ptr += sizeof(uint16_t);
221
19.9k
        memcpy(ptr, &_explicit_data[0], size * sizeof(double));
222
19.9k
        ptr += size * sizeof(double);
223
19.9k
        break;
224
0
    }
225
1
    case TDIGEST: {
  Branch (225:5): [True: 1, False: 79.9k]
226
1
        *ptr++ = TDIGEST;
227
1
        size_t tdigest_size = _tdigest_ptr->serialize(ptr);
228
1
        ptr += tdigest_size;
229
1
        break;
230
0
    }
231
0
    default:
  Branch (231:5): [True: 0, False: 80.0k]
232
0
        break;
233
80.0k
    }
234
80.0k
    return ptr - dst;
235
80.0k
}
236
237
1
void QuantileState::merge(const QuantileState& other) {
238
1
    switch (other._type) {
239
0
    case EMPTY:
  Branch (239:5): [True: 0, False: 1]
240
0
        break;
241
0
    case SINGLE: {
  Branch (241:5): [True: 0, False: 1]
242
0
        add_value(other._single_data);
243
0
        break;
244
0
    }
245
1
    case EXPLICIT: {
  Branch (245:5): [True: 1, False: 0]
246
1
        switch (_type) {
247
0
        case EMPTY:
  Branch (247:9): [True: 0, False: 1]
248
0
            _type = EXPLICIT;
249
0
            _explicit_data = other._explicit_data;
250
0
            break;
251
0
        case SINGLE:
  Branch (251:9): [True: 0, False: 1]
252
0
            _type = EXPLICIT;
253
0
            _explicit_data = other._explicit_data;
254
0
            add_value(_single_data);
255
0
            break;
256
1
        case EXPLICIT:
  Branch (256:9): [True: 1, False: 0]
257
1
            if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) {
  Branch (257:17): [True: 0, False: 1]
258
0
                _type = TDIGEST;
259
0
                _tdigest_ptr = std::make_shared<TDigest>(_compression);
260
0
                for (int i = 0; i < _explicit_data.size(); i++) {
  Branch (260:33): [True: 0, False: 0]
261
0
                    _tdigest_ptr->add(_explicit_data[i]);
262
0
                }
263
0
                for (int i = 0; i < other._explicit_data.size(); i++) {
  Branch (263:33): [True: 0, False: 0]
264
0
                    _tdigest_ptr->add(other._explicit_data[i]);
265
0
                }
266
1
            } else {
267
1
                _explicit_data.insert(_explicit_data.end(), other._explicit_data.begin(),
268
1
                                      other._explicit_data.end());
269
1
            }
270
1
            break;
271
0
        case TDIGEST:
  Branch (271:9): [True: 0, False: 1]
272
0
            for (int i = 0; i < other._explicit_data.size(); i++) {
  Branch (272:29): [True: 0, False: 0]
273
0
                _tdigest_ptr->add(other._explicit_data[i]);
274
0
            }
275
0
            break;
276
0
        default:
  Branch (276:9): [True: 0, False: 1]
277
0
            break;
278
1
        }
279
1
        break;
280
1
    }
281
1
    case TDIGEST: {
  Branch (281:5): [True: 0, False: 1]
282
0
        switch (_type) {
283
0
        case EMPTY:
  Branch (283:9): [True: 0, False: 0]
284
0
            _type = TDIGEST;
285
0
            _tdigest_ptr = other._tdigest_ptr;
286
0
            break;
287
0
        case SINGLE:
  Branch (287:9): [True: 0, False: 0]
288
0
            _type = TDIGEST;
289
0
            _tdigest_ptr = other._tdigest_ptr;
290
0
            _tdigest_ptr->add(_single_data);
291
0
            break;
292
0
        case EXPLICIT:
  Branch (292:9): [True: 0, False: 0]
293
0
            _type = TDIGEST;
294
0
            _tdigest_ptr = other._tdigest_ptr;
295
0
            for (int i = 0; i < _explicit_data.size(); i++) {
  Branch (295:29): [True: 0, False: 0]
296
0
                _tdigest_ptr->add(_explicit_data[i]);
297
0
            }
298
0
            break;
299
0
        case TDIGEST:
  Branch (299:9): [True: 0, False: 0]
300
0
            _tdigest_ptr->merge(other._tdigest_ptr.get());
301
0
            break;
302
0
        default:
  Branch (302:9): [True: 0, False: 0]
303
0
            break;
304
0
        }
305
0
        break;
306
0
    }
307
0
    default:
  Branch (307:5): [True: 0, False: 1]
308
0
        return;
309
1
    }
310
1
}
311
312
44.1k
void QuantileState::add_value(const double& value) {
313
44.1k
    switch (_type) {
  Branch (313:13): [True: 0, False: 44.1k]
314
22.0k
    case EMPTY:
  Branch (314:5): [True: 22.0k, False: 22.0k]
315
22.0k
        _single_data = value;
316
22.0k
        _type = SINGLE;
317
22.0k
        break;
318
20.0k
    case SINGLE:
  Branch (318:5): [True: 20.0k, False: 24.1k]
319
20.0k
        _explicit_data.emplace_back(_single_data);
320
20.0k
        _explicit_data.emplace_back(value);
321
20.0k
        _type = EXPLICIT;
322
20.0k
        break;
323
2.05k
    case EXPLICIT:
  Branch (323:5): [True: 2.05k, False: 42.0k]
324
2.05k
        if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
  Branch (324:13): [True: 1, False: 2.05k]
325
1
            _tdigest_ptr = std::make_shared<TDigest>(_compression);
326
2.04k
            for (int i = 0; i < _explicit_data.size(); i++) {
  Branch (326:29): [True: 2.04k, False: 1]
327
2.04k
                _tdigest_ptr->add(_explicit_data[i]);
328
2.04k
            }
329
1
            _explicit_data.clear();
330
1
            _explicit_data.shrink_to_fit();
331
1
            _type = TDIGEST;
332
333
2.05k
        } else {
334
2.05k
            _explicit_data.emplace_back(value);
335
2.05k
        }
336
2.05k
        break;
337
1
    case TDIGEST:
  Branch (337:5): [True: 1, False: 44.1k]
338
1
        _tdigest_ptr->add(value);
339
1
        break;
340
44.1k
    }
341
44.1k
}
342
343
0
void QuantileState::clear() {
344
0
    _type = EMPTY;
345
0
    _tdigest_ptr.reset();
346
0
    _explicit_data.clear();
347
0
    _explicit_data.shrink_to_fit();
348
0
}
349
350
} // namespace doris