Coverage Report

Created: 2024-11-18 10:37

/root/doris/be/src/util/quantile_state.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
#include "util/quantile_state.h"

#include <string.h>

#include <algorithm>
#include <cmath>
#include <memory>
#include <ostream>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "util/coding.h"
#include "util/slice.h"
#include "util/tdigest.h"
29
30
namespace doris {
31
32
template <typename T>
33
82.0k
QuantileState<T>::QuantileState() : _type(EMPTY), _compression(QUANTILE_STATE_COMPRESSION_MIN) {}
34
35
template <typename T>
36
0
QuantileState<T>::QuantileState(float compression) : _type(EMPTY), _compression(compression) {}
37
38
template <typename T>
39
0
QuantileState<T>::QuantileState(const Slice& slice) {
40
0
    if (!deserialize(slice)) {
41
0
        _type = EMPTY;
42
0
    }
43
0
}
44
45
template <typename T>
46
240k
size_t QuantileState<T>::get_serialized_size() {
47
240k
    size_t size = 1 + sizeof(float); // type(QuantileStateType) + compression(float)
48
240k
    switch (_type) {
49
80.0k
    case EMPTY:
50
80.0k
        break;
51
80.0k
    case SINGLE:
52
80.0k
        size += sizeof(T);
53
80.0k
        break;
54
79.9k
    case EXPLICIT:
55
79.9k
        size += sizeof(uint16_t) + sizeof(T) * _explicit_data.size();
56
79.9k
        break;
57
4
    case TDIGEST:
58
4
        size += _tdigest_ptr->serialized_size();
59
4
        break;
60
240k
    }
61
240k
    return size;
62
240k
}
63
64
template <typename T>
65
0
void QuantileState<T>::set_compression(float compression) {
66
0
    DCHECK(compression >= QUANTILE_STATE_COMPRESSION_MIN &&
67
0
           compression <= QUANTILE_STATE_COMPRESSION_MAX);
68
0
    this->_compression = compression;
69
0
}
70
71
template <typename T>
72
60.0k
bool QuantileState<T>::is_valid(const Slice& slice) {
73
60.0k
    if (slice.size < 1) {
74
0
        return false;
75
0
    }
76
60.0k
    const uint8_t* ptr = (uint8_t*)slice.data;
77
60.0k
    const uint8_t* end = (uint8_t*)slice.data + slice.size;
78
60.0k
    float compress_value = *reinterpret_cast<const float*>(ptr);
79
60.0k
    if (compress_value < QUANTILE_STATE_COMPRESSION_MIN ||
80
60.0k
        compress_value > QUANTILE_STATE_COMPRESSION_MAX) {
81
0
        return false;
82
0
    }
83
60.0k
    ptr += sizeof(float);
84
85
60.0k
    auto type = (QuantileStateType)*ptr++;
86
60.0k
    switch (type) {
87
20.0k
    case EMPTY:
88
20.0k
        break;
89
20.0k
    case SINGLE: {
90
20.0k
        if ((ptr + sizeof(T)) > end) {
91
0
            return false;
92
0
        }
93
20.0k
        ptr += sizeof(T);
94
20.0k
        break;
95
20.0k
    }
96
19.9k
    case EXPLICIT: {
97
19.9k
        if ((ptr + sizeof(uint16_t)) > end) {
98
0
            return false;
99
0
        }
100
19.9k
        uint16_t num_explicits = decode_fixed16_le(ptr);
101
19.9k
        ptr += sizeof(uint16_t);
102
19.9k
        ptr += num_explicits * sizeof(T);
103
19.9k
        break;
104
19.9k
    }
105
1
    case TDIGEST: {
106
1
        if ((ptr + sizeof(uint32_t)) > end) {
107
0
            return false;
108
0
        }
109
1
        uint32_t tdigest_serialized_length = decode_fixed32_le(ptr);
110
1
        ptr += tdigest_serialized_length;
111
1
        break;
112
1
    }
113
0
    default:
114
0
        return false;
115
60.0k
    }
116
60.0k
    return ptr == end;
117
60.0k
}
118
119
template <typename T>
120
9
T QuantileState<T>::get_explicit_value_by_percentile(float percentile) const {
121
9
    DCHECK(_type == EXPLICIT);
122
9
    int n = _explicit_data.size();
123
9
    std::vector<T> sorted_data(_explicit_data.begin(), _explicit_data.end());
124
9
    std::sort(sorted_data.begin(), sorted_data.end());
125
126
9
    double index = (n - 1) * percentile;
127
9
    int intIdx = (int)index;
128
9
    if (intIdx == n - 1) {
129
3
        return sorted_data[intIdx];
130
3
    }
131
6
    return sorted_data[intIdx + 1] * (index - intIdx) + sorted_data[intIdx] * (intIdx + 1 - index);
132
9
}
133
134
template <typename T>
135
9
T QuantileState<T>::get_value_by_percentile(float percentile) const {
136
9
    DCHECK(percentile >= 0 && percentile <= 1);
137
9
    switch (_type) {
138
0
    case EMPTY: {
139
0
        return NAN;
140
0
    }
141
0
    case SINGLE: {
142
0
        return _single_data;
143
0
    }
144
9
    case EXPLICIT: {
145
9
        return get_explicit_value_by_percentile(percentile);
146
0
    }
147
0
    case TDIGEST: {
148
0
        return _tdigest_ptr->quantile(percentile);
149
0
    }
150
0
    default:
151
0
        break;
152
9
    }
153
0
    return NAN;
154
9
}
155
156
template <typename T>
157
60.0k
bool QuantileState<T>::deserialize(const Slice& slice) {
158
60.0k
    DCHECK(_type == EMPTY);
159
160
    // in case of insert error data caused be crashed
161
60.0k
    if (slice.data == nullptr || slice.size <= 0) {
162
0
        return false;
163
0
    }
164
    // check input is valid
165
60.0k
    if (!is_valid(slice)) {
166
0
        LOG(WARNING) << "QuantileState deserialize failed: slice is invalid";
167
0
        return false;
168
0
    }
169
170
60.0k
    const uint8_t* ptr = (uint8_t*)slice.data;
171
60.0k
    _compression = *reinterpret_cast<const float*>(ptr);
172
60.0k
    ptr += sizeof(float);
173
    // first byte : type
174
60.0k
    _type = (QuantileStateType)*ptr++;
175
60.0k
    switch (_type) {
176
20.0k
    case EMPTY:
177
        // 1: empty
178
20.0k
        break;
179
20.0k
    case SINGLE: {
180
        // 2: single_data value
181
20.0k
        _single_data = *reinterpret_cast<const T*>(ptr);
182
20.0k
        ptr += sizeof(T);
183
20.0k
        break;
184
0
    }
185
19.9k
    case EXPLICIT: {
186
        // 3: number of explicit values
187
        // make sure that num_explicit is positive
188
19.9k
        uint16_t num_explicits = decode_fixed16_le(ptr);
189
19.9k
        ptr += sizeof(uint16_t);
190
19.9k
        _explicit_data.reserve(std::min(num_explicits * 2, QUANTILE_STATE_EXPLICIT_NUM));
191
19.9k
        _explicit_data.resize(num_explicits);
192
19.9k
        memcpy(&_explicit_data[0], ptr, num_explicits * sizeof(T));
193
19.9k
        ptr += num_explicits * sizeof(T);
194
19.9k
        break;
195
0
    }
196
1
    case TDIGEST: {
197
        // 4: Tdigest object value
198
1
        _tdigest_ptr = std::make_shared<TDigest>(0);
199
1
        _tdigest_ptr->unserialize(ptr);
200
1
        break;
201
0
    }
202
0
    default:
203
        // revert type to EMPTY
204
0
        _type = EMPTY;
205
0
        return false;
206
60.0k
    }
207
60.0k
    return true;
208
60.0k
}
209
210
template <typename T>
211
60.0k
size_t QuantileState<T>::serialize(uint8_t* dst) const {
212
60.0k
    uint8_t* ptr = dst;
213
60.0k
    *reinterpret_cast<float*>(ptr) = _compression;
214
60.0k
    ptr += sizeof(float);
215
60.0k
    switch (_type) {
216
20.0k
    case EMPTY: {
217
20.0k
        *ptr++ = EMPTY;
218
20.0k
        break;
219
0
    }
220
20.0k
    case SINGLE: {
221
20.0k
        *ptr++ = SINGLE;
222
20.0k
        *reinterpret_cast<T*>(ptr) = _single_data;
223
20.0k
        ptr += sizeof(T);
224
20.0k
        break;
225
0
    }
226
19.9k
    case EXPLICIT: {
227
19.9k
        *ptr++ = EXPLICIT;
228
19.9k
        uint16_t size = _explicit_data.size();
229
19.9k
        *reinterpret_cast<uint16_t*>(ptr) = size;
230
19.9k
        ptr += sizeof(uint16_t);
231
19.9k
        memcpy(ptr, &_explicit_data[0], size * sizeof(T));
232
19.9k
        ptr += size * sizeof(T);
233
19.9k
        break;
234
0
    }
235
1
    case TDIGEST: {
236
1
        *ptr++ = TDIGEST;
237
1
        size_t tdigest_size = _tdigest_ptr->serialize(ptr);
238
1
        ptr += tdigest_size;
239
1
        break;
240
0
    }
241
0
    default:
242
0
        break;
243
60.0k
    }
244
60.0k
    return ptr - dst;
245
60.0k
}
246
247
template <typename T>
248
1
void QuantileState<T>::merge(const QuantileState<T>& other) {
249
1
    switch (other._type) {
250
0
    case EMPTY:
251
0
        break;
252
0
    case SINGLE: {
253
0
        add_value(other._single_data);
254
0
        break;
255
0
    }
256
1
    case EXPLICIT: {
257
1
        switch (_type) {
258
0
        case EMPTY:
259
0
            _type = EXPLICIT;
260
0
            _explicit_data = other._explicit_data;
261
0
            break;
262
0
        case SINGLE:
263
0
            _type = EXPLICIT;
264
0
            _explicit_data = other._explicit_data;
265
0
            add_value(_single_data);
266
0
            break;
267
1
        case EXPLICIT:
268
1
            if (_explicit_data.size() + other._explicit_data.size() > QUANTILE_STATE_EXPLICIT_NUM) {
269
0
                _type = TDIGEST;
270
0
                _tdigest_ptr = std::make_shared<TDigest>(_compression);
271
0
                for (int i = 0; i < _explicit_data.size(); i++) {
272
0
                    _tdigest_ptr->add(_explicit_data[i]);
273
0
                }
274
0
                for (int i = 0; i < other._explicit_data.size(); i++) {
275
0
                    _tdigest_ptr->add(other._explicit_data[i]);
276
0
                }
277
1
            } else {
278
1
                _explicit_data.insert(_explicit_data.end(), other._explicit_data.begin(),
279
1
                                      other._explicit_data.end());
280
1
            }
281
1
            break;
282
0
        case TDIGEST:
283
0
            for (int i = 0; i < other._explicit_data.size(); i++) {
284
0
                _tdigest_ptr->add(other._explicit_data[i]);
285
0
            }
286
0
            break;
287
0
        default:
288
0
            break;
289
1
        }
290
1
        break;
291
1
    }
292
1
    case TDIGEST: {
293
0
        switch (_type) {
294
0
        case EMPTY:
295
0
            _type = TDIGEST;
296
0
            _tdigest_ptr = std::move(other._tdigest_ptr);
297
0
            break;
298
0
        case SINGLE:
299
0
            _type = TDIGEST;
300
0
            _tdigest_ptr = std::move(other._tdigest_ptr);
301
0
            _tdigest_ptr->add(_single_data);
302
0
            break;
303
0
        case EXPLICIT:
304
0
            _type = TDIGEST;
305
0
            _tdigest_ptr = std::move(other._tdigest_ptr);
306
0
            for (int i = 0; i < _explicit_data.size(); i++) {
307
0
                _tdigest_ptr->add(_explicit_data[i]);
308
0
            }
309
0
            break;
310
0
        case TDIGEST:
311
0
            _tdigest_ptr->merge(other._tdigest_ptr.get());
312
0
            break;
313
0
        default:
314
0
            break;
315
0
        }
316
0
        break;
317
0
    }
318
0
    default:
319
0
        return;
320
1
    }
321
1
}
322
323
template <typename T>
324
44.1k
void QuantileState<T>::add_value(const T& value) {
325
44.1k
    switch (_type) {
326
22.0k
    case EMPTY:
327
22.0k
        _single_data = value;
328
22.0k
        _type = SINGLE;
329
22.0k
        break;
330
20.0k
    case SINGLE:
331
20.0k
        _explicit_data.emplace_back(_single_data);
332
20.0k
        _explicit_data.emplace_back(value);
333
20.0k
        _type = EXPLICIT;
334
20.0k
        break;
335
2.05k
    case EXPLICIT:
336
2.05k
        if (_explicit_data.size() == QUANTILE_STATE_EXPLICIT_NUM) {
337
1
            _tdigest_ptr = std::make_shared<TDigest>(_compression);
338
2.04k
            for (int i = 0; i < _explicit_data.size(); i++) {
339
2.04k
                _tdigest_ptr->add(_explicit_data[i]);
340
2.04k
            }
341
1
            _explicit_data.clear();
342
1
            _explicit_data.shrink_to_fit();
343
1
            _type = TDIGEST;
344
345
2.05k
        } else {
346
2.05k
            _explicit_data.emplace_back(value);
347
2.05k
        }
348
2.05k
        break;
349
1
    case TDIGEST:
350
1
        _tdigest_ptr->add(value);
351
1
        break;
352
44.1k
    }
353
44.1k
}
354
355
template <typename T>
356
0
void QuantileState<T>::clear() {
357
0
    _type = EMPTY;
358
0
    _tdigest_ptr.reset();
359
0
    _explicit_data.clear();
360
0
    _explicit_data.shrink_to_fit();
361
0
}
362
363
// Explicitly instantiates the member definitions in this file for T = double; no other
// instantiation is provided here.
template class QuantileState<double>;
364
365
} // namespace doris