Coverage Report

Created: 2024-11-20 12:30

/root/doris/be/src/util/tdigest.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
/*
19
 * Licensed to Derrick R. Burns under one or more
20
 * contributor license agreements.  See the NOTICES file distributed with
21
 * this work for additional information regarding copyright ownership.
22
 * The ASF licenses this file to You under the Apache License, Version 2.0
23
 * (the "License"); you may not use this file except in compliance with
24
 * the License.  You may obtain a copy of the License at
25
 *
26
 *     http://www.apache.org/licenses/LICENSE-2.0
27
 *
28
 * Unless required by applicable law or agreed to in writing, software
29
 * distributed under the License is distributed on an "AS IS" BASIS,
30
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
31
 * See the License for the specific language governing permissions and
32
 * limitations under the License.
33
 */
34
35
// T-Digest: Percentile and Quantile Estimation of Big Data
36
// A new data structure for accurate on-line accumulation of rank-based statistics
37
// such as quantiles and trimmed means.
38
// See original paper: "Computing extremely accurate quantiles using t-digest"
39
// by Ted Dunning and Otmar Ertl for more details
40
// https://github.com/tdunning/t-digest/blob/07b8f2ca2be8d0a9f04df2feadad5ddc1bb73c88/docs/t-digest-paper/histo.pdf.
41
// https://github.com/derrickburns/tdigest
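
For orientation before the listing: the TDigest class declared in this header is driven through add(), compress(), quantile(), and cdf(). The standalone sketch below is not part of the header or of the coverage data; the compression factor and input values are illustrative only.

    #include "util/tdigest.h"

    // Sketch only: feed values into a digest and query rank statistics.
    void tdigest_usage_sketch() {
        doris::TDigest digest(100);                // compression = 100 (illustrative)
        for (int i = 1; i <= 1000; ++i) {
            digest.add(static_cast<float>(i));     // add(Value) adds with weight 1
        }
        digest.compress();                         // merge any unprocessed centroids
        float median = digest.quantile(0.5f);      // estimated 50th percentile
        float rank_of_250 = digest.cdf(250.0f);    // estimated fraction of values <= 250
        (void)median;
        (void)rank_of_250;
    }
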
42
43
#pragma once
44
45
#include <algorithm>
46
#include <cfloat>
47
#include <cmath>
48
#include <iostream>
49
#include <memory>
50
#include <queue>
51
#include <utility>
52
#include <vector>
53
54
#include "common/factory_creator.h"
55
#include "common/logging.h"
56
#include "udf/udf.h"
57
#include "util/debug_util.h"
58
#include "util/radix_sort.h"
59
60
namespace doris {
61
62
using Value = float;
63
using Weight = float;
64
using Index = size_t;
65
66
const size_t kHighWater = 40000;
67
68
class Centroid {
69
public:
70
2.04k
    Centroid() : Centroid(0.0, 0.0) {}
71
72
15.2k
    Centroid(Value mean, Weight weight) : _mean(mean), _weight(weight) {}
73
74
6.29k
    Value mean() const noexcept { return _mean; }
75
76
17.1k
    Weight weight() const noexcept { return _weight; }
77
78
136k
    Value& mean() noexcept { return _mean; }
79
80
1.20k
    Weight& weight() noexcept { return _weight; }
81
82
9.78k
    void add(const Centroid& c) {
83
9.78k
        DCHECK_GT(c._weight, 0);
84
9.78k
        if (_weight != 0.0) {
85
9.78k
            _weight += c._weight;
86
9.78k
            _mean += c._weight * (c._mean - _mean) / _weight;
87
9.78k
        } else {
88
0
            _weight = c._weight;
89
0
            _mean = c._mean;
90
0
        }
91
9.78k
    }
92
93
private:
94
    Value _mean = 0;
95
    Weight _weight = 0;
96
};
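
Centroid::add() above folds another centroid into a running weighted mean without keeping the individual points. A tiny standalone check (illustrative numbers, not part of this header) shows that the incremental update equals the ordinary weighted mean:

    #include <cassert>
    #include <cmath>

    // Sketch: mean' = mean + w_c * (mean_c - mean) / (weight + w_c)
    int main() {
        float mean = 2.0f, weight = 1.0f;             // existing centroid
        float c_mean = 6.0f, c_weight = 3.0f;         // centroid being merged in
        weight += c_weight;                           // 1 + 3 = 4
        mean += c_weight * (c_mean - mean) / weight;  // 2 + 3 * (6 - 2) / 4 = 5
        assert(std::fabs(mean - 5.0f) < 1e-6f);       // same as (2*1 + 6*3) / 4
        return 0;
    }
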
97
98
struct CentroidList {
99
1
    CentroidList(const std::vector<Centroid>& s) : iter(s.cbegin()), end(s.cend()) {}
100
    std::vector<Centroid>::const_iterator iter;
101
    std::vector<Centroid>::const_iterator end;
102
103
100
    bool advance() { return ++iter != end; }
104
};
105
106
class CentroidListComparator {
107
public:
108
2
    CentroidListComparator() {}
109
110
0
    bool operator()(const CentroidList& left, const CentroidList& right) const {
111
0
        return left.iter->mean() > right.iter->mean();
112
0
    }
113
};
114
115
using CentroidListQueue =
116
        std::priority_queue<CentroidList, std::vector<CentroidList>, CentroidListComparator>;
117
118
struct CentroidComparator {
119
3.10k
    bool operator()(const Centroid& a, const Centroid& b) const { return a.mean() < b.mean(); }
120
};
121
122
class TDigest {
123
    ENABLE_FACTORY_CREATOR(TDigest);
124
    struct TDigestRadixSortTraits {
125
        using Element = Centroid;
126
        using Key = Value;
127
        using CountType = uint32_t;
128
        using KeyBits = uint32_t;
129
130
        static constexpr size_t PART_SIZE_BITS = 8;
131
132
        using Transform = RadixSortFloatTransform<KeyBits>;
133
        using Allocator = RadixSortMallocAllocator;
134
135
133k
        static Key& extractKey(Element& elem) { return elem.mean(); }
136
    };
137
138
    class TDigestComparator {
139
    public:
140
2
        TDigestComparator() {}
141
142
0
        bool operator()(const TDigest* left, const TDigest* right) const {
143
0
            return left->totalSize() > right->totalSize();
144
0
        }
145
    };
146
    using TDigestQueue =
147
            std::priority_queue<const TDigest*, std::vector<const TDigest*>, TDigestComparator>;
148
149
public:
150
0
    TDigest() : TDigest(10000) {}
151
152
13
    explicit TDigest(Value compression) : TDigest(compression, 0) {}
153
154
13
    TDigest(Value compression, Index bufferSize) : TDigest(compression, bufferSize, 0) {}
155
156
    TDigest(Value compression, Index unmergedSize, Index mergedSize)
157
            : _compression(compression),
158
              _max_processed(processedSize(mergedSize, compression)),
159
13
              _max_unprocessed(unprocessedSize(unmergedSize, compression)) {
160
13
        _processed.reserve(_max_processed);
161
13
        _unprocessed.reserve(_max_unprocessed + 1);
162
13
    }
163
164
    TDigest(std::vector<Centroid>&& processed, std::vector<Centroid>&& unprocessed,
165
            Value compression, Index unmergedSize, Index mergedSize)
166
0
            : TDigest(compression, unmergedSize, mergedSize) {
167
0
        _processed = std::move(processed);
168
0
        _unprocessed = std::move(unprocessed);
169
0
170
0
        _processed_weight = weight(_processed);
171
0
        _unprocessed_weight = weight(_unprocessed);
172
0
        if (_processed.size() > 0) {
173
0
            _min = std::min(_min, _processed[0].mean());
174
0
            _max = std::max(_max, (_processed.cend() - 1)->mean());
175
0
        }
176
0
        updateCumulative();
177
0
    }
178
179
0
    static Weight weight(std::vector<Centroid>& centroids) noexcept {
180
0
        Weight w = 0.0;
181
0
        for (auto centroid : centroids) {
182
0
            w += centroid.weight();
183
0
        }
184
0
        return w;
185
0
    }
186
187
0
    TDigest& operator=(TDigest&& o) {
188
0
        _compression = o._compression;
189
0
        _max_processed = o._max_processed;
190
0
        _max_unprocessed = o._max_unprocessed;
191
0
        _processed_weight = o._processed_weight;
192
0
        _unprocessed_weight = o._unprocessed_weight;
193
0
        _processed = std::move(o._processed);
194
0
        _unprocessed = std::move(o._unprocessed);
195
0
        _cumulative = std::move(o._cumulative);
196
0
        _min = o._min;
197
0
        _max = o._max;
198
0
        return *this;
199
0
    }
200
201
    TDigest(TDigest&& o)
202
            : TDigest(std::move(o._processed), std::move(o._unprocessed), o._compression,
203
0
                      o._max_unprocessed, o._max_processed) {}
204
205
13
    static inline Index processedSize(Index size, Value compression) noexcept {
206
13
        return (size == 0) ? static_cast<Index>(2 * std::ceil(compression)) : size;
207
13
    }
208
209
13
    static inline Index unprocessedSize(Index size, Value compression) noexcept {
210
13
        return (size == 0) ? static_cast<Index>(8 * std::ceil(compression)) : size;
211
13
    }
212
213
    // merge in another t-digest
214
1
    void merge(const TDigest* other) {
215
1
        std::vector<const TDigest*> others {other};
216
1
        add(others.cbegin(), others.cend());
217
1
    }
218
219
3
    const std::vector<Centroid>& processed() const { return _processed; }
220
221
0
    const std::vector<Centroid>& unprocessed() const { return _unprocessed; }
222
223
0
    Index maxUnprocessed() const { return _max_unprocessed; }
224
225
0
    Index maxProcessed() const { return _max_processed; }
226
227
1
    void add(std::vector<const TDigest*> digests) { add(digests.cbegin(), digests.cend()); }
228
229
    // merge in a vector of tdigests in the most efficient manner possible
230
    // in constant space
231
    // works for any value of kHighWater
232
    void add(std::vector<const TDigest*>::const_iterator iter,
233
2
             std::vector<const TDigest*>::const_iterator end) {
234
2
        if (iter != end) {
235
2
            auto size = std::distance(iter, end);
236
2
            TDigestQueue pq(TDigestComparator {});
237
4
            for (; iter != end; iter++) {
238
2
                pq.push((*iter));
239
2
            }
240
2
            std::vector<const TDigest*> batch;
241
2
            batch.reserve(size);
242
243
2
            size_t totalSize = 0;
244
4
            while (!pq.empty()) {
245
2
                auto td = pq.top();
246
2
                batch.push_back(td);
247
2
                pq.pop();
248
2
                totalSize += td->totalSize();
249
2
                if (totalSize >= kHighWater || pq.empty()) {
250
2
                    mergeProcessed(batch);
251
2
                    mergeUnprocessed(batch);
252
2
                    processIfNecessary();
253
2
                    batch.clear();
254
2
                    totalSize = 0;
255
2
                }
256
2
            }
257
2
            updateCumulative();
258
2
        }
259
2
    }
260
261
0
    Weight processedWeight() const { return _processed_weight; }
262
263
0
    Weight unprocessedWeight() const { return _unprocessed_weight; }
264
265
41
    bool haveUnprocessed() const { return _unprocessed.size() > 0; }
266
267
2
    size_t totalSize() const { return _processed.size() + _unprocessed.size(); }
268
269
2
    long totalWeight() const { return static_cast<long>(_processed_weight + _unprocessed_weight); }
270
271
    // return the cdf on the t-digest
272
11
    Value cdf(Value x) {
273
11
        if (haveUnprocessed() || isDirty()) process();
274
11
        return cdfProcessed(x);
275
11
    }
276
277
13.2k
    bool isDirty() {
278
13.2k
        return _processed.size() > _max_processed || _unprocessed.size() > _max_unprocessed;
279
13.2k
    }
280
281
    // return the cdf on the processed values
282
11
    Value cdfProcessed(Value x) const {
283
11
        VLOG_CRITICAL << "cdf value " << x;
284
11
        VLOG_CRITICAL << "processed size " << _processed.size();
285
11
        if (_processed.size() == 0) {
286
            // no data to examine
287
0
            VLOG_CRITICAL << "no processed values";
288
289
0
            return 0.0;
290
11
        } else if (_processed.size() == 1) {
291
0
            VLOG_CRITICAL << "one processed value "
292
0
                          << " _min " << _min << " _max " << _max;
293
            // exactly one centroid, should have _max==_min
294
0
            auto width = _max - _min;
295
0
            if (x < _min) {
296
0
                return 0.0;
297
0
            } else if (x > _max) {
298
0
                return 1.0;
299
0
            } else if (x - _min <= width) {
300
                // _min and _max are too close together to do any viable interpolation
301
0
                return 0.5;
302
0
            } else {
303
                // interpolate if somehow we have weight > 0 and _max != _min
304
0
                return (x - _min) / (_max - _min);
305
0
            }
306
11
        } else {
307
11
            auto n = _processed.size();
308
11
            if (x <= _min) {
309
1
                VLOG_CRITICAL << "below _min "
310
0
                              << " _min " << _min << " x " << x;
311
1
                return 0;
312
1
            }
313
314
10
            if (x >= _max) {
315
1
                VLOG_CRITICAL << "above _max "
316
0
                              << " _max " << _max << " x " << x;
317
1
                return 1;
318
1
            }
319
320
            // check for the left tail
321
9
            if (x <= mean(0)) {
322
0
                VLOG_CRITICAL << "left tail "
323
0
                              << " _min " << _min << " mean(0) " << mean(0) << " x " << x;
324
325
                // note that this is different than mean(0) > _min ... this guarantees interpolation works
326
0
                if (mean(0) - _min > 0) {
327
0
                    return (x - _min) / (mean(0) - _min) * weight(0) / _processed_weight / 2.0;
328
0
                } else {
329
0
                    return 0;
330
0
                }
331
0
            }
332
333
            // and the right tail
334
9
            if (x >= mean(n - 1)) {
335
0
                VLOG_CRITICAL << "right tail"
336
0
                              << " _max " << _max << " mean(n - 1) " << mean(n - 1) << " x " << x;
337
338
0
                if (_max - mean(n - 1) > 0) {
339
0
                    return 1.0 - (_max - x) / (_max - mean(n - 1)) * weight(n - 1) /
340
0
                                         _processed_weight / 2.0;
341
0
                } else {
342
0
                    return 1;
343
0
                }
344
0
            }
345
346
9
            CentroidComparator cc;
347
9
            auto iter =
348
9
                    std::upper_bound(_processed.cbegin(), _processed.cend(), Centroid(x, 0), cc);
349
350
9
            auto i = std::distance(_processed.cbegin(), iter);
351
9
            auto z1 = x - (iter - 1)->mean();
352
9
            auto z2 = (iter)->mean() - x;
353
9
            DCHECK_LE(0.0, z1);
354
9
            DCHECK_LE(0.0, z2);
355
9
            VLOG_CRITICAL << "middle "
356
0
                          << " z1 " << z1 << " z2 " << z2 << " x " << x;
357
358
9
            return weightedAverage(_cumulative[i - 1], z2, _cumulative[i], z1) / _processed_weight;
359
9
        }
360
11
    }
361
362
    // this returns a quantile on the t-digest
363
30
    Value quantile(Value q) {
364
30
        if (haveUnprocessed() || isDirty()) process();
365
30
        return quantileProcessed(q);
366
30
    }
367
368
    // this returns a quantile on the currently processed values without changing the t-digest
369
    // the value will not represent the unprocessed values
370
30
    Value quantileProcessed(Value q) const {
371
30
        if (q < 0 || q > 1) {
372
0
            VLOG_CRITICAL << "q should be in [0,1], got " << q;
373
0
            return NAN;
374
0
        }
375
376
30
        if (_processed.size() == 0) {
377
            // no centroids means no data, no way to get a quantile
378
0
            return NAN;
379
30
        } else if (_processed.size() == 1) {
380
            // with one data point, all quantiles lead to Rome
381
382
3
            return mean(0);
383
3
        }
384
385
        // we know that there are at least two centroids now
386
27
        auto n = _processed.size();
387
388
        // if values were stored in a sorted array, index would be the offset we are interested in
389
27
        const auto index = q * _processed_weight;
390
391
        // at the boundaries, we return _min or _max
392
27
        if (index <= weight(0) / 2.0) {
393
5
            DCHECK_GT(weight(0), 0);
394
5
            return _min + 2.0 * index / weight(0) * (mean(0) - _min);
395
5
        }
396
397
22
        auto iter = std::lower_bound(_cumulative.cbegin(), _cumulative.cend(), index);
398
399
22
        if (iter + 1 != _cumulative.cend()) {
400
17
            auto i = std::distance(_cumulative.cbegin(), iter);
401
17
            auto z1 = index - *(iter - 1);
402
17
            auto z2 = *(iter)-index;
403
            // VLOG_CRITICAL << "z2 " << z2 << " index " << index << " z1 " << z1;
404
17
            return weightedAverage(mean(i - 1), z2, mean(i), z1);
405
17
        }
406
407
5
        DCHECK_LE(index, _processed_weight);
408
5
        DCHECK_GE(index, _processed_weight - weight(n - 1) / 2.0);
409
410
5
        auto z1 = index - _processed_weight - weight(n - 1) / 2.0;
411
5
        auto z2 = weight(n - 1) / 2 - z1;
412
5
        return weightedAverage(mean(n - 1), z1, _max, z2);
413
22
    }
414
415
0
    Value compression() const { return _compression; }
416
417
3.17k
    void add(Value x) { add(x, 1); }
418
419
3
    void compress() { process(); }
420
421
    // add a single centroid to the unprocessed vector, processing previously unprocessed centroids if our limit has
422
    // been reached.
423
13.1k
    bool add(Value x, Weight w) {
424
13.1k
        if (std::isnan(x)) {
425
0
            return false;
426
0
        }
427
13.1k
        _unprocessed.push_back(Centroid(x, w));
428
13.1k
        _unprocessed_weight += w;
429
13.1k
        processIfNecessary();
430
13.1k
        return true;
431
13.1k
    }
432
433
    void add(std::vector<Centroid>::const_iterator iter,
434
0
             std::vector<Centroid>::const_iterator end) {
435
0
        while (iter != end) {
436
0
            const size_t diff = std::distance(iter, end);
437
0
            const size_t room = _max_unprocessed - _unprocessed.size();
438
0
            auto mid = iter + std::min(diff, room);
439
0
            while (iter != mid) _unprocessed.push_back(*(iter++));
440
0
            if (_unprocessed.size() >= _max_unprocessed) {
441
0
                process();
442
0
            }
443
0
        }
444
0
    }
445
446
5
    uint32_t serialized_size() {
447
5
        return sizeof(uint32_t) + sizeof(Value) * 5 + sizeof(Index) * 2 + sizeof(uint32_t) * 3 +
448
5
               _processed.size() * sizeof(Centroid) + _unprocessed.size() * sizeof(Centroid) +
449
5
               _cumulative.size() * sizeof(Weight);
450
5
    }
451
452
1
    size_t serialize(uint8_t* writer) {
453
1
        uint8_t* dst = writer;
454
1
        uint32_t total_size = serialized_size();
455
1
        memcpy(writer, &total_size, sizeof(uint32_t));
456
1
        writer += sizeof(uint32_t);
457
1
        memcpy(writer, &_compression, sizeof(Value));
458
1
        writer += sizeof(Value);
459
1
        memcpy(writer, &_min, sizeof(Value));
460
1
        writer += sizeof(Value);
461
1
        memcpy(writer, &_max, sizeof(Value));
462
1
        writer += sizeof(Value);
463
1
        memcpy(writer, &_max_processed, sizeof(Index));
464
1
        writer += sizeof(Index);
465
1
        memcpy(writer, &_max_unprocessed, sizeof(Index));
466
1
        writer += sizeof(Index);
467
1
        memcpy(writer, &_processed_weight, sizeof(Value));
468
1
        writer += sizeof(Value);
469
1
        memcpy(writer, &_unprocessed_weight, sizeof(Value));
470
1
        writer += sizeof(Value);
471
472
1
        uint32_t size = _processed.size();
473
1
        memcpy(writer, &size, sizeof(uint32_t));
474
1
        writer += sizeof(uint32_t);
475
1
        for (int i = 0; i < size; i++) {
476
0
            memcpy(writer, &_processed[i], sizeof(Centroid));
477
0
            writer += sizeof(Centroid);
478
0
        }
479
480
1
        size = _unprocessed.size();
481
1
        memcpy(writer, &size, sizeof(uint32_t));
482
1
        writer += sizeof(uint32_t);
483
        // TODO(weixiang): maybe a single memcpy would be enough here
484
2.05k
        for (int i = 0; i < size; i++) {
485
2.04k
            memcpy(writer, &_unprocessed[i], sizeof(Centroid));
486
2.04k
            writer += sizeof(Centroid);
487
2.04k
        }
488
489
1
        size = _cumulative.size();
490
1
        memcpy(writer, &size, sizeof(uint32_t));
491
1
        writer += sizeof(uint32_t);
492
1
        for (int i = 0; i < size; i++) {
493
0
            memcpy(writer, &_cumulative[i], sizeof(Weight));
494
0
            writer += sizeof(Weight);
495
0
        }
496
1
        return writer - dst;
497
1
    }
498
499
1
    void unserialize(const uint8_t* type_reader) {
500
1
        uint32_t total_length = 0;
501
1
        memcpy(&total_length, type_reader, sizeof(uint32_t));
502
1
        type_reader += sizeof(uint32_t);
503
1
        memcpy(&_compression, type_reader, sizeof(Value));
504
1
        type_reader += sizeof(Value);
505
1
        memcpy(&_min, type_reader, sizeof(Value));
506
1
        type_reader += sizeof(Value);
507
1
        memcpy(&_max, type_reader, sizeof(Value));
508
1
        type_reader += sizeof(Value);
509
510
1
        memcpy(&_max_processed, type_reader, sizeof(Index));
511
1
        type_reader += sizeof(Index);
512
1
        memcpy(&_max_unprocessed, type_reader, sizeof(Index));
513
1
        type_reader += sizeof(Index);
514
1
        memcpy(&_processed_weight, type_reader, sizeof(Value));
515
1
        type_reader += sizeof(Value);
516
1
        memcpy(&_unprocessed_weight, type_reader, sizeof(Value));
517
1
        type_reader += sizeof(Value);
518
519
1
        uint32_t size;
520
1
        memcpy(&size, type_reader, sizeof(uint32_t));
521
1
        type_reader += sizeof(uint32_t);
522
1
        _processed.resize(size);
523
1
        for (int i = 0; i < size; i++) {
524
0
            memcpy(&_processed[i], type_reader, sizeof(Centroid));
525
0
            type_reader += sizeof(Centroid);
526
0
        }
527
1
        memcpy(&size, type_reader, sizeof(uint32_t));
528
1
        type_reader += sizeof(uint32_t);
529
1
        _unprocessed.resize(size);
530
2.05k
        for (int i = 0; i < size; i++) {
531
2.04k
            memcpy(&_unprocessed[i], type_reader, sizeof(Centroid));
532
2.04k
            type_reader += sizeof(Centroid);
533
2.04k
        }
534
1
        memcpy(&size, type_reader, sizeof(uint32_t));
535
1
        type_reader += sizeof(uint32_t);
536
1
        _cumulative.resize(size);
537
1
        for (int i = 0; i < size; i++) {
538
0
            memcpy(&_cumulative[i], type_reader, sizeof(Weight));
539
0
            type_reader += sizeof(Weight);
540
0
        }
541
1
    }
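
serialize() and unserialize() above are exact mirrors: total size, compression, _min/_max, the two size limits, the two weights, then the processed, unprocessed, and cumulative vectors. A hedged round-trip sketch (made-up values, buffer sized via serialized_size()):

    #include <cstdint>
    #include <vector>

    #include "util/tdigest.h"

    // Sketch only: round-trip a digest through its own byte format.
    void tdigest_round_trip_sketch() {
        doris::TDigest src(100);
        for (int i = 0; i < 100; ++i) {
            src.add(static_cast<float>(i));
        }
        std::vector<uint8_t> buf(src.serialized_size()); // exact byte budget
        size_t written = src.serialize(buf.data());      // writes the total size header first
        doris::TDigest dst(100);
        dst.unserialize(buf.data());                     // restores fields and all three vectors
        (void)written;
    }
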
542
543
private:
544
    Value _compression;
545
546
    Value _min = std::numeric_limits<Value>::max();
547
548
    Value _max = std::numeric_limits<Value>::min();
549
550
    Index _max_processed;
551
552
    Index _max_unprocessed;
553
554
    Value _processed_weight = 0.0;
555
556
    Value _unprocessed_weight = 0.0;
557
558
    std::vector<Centroid> _processed;
559
560
    std::vector<Centroid> _unprocessed;
561
562
    std::vector<Weight> _cumulative;
563
564
    // return mean of i-th centroid
565
65
    Value mean(int i) const noexcept { return _processed[i].mean(); }
566
567
    // return weight of i-th centroid
568
2.57k
    Weight weight(int i) const noexcept { return _processed[i].weight(); }
569
570
    // append all unprocessed centroids into current unprocessed vector
571
2
    void mergeUnprocessed(const std::vector<const TDigest*>& tdigests) {
572
2
        if (tdigests.size() == 0) return;
573
574
2
        size_t total = _unprocessed.size();
575
2
        for (auto& td : tdigests) {
576
2
            total += td->_unprocessed.size();
577
2
        }
578
579
2
        _unprocessed.reserve(total);
580
2
        for (auto& td : tdigests) {
581
2
            _unprocessed.insert(_unprocessed.end(), td->_unprocessed.cbegin(),
582
2
                                td->_unprocessed.cend());
583
2
            _unprocessed_weight += td->_unprocessed_weight;
584
2
        }
585
2
    }
586
587
    // merge all processed centroids together into a single sorted vector
588
2
    void mergeProcessed(const std::vector<const TDigest*>& tdigests) {
589
2
        if (tdigests.size() == 0) return;
590
591
2
        size_t total = 0;
592
2
        CentroidListQueue pq(CentroidListComparator {});
593
2
        for (auto& td : tdigests) {
594
2
            auto& sorted = td->_processed;
595
2
            auto size = sorted.size();
596
2
            if (size > 0) {
597
1
                pq.push(CentroidList(sorted));
598
1
                total += size;
599
1
                _processed_weight += td->_processed_weight;
600
1
            }
601
2
        }
602
2
        if (total == 0) return;
603
604
1
        if (_processed.size() > 0) {
605
0
            pq.push(CentroidList(_processed));
606
0
            total += _processed.size();
607
0
        }
608
609
1
        std::vector<Centroid> sorted;
610
1
        VLOG_CRITICAL << "total " << total;
611
1
        sorted.reserve(total);
612
613
101
        while (!pq.empty()) {
614
100
            auto best = pq.top();
615
100
            pq.pop();
616
100
            sorted.push_back(*(best.iter));
617
100
            if (best.advance()) pq.push(best);
618
100
        }
619
1
        _processed = std::move(sorted);
620
1
        if (_processed.size() > 0) {
621
1
            _min = std::min(_min, _processed[0].mean());
622
1
            _max = std::max(_max, (_processed.cend() - 1)->mean());
623
1
        }
624
1
    }
625
626
13.1k
    void processIfNecessary() {
627
13.1k
        if (isDirty()) {
628
1
            process();
629
1
        }
630
13.1k
    }
631
632
10
    void updateCumulative() {
633
10
        const auto n = _processed.size();
634
10
        _cumulative.clear();
635
10
        _cumulative.reserve(n + 1);
636
10
        auto previous = 0.0;
637
2.52k
        for (Index i = 0; i < n; i++) {
638
2.51k
            auto current = weight(i);
639
2.51k
            auto halfCurrent = current / 2.0;
640
2.51k
            _cumulative.push_back(previous + halfCurrent);
641
2.51k
            previous = previous + current;
642
2.51k
        }
643
10
        _cumulative.push_back(previous);
644
10
    }
645
646
    // merges _unprocessed centroids and _processed centroids together and processes them
647
    // when complete, _unprocessed will be empty and _processed will have at most _max_processed centroids
648
8
    void process() {
649
8
        CentroidComparator cc;
650
8
        RadixSort<TDigestRadixSortTraits>::executeLSD(_unprocessed.data(), _unprocessed.size());
651
8
        auto count = _unprocessed.size();
652
8
        _unprocessed.insert(_unprocessed.end(), _processed.cbegin(), _processed.cend());
653
8
        std::inplace_merge(_unprocessed.begin(), _unprocessed.begin() + count, _unprocessed.end(),
654
8
                           cc);
655
656
8
        _processed_weight += _unprocessed_weight;
657
8
        _unprocessed_weight = 0;
658
8
        _processed.clear();
659
660
8
        _processed.push_back(_unprocessed[0]);
661
8
        Weight wSoFar = _unprocessed[0].weight();
662
8
        Weight wLimit = _processed_weight * integratedQ(1.0);
663
664
8
        auto end = _unprocessed.end();
665
12.2k
        for (auto iter = _unprocessed.cbegin() + 1; iter < end; iter++) {
666
12.1k
            auto& centroid = *iter;
667
12.1k
            Weight projectedW = wSoFar + centroid.weight();
668
12.1k
            if (projectedW <= wLimit) {
669
9.78k
                wSoFar = projectedW;
670
9.78k
                (_processed.end() - 1)->add(centroid);
671
9.78k
            } else {
672
2.41k
                auto k1 = integratedLocation(wSoFar / _processed_weight);
673
2.41k
                wLimit = _processed_weight * integratedQ(k1 + 1.0);
674
2.41k
                wSoFar += centroid.weight();
675
2.41k
                _processed.emplace_back(centroid);
676
2.41k
            }
677
12.1k
        }
678
8
        _unprocessed.clear();
679
8
        _min = std::min(_min, _processed[0].mean());
680
8
        VLOG_CRITICAL << "new _min " << _min;
681
8
        _max = std::max(_max, (_processed.cend() - 1)->mean());
682
8
        VLOG_CRITICAL << "new _max " << _max;
683
8
        updateCumulative();
684
8
    }
685
686
0
    int checkWeights() { return checkWeights(_processed, _processed_weight); }
687
688
0
    size_t checkWeights(const std::vector<Centroid>& sorted, Value total) {
689
0
        size_t badWeight = 0;
690
0
        auto k1 = 0.0;
691
0
        auto q = 0.0;
692
0
        for (auto iter = sorted.cbegin(); iter != sorted.cend(); iter++) {
693
0
            auto w = iter->weight();
694
0
            auto dq = w / total;
695
0
            auto k2 = integratedLocation(q + dq);
696
0
            if (k2 - k1 > 1 && w != 1) {
697
0
                VLOG_CRITICAL << "Oversize centroid at " << std::distance(sorted.cbegin(), iter)
698
0
                              << " k1 " << k1 << " k2 " << k2 << " dk " << (k2 - k1) << " w " << w
699
0
                              << " q " << q;
700
0
                badWeight++;
701
0
            }
702
0
            if (k2 - k1 > 1.5 && w != 1) {
703
0
                VLOG_CRITICAL << "Egregiously Oversize centroid at "
704
0
                              << std::distance(sorted.cbegin(), iter) << " k1 " << k1 << " k2 "
705
0
                              << k2 << " dk " << (k2 - k1) << " w " << w << " q " << q;
706
0
                badWeight++;
707
0
            }
708
0
            q += dq;
709
0
            k1 = k2;
710
0
        }
711
0
712
0
        return badWeight;
713
0
    }
714
715
    /**
716
    * Converts a quantile into a centroid scale value.  The centroid scale is nominally
717
    * the number k of the centroid that a quantile point q should belong to.  Due to
718
    * round-offs, however, we can't align things perfectly without splitting points
719
    * and centroids.  We don't want to do that, so we have to allow for offsets.
720
    * In the end, the criterion is that any quantile range that spans a centroid
721
    * scale range more than one should be split across more than one centroid if
722
    * possible.  This won't be possible if the quantile range refers to a single point
723
    * or an already existing centroid.
724
    *
725
    * This mapping is steep near q=0 or q=1 so each centroid there will correspond to
726
    * less q range.  Near q=0.5, the mapping is flatter so that centroids there will
727
    * represent a larger chunk of quantiles.
728
    *
729
    * @param q The quantile scale value to be mapped.
730
    * @return The centroid scale value corresponding to q.
731
    */
732
2.41k
    Value integratedLocation(Value q) const {
733
2.41k
        return _compression * (std::asin(2.0 * q - 1.0) + M_PI / 2) / M_PI;
734
2.41k
    }
735
736
2.41k
    Value integratedQ(Value k) const {
737
2.41k
        return (std::sin(std::min(k, _compression) * M_PI / _compression - M_PI / 2) + 1) / 2;
738
2.41k
    }
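
integratedLocation() above is the arcsine scale function k(q) = _compression * (asin(2q - 1) + pi/2) / pi, and integratedQ() is its inverse clamped to [0, _compression]; the mapping is steep near q = 0 and q = 1, which keeps tail centroids small. A standalone check with an illustrative compression of 100:

    #include <algorithm>
    #include <cassert>
    #include <cmath>

    int main() {
        const double d = 100.0;  // compression (illustrative)
        auto k_of_q = [&](double q) { return d * (std::asin(2.0 * q - 1.0) + M_PI / 2) / M_PI; };
        auto q_of_k = [&](double k) { return (std::sin(std::min(k, d) * M_PI / d - M_PI / 2) + 1) / 2; };
        assert(std::fabs(k_of_q(0.0)) < 1e-9);                 // q = 0 maps to k = 0
        assert(std::fabs(k_of_q(1.0) - d) < 1e-9);             // q = 1 maps to k = d
        assert(std::fabs(q_of_k(k_of_q(0.25)) - 0.25) < 1e-9); // round trip through the inverse
        return 0;
    }
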
739
740
    /**
741
     * Same as weightedAverageSorted(Value, Value, Value, Value) but flips
742
     * the order of the arguments if x1 is greater than
743
     * x2.
744
    */
745
31
    static Value weightedAverage(Value x1, Value w1, Value x2, Value w2) {
746
31
        return (x1 <= x2) ? weightedAverageSorted(x1, w1, x2, w2)
747
31
                          : weightedAverageSorted(x2, w2, x1, w1);
748
31
    }
749
750
    /**
751
    * Compute the weighted average between x1 with a weight of
752
    * w1 and x2 with a weight of w2.
753
    * This expects x1 to be less than or equal to x2
754
    * and is guaranteed to return a number between x1 and
755
    * x2.
756
    */
757
31
    static Value weightedAverageSorted(Value x1, Value w1, Value x2, Value w2) {
758
31
        DCHECK_LE(x1, x2);
759
31
        const Value x = (x1 * w1 + x2 * w2) / (w1 + w2);
760
31
        return std::max(x1, std::min(x, x2));
761
31
    }
762
763
0
    static Value interpolate(Value x, Value x0, Value x1) { return (x - x0) / (x1 - x0); }
764
765
    /**
766
    * Computes an interpolated value of a quantile that is between two centroids.
767
    *
768
    * Index is the quantile desired multiplied by the total number of samples - 1.
769
    *
770
    * @param index              Denormalized quantile desired
771
    * @param previousIndex      The denormalized quantile corresponding to the center of the previous centroid.
772
    * @param nextIndex          The denormalized quantile corresponding to the center of the following centroid.
773
    * @param previousMean       The mean of the previous centroid.
774
    * @param nextMean           The mean of the following centroid.
775
    * @return  The interpolated mean.
776
    */
777
    static Value quantile(Value index, Value previousIndex, Value nextIndex, Value previousMean,
778
0
                          Value nextMean) {
779
0
        const auto delta = nextIndex - previousIndex;
780
0
        const auto previousWeight = (nextIndex - index) / delta;
781
0
        const auto nextWeight = (index - previousIndex) / delta;
782
0
        return previousMean * previousWeight + nextMean * nextWeight;
783
0
    }
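
The static quantile() helper above is plain linear interpolation between two centroid centers in denormalized-index space. A worked sketch with made-up numbers:

    #include <cassert>
    #include <cmath>

    int main() {
        double index = 2.5, prevIndex = 2.0, nextIndex = 4.0;
        double prevMean = 10.0, nextMean = 20.0;
        double delta = nextIndex - prevIndex;                     // 2
        double prevWeight = (nextIndex - index) / delta;          // 0.75
        double nextWeight = (index - prevIndex) / delta;          // 0.25
        double q = prevMean * prevWeight + nextMean * nextWeight; // 10*0.75 + 20*0.25 = 12.5
        assert(std::fabs(q - 12.5) < 1e-9);
        return 0;
    }
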
784
};
785
786
} // namespace doris