Coverage Report

Created: 2024-11-21 23:52

/root/doris/be/src/util/quantile_state.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "util/quantile_state.h"
18
19
#include <string.h>
20
21
#include <cmath>
22
#include <ostream>
23
#include <utility>
24
25
#include "common/logging.h"
26
#include "util/coding.h"
27
#include "util/slice.h"
28
#include "util/tdigest.h"
29
30
namespace doris {
31
32
122k
// Default constructor: starts as an EMPTY state that uses
// QUANTILE_STATE_COMPRESSION_MIN as its compression.
QuantileState::QuantileState()
        : _type(EMPTY),
          _compression(QUANTILE_STATE_COMPRESSION_MIN) {
}
33
34
0
// Starts as an EMPTY state with a caller-chosen compression.
// The value is stored as given; no range check is performed here.
QuantileState::QuantileState(float compression)
        : _type(EMPTY),
          _compression(compression) {
}
35
36
0
// Builds a state from its serialized form in `slice`.
//
// The members are initialised to the same defaults as the default constructor
// before deserialize() runs: deserialize() inspects _type (DCHECK) before it
// writes it, and on a failed parse _compression would otherwise be left
// without a defined value. If the slice cannot be parsed the object is left
// as an EMPTY state.
QuantileState::QuantileState(const Slice& slice)
        : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {
    if (!deserialize(slice)) {
        _type = EMPTY;
    }
}
41
42
360k
size_t QuantileState::get_serialized_size() {
43
360k
    size_t size = 1 + sizeof(float); // type(QuantileStateType) + compression(float)
44
360k
    switch (_type) {
45
200k
    case EMPTY:
46
200k
        break;
47
80.0k
    case SINGLE:
48
80.0k
        size += sizeof(double);
49
80.0k
        break;
50
79.9k
    case EXPLICIT:
51
79.9k
        size += sizeof(uint16_t) + sizeof(double) * _explicit_data.size();
52
79.9k
        break;
53
4
    case TDIGEST:
54
4
        size += _tdigest_ptr->serialized_size();
55
4
        break;
56
360k
    }
57
360k
    return size;
58
360k
}
59
60
0
void QuantileState::set_compression(float compression) {
61
0
    DCHECK(compression >= QUANTILE_STATE_COMPRESSION_MIN &&
62
0
           compression <= QUANTILE_STATE_COMPRESSION_MAX);
63
0
    this->_compression = compression;
64
0
}
65
66
80.0k
bool QuantileState::is_valid(const Slice& slice) {
67
80.0k
    if (slice.size < 1) {
68
0
        return false;
69
0
    }
70
80.0k
    const uint8_t* ptr = (uint8_t*)slice.data;
71
80.0k
    const uint8_t* end = (uint8_t*)slice.data + slice.size;
72
80.0k
    float compress_value = *reinterpret_cast<const float*>(ptr);
73
80.0k
    if (compress_value < QUANTILE_STATE_COMPRESSION_MIN ||
74
80.0k
        compress_value > QUANTILE_STATE_COMPRESSION_MAX) {
75
0
        return false;
76
0
    }
77
80.0k
    ptr += sizeof(float);
78
79
80.0k
    auto type = (QuantileStateType)*ptr++;
80
80.0k
    switch (type) {
81
40.0k
    case EMPTY:
82
40.0k
        break;
83
20.0k
    case SINGLE: {
84
20.0k
        if ((ptr + sizeof(double)) > end) {
85
0
            return false;
86
0
        }
87
20.0k
        ptr += sizeof(double);
88
20.0k
        break;
89
20.0k
    }
90
19.9k
    case EXPLICIT: {
91
19.9k
        if ((ptr + sizeof(uint16_t)) > end) {
92
0
            return false;
93
0
        }
94
19.9k
        uint16_t num_explicits = decode_fixed16_le(ptr);
95
19.9k
        ptr += sizeof(uint16_t);
96
19.9k
        ptr += num_explicits * sizeof(double);
97
19.9k
        break;
98
19.9k
    }
99
1
    case TDIGEST: {
100
1
        if ((ptr + sizeof(uint32_t)) > end) {
101
0
            return false;
102
0
        }
103
1
        uint32_t tdigest_serialized_length = decode_fixed32_le(ptr);
104
1
        ptr += tdigest_serialized_length;
105
1
        break;
106
1
    }
107
0
    default:
108
0
        return false;
109
80.0k
    }
110
80.0k
    return ptr == end;
111
80.0k
}
112
113
9
double QuantileState::get_explicit_value_by_percentile(float percentile) const {
114
9
    DCHECK(_type == EXPLICIT);
115
9
    int n = _explicit_data.size();
116
9
    std::vector<double> sorted_data(_explicit_data.begin(), _explicit_data.end());
117
9
    std::sort(sorted_data.begin(), sorted_data.end());
118
119
9
    double index = (n - 1) * percentile;
120
9
    int intIdx = (int)index;
121
9
    if (intIdx == n - 1) {
122
3
        return sorted_data[intIdx];
123
3
    }
124
6
    return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index);
125
9
}
126
127
9
double QuantileState::get_value_by_percentile(float percentile) const {
128
9
    DCHECK(percentile >= 0 && percentile <= 1);
129
9
    switch (_type) {
130
0
    case EMPTY: {
131
0
        return NAN;
132
0
    }
133
0
    case SINGLE: {
134
0
        return _single_data;
135
0
    }
136
9
    case EXPLICIT: {
137
9
        return get_explicit_value_by_percentile(percentile);
138
0
    }
139
0
    case TDIGEST: {
140
0
        return _tdigest_ptr->quantile(percentile);
141
0
    }
142
0
    default:
143
0
        break;
144
9
    }
145
0
    return NAN;
146
9
}
147
148
80.0k
bool QuantileState::deserialize(const Slice& slice) {
149
80.0k
    DCHECK(_type == EMPTY);
150
151
    // in case of insert error data caused be crashed
152
80.0k
    if (slice.data == nullptr || slice.size <= 0) {
153
0
        return false;
154
0
    }
155
    // check input is valid
156
80.0k
    if (!is_valid(slice)) {
157
0
        LOG(WARNING) << "QuantileState deserialize failed: slice is invalid";
158
0
        return false;
159
0
    }
160
161
80.0k
    const uint8_t* ptr = (uint8_t*)slice.data;
162
80.0k
    _compression = *reinterpret_cast<const float*>(ptr);
163
80.0k
    ptr += sizeof(float);
164
    // first byte : type
165
80.0k
    _type = (QuantileStateType)*ptr++;
166
80.0k
    switch (_type) {
167
40.0k
    case EMPTY:
168
        // 1: empty
169
40.0k
        break;
170
20.0k
    case SINGLE: {
171
        // 2: single_data value
172
20.0k
        _single_data = *reinterpret_cast<const double*>(ptr);
173
20.0k
        ptr += sizeof(double);
174
20.0k
        break;
175
0
    }
176
19.9k
    case EXPLICIT: {
177
        // 3: number of explicit values
178
        // make sure that num_explicit is positive
179
19.9k
        uint16_t num_explicits = decode_fixed16_le(ptr);
180
19.9k
        ptr += sizeof(uint16_t);
181
19.9k
        _explicit_data.reserve(std::min(num_explicits * 2, QUANTILE_STATE_EXPLICIT_NUM));
182
19.9k
        _explicit_data.resize(num_explicits);
183
19.9k
        memcpy(&_explicit_data[0], ptr, num_explicits * sizeof(double));
184
19.9k
        ptr += num_explicits * sizeof(double);
185
19.9k
        break;
186
0
    }
187
1
    case TDIGEST: {
188
        // 4: Tdigest object value
189
1
        _tdigest_ptr = std::make_shared<TDigest>(0);
190
1
        _tdigest_ptr->unserialize(ptr);
191
1
        break;
192
0
    }
193
0
    default:
194
        // revert type to EMPTY
195
0
        _type = EMPTY;
196
0
        return false;
197
80.0k
    }
198
80.0k
    return true;
199
80.0k
}
200
201
80.0k
size_t QuantileState::serialize(uint8_t* dst) const {
202
80.0k
    uint8_t* ptr = dst;
203
80.0k
    *reinterpret_cast<float*>(ptr) = _compression;
204
80.0k
    ptr += sizeof(float);
205
80.0k
    switch (_type) {
206
40.0k
    case EMPTY: {
207
40.0k
        *ptr++ = EMPTY;
208
40.0k
        break;
209
0
    }
210
20.0k
    case SINGLE: {
211
20.0k
        *ptr++ = SINGLE;
212
20.0k
        *reinterpret_cast<double*>(ptr) = _single_data;
213
20.0k
        ptr += sizeof(double);
214
20.0k
        break;
215
0
    }
216
19.9k
    case EXPLICIT: {
217
19.9k
        *ptr++ = EXPLICIT;
218
19.9k
        uint16_t size = _explicit_data.size();
219
19.9k
        *reinterpret_cast<uint16_t*>(ptr) = size;
220
19.9k
        ptr += sizeof(uint16_t);
221
19.9k
        memcpy(ptr, &_explicit_data[0], size * sizeof(double));
222
19.9k
        ptr += size * sizeof(double);
223
19.9k
        break;
224
0
    }
225
1
    case TDIGEST: {
226
1
        *ptr++ = TDIGEST;
227
1
        size_t tdigest_size = _tdigest_ptr->serialize(ptr);
228
1
        ptr += tdigest_size;
229
1
        break;
230
0
    }
231
0
    default:
232
0
        break;
233
80.0k
    }
234
80.0k
    return ptr - dst;
235
80.0k
}
236
237
1
void QuantileState::merge(const QuantileState& other) {
238
1
    switch (other._type) {
239
0
    case EMPTY:
240
0
        break;
241
0
    case SINGLE: {
242
0
        add_value(other._single_data);
243
0
        break;
244
0
    }
245
1
    case EXPLICIT: {
246
1
        switch (_type) {
247
0
        case EMPTY:
248
0
            _type = EXPLICIT;
249
0
            _explicit_data = other._explicit_data;
250
0
            break;
251
0
        case SINGLE:
252
0
            _type = EXPLICIT;
253
0
            _explicit_data = other._explicit_data;
254
0
            add_value(_single_data);
255
0
            break;
256
1
        case EXPLICIT:
257
1
            if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) {
258
0
                _type = TDIGEST;
259
0
                _tdigest_ptr = std::make_shared<TDigest>(_compression);
260
0
                for (int i = 0; i < _explicit_data.size(); i++) {
261
0
                    _tdigest_ptr->add(_explicit_data[i]);
262
0
                }
263
0
                for (int i = 0; i < other._explicit_data.size(); i++) {
264
0
                    _tdigest_ptr->add(other._explicit_data[i]);
265
0
                }
266
1
            } else {
267
1
                _explicit_data.insert(_explicit_data.end(), other._explicit_data.begin(),
268
1
                                      other._explicit_data.end());
269
1
            }
270
1
            break;
271
0
        case TDIGEST:
272
0
            for (int i = 0; i < other._explicit_data.size(); i++) {
273
0
                _tdigest_ptr->add(other._explicit_data[i]);
274
0
            }
275
0
            break;
276
0
        default:
277
0
            break;
278
1
        }
279
1
        break;
280
1
    }
281
1
    case TDIGEST: {
282
0
        switch (_type) {
283
0
        case EMPTY:
284
0
            _type = TDIGEST;
285
0
            _tdigest_ptr = other._tdigest_ptr;
286
0
            break;
287
0
        case SINGLE:
288
0
            _type = TDIGEST;
289
0
            _tdigest_ptr = other._tdigest_ptr;
290
0
            _tdigest_ptr->add(_single_data);
291
0
            break;
292
0
        case EXPLICIT:
293
0
            _type = TDIGEST;
294
0
            _tdigest_ptr = other._tdigest_ptr;
295
0
            for (int i = 0; i < _explicit_data.size(); i++) {
296
0
                _tdigest_ptr->add(_explicit_data[i]);
297
0
            }
298
0
            break;
299
0
        case TDIGEST:
300
0
            _tdigest_ptr->merge(other._tdigest_ptr.get());
301
0
            break;
302
0
        default:
303
0
            break;
304
0
        }
305
0
        break;
306
0
    }
307
0
    default:
308
0
        return;
309
1
    }
310
1
}
311
312
44.1k
void QuantileState::add_value(const double& value) {
313
44.1k
    switch (_type) {
314
22.0k
    case EMPTY:
315
22.0k
        _single_data = value;
316
22.0k
        _type = SINGLE;
317
22.0k
        break;
318
20.0k
    case SINGLE:
319
20.0k
        _explicit_data.emplace_back(_single_data);
320
20.0k
        _explicit_data.emplace_back(value);
321
20.0k
        _type = EXPLICIT;
322
20.0k
        break;
323
2.05k
    case EXPLICIT:
324
2.05k
        if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
325
1
            _tdigest_ptr = std::make_shared<TDigest>(_compression);
326
2.04k
            for (int i = 0; i < _explicit_data.size(); i++) {
327
2.04k
                _tdigest_ptr->add(_explicit_data[i]);
328
2.04k
            }
329
1
            _explicit_data.clear();
330
1
            _explicit_data.shrink_to_fit();
331
1
            _type = TDIGEST;
332
333
2.05k
        } else {
334
2.05k
            _explicit_data.emplace_back(value);
335
2.05k
        }
336
2.05k
        break;
337
1
    case TDIGEST:
338
1
        _tdigest_ptr->add(value);
339
1
        break;
340
44.1k
    }
341
44.1k
}
342
343
0
void QuantileState::clear() {
344
0
    _type = EMPTY;
345
0
    _tdigest_ptr.reset();
346
0
    _explicit_data.clear();
347
0
    _explicit_data.shrink_to_fit();
348
0
}
349
350
} // namespace doris