Coverage Report

Created: 2025-09-12 10:28

/root/doris/be/src/util/tdigest.h
Line | Count | Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
/*
19
 * Licensed to Derrick R. Burns under one or more
20
 * contributor license agreements.  See the NOTICES file distributed with
21
 * this work for additional information regarding copyright ownership.
22
 * The ASF licenses this file to You under the Apache License, Version 2.0
23
 * (the "License"); you may not use this file except in compliance with
24
 * the License.  You may obtain a copy of the License at
25
 *
26
 *     http://www.apache.org/licenses/LICENSE-2.0
27
 *
28
 * Unless required by applicable law or agreed to in writing, software
29
 * distributed under the License is distributed on an "AS IS" BASIS,
30
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
31
 * See the License for the specific language governing permissions and
32
 * limitations under the License.
33
 */
34
35
// T-Digest :  Percentile and Quantile Estimation of Big Data
36
// A new data structure for accurate on-line accumulation of rank-based statistics
37
// such as quantiles and trimmed means.
38
// See original paper: "Computing extremely accurate quantiles using t-digest"
39
// by Ted Dunning and Otmar Ertl for more details
40
// https://github.com/tdunning/t-digest/blob/07b8f2ca2be8d0a9f04df2feadad5ddc1bb73c88/docs/t-digest-paper/histo.pdf.
41
// https://github.com/derrickburns/tdigest
42
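For orientation, here is a minimal usage sketch (illustrative only, not part of this file): construct a digest with a compression factor, add observations, then query quantiles. It assumes the public doris::TDigest API declared below and an include path of "util/tdigest.h".

#include "util/tdigest.h" // assumed include path for this header

// Sketch only: add() buffers values into the unprocessed list; quantile()
// runs a merge pass (process) if needed before answering.
void tdigest_usage_sketch() {
    doris::TDigest digest(100); // compression factor: larger => more centroids, better accuracy
    for (int i = 1; i <= 1000; ++i) {
        digest.add(static_cast<doris::Value>(i)); // default weight of 1 per value
    }
    doris::Value median = digest.quantile(0.5); // approximate 50th percentile
    doris::Value p99 = digest.quantile(0.99);   // approximate 99th percentile
    (void)median;
    (void)p99;
}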
43
#pragma once
44
45
#include <pdqsort.h>
46
47
#include <algorithm>
48
#include <cfloat>
49
#include <cmath>
50
#include <iostream>
51
#include <memory>
52
#include <queue>
53
#include <utility>
54
#include <vector>
55
56
#include "common/factory_creator.h"
57
#include "common/logging.h"
58
59
namespace doris {
60
#include "common/compile_check_begin.h"
61
62
using Value = float;
63
using Weight = float;
64
using Index = size_t;
65
66
const size_t kHighWater = 40000;
67
68
class Centroid {
69
public:
70
40.7k
    Centroid() : Centroid(0.0, 0.0) {}
71
72
73.0k
    Centroid(Value mean, Weight weight) : _mean(mean), _weight(weight) {}
73
74
643k
    Value mean() const noexcept { return _mean; }
75
76
91.8k
    Weight weight() const noexcept { return _weight; }
77
78
2.87k
    Value& mean() noexcept { return _mean; }
79
80
1.67k
    Weight& weight() noexcept { return _weight; }
81
82
31.4k
    void add(const Centroid& c) {
83
31.4k
        DCHECK_GT(c._weight, 0);
84
31.4k
        if (_weight != 0.0) {
85
31.4k
            _weight += c._weight;
86
31.4k
            _mean += c._weight * (c._mean - _mean) / _weight;
87
31.4k
        } else {
88
0
            _weight = c._weight;
89
0
            _mean = c._mean;
90
0
        }
91
31.4k
    }
92
93
private:
94
    Value _mean = 0;
95
    Weight _weight = 0;
96
};
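Centroid::add above maintains a weighted running mean: the mean moves toward the incoming centroid's mean by that centroid's share of the combined weight. A small worked example (numbers chosen only for illustration):

// Illustration only: fold (mean=2, weight=1) into (mean=4, weight=3).
doris::Centroid a(4.0, 3.0);
doris::Centroid b(2.0, 1.0);
a.add(b);
// a.weight() == 4.0
// a.mean()   == 4.0 + 1.0 * (2.0 - 4.0) / 4.0 == 3.5,
// which matches the exact weighted mean (3*4 + 1*2) / 4 = 3.5.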
97
98
struct CentroidList {
99
1
    CentroidList(const std::vector<Centroid>& s) : iter(s.cbegin()), end(s.cend()) {}
100
    std::vector<Centroid>::const_iterator iter;
101
    std::vector<Centroid>::const_iterator end;
102
103
100
    bool advance() { return ++iter != end; }
104
};
105
106
class CentroidListComparator {
107
public:
108
    CentroidListComparator() = default;
109
110
0
    bool operator()(const CentroidList& left, const CentroidList& right) const {
111
0
        return left.iter->mean() > right.iter->mean();
112
0
    }
113
};
114
115
using CentroidListQueue =
116
        std::priority_queue<CentroidList, std::vector<CentroidList>, CentroidListComparator>;
117
118
struct CentroidComparator {
119
320k
    bool operator()(const Centroid& a, const Centroid& b) const { return a.mean() < b.mean(); }
120
};
121
122
class TDigest {
123
    ENABLE_FACTORY_CREATOR(TDigest);
124
125
    class TDigestComparator {
126
    public:
127
        TDigestComparator() = default;
128
129
0
        bool operator()(const TDigest* left, const TDigest* right) const {
130
0
            return left->totalSize() > right->totalSize();
131
0
        }
132
    };
133
    using TDigestQueue =
134
            std::priority_queue<const TDigest*, std::vector<const TDigest*>, TDigestComparator>;
135
136
public:
137
0
    TDigest() : TDigest(10000) {}
138
139
1.48k
    explicit TDigest(Value compression) : TDigest(compression, 0) {}
140
141
1.48k
    TDigest(Value compression, Index bufferSize) : TDigest(compression, bufferSize, 0) {}
142
143
    TDigest(Value compression, Index unmergedSize, Index mergedSize)
144
1.48k
            : _compression(compression),
145
1.48k
              _max_processed(processedSize(mergedSize, compression)),
146
1.48k
              _max_unprocessed(unprocessedSize(unmergedSize, compression)) {
147
1.48k
        _processed.reserve(_max_processed);
148
1.48k
        _unprocessed.reserve(_max_unprocessed + 1);
149
1.48k
    }
150
151
    TDigest(std::vector<Centroid>&& processed, std::vector<Centroid>&& unprocessed,
152
            Value compression, Index unmergedSize, Index mergedSize)
153
0
            : TDigest(compression, unmergedSize, mergedSize) {
154
0
        _processed = std::move(processed);
155
0
        _unprocessed = std::move(unprocessed);
156
0
157
0
        _processed_weight = weight(_processed);
158
0
        _unprocessed_weight = weight(_unprocessed);
159
0
        if (_processed.size() > 0) {
160
0
            _min = std::min(_min, _processed[0].mean());
161
0
            _max = std::max(_max, (_processed.cend() - 1)->mean());
162
0
        }
163
0
        updateCumulative();
164
0
    }
165
166
0
    static Weight weight(std::vector<Centroid>& centroids) noexcept {
167
0
        Weight w = 0.0;
168
0
        for (auto centroid : centroids) {
169
0
            w += centroid.weight();
170
0
        }
171
0
        return w;
172
0
    }
173
174
0
    TDigest& operator=(TDigest&& o) {
175
0
        _compression = o._compression;
176
0
        _max_processed = o._max_processed;
177
0
        _max_unprocessed = o._max_unprocessed;
178
0
        _processed_weight = o._processed_weight;
179
0
        _unprocessed_weight = o._unprocessed_weight;
180
0
        _processed = std::move(o._processed);
181
0
        _unprocessed = std::move(o._unprocessed);
182
0
        _cumulative = std::move(o._cumulative);
183
0
        _min = o._min;
184
0
        _max = o._max;
185
0
        return *this;
186
0
    }
187
188
    TDigest(TDigest&& o)
189
            : TDigest(std::move(o._processed), std::move(o._unprocessed), o._compression,
190
0
                      o._max_unprocessed, o._max_processed) {}
191
192
1.48k
    static inline Index processedSize(Index size, Value compression) noexcept {
193
1.48k
        return (size == 0) ? static_cast<Index>(2 * std::ceil(compression)) : size;
194
1.48k
    }
195
196
1.48k
    static inline Index unprocessedSize(Index size, Value compression) noexcept {
197
1.48k
        return (size == 0) ? static_cast<Index>(8 * std::ceil(compression)) : size;
198
1.48k
    }
199
200
    // merge in another t-digest
201
425
    void merge(const TDigest* other) {
202
425
        std::vector<const TDigest*> others {other};
203
425
        add(others.cbegin(), others.cend());
204
425
    }
205
206
    const std::vector<Centroid>& processed() const { return _processed; }
207
208
0
    const std::vector<Centroid>& unprocessed() const { return _unprocessed; }
209
210
0
    Index maxUnprocessed() const { return _max_unprocessed; }
211
212
0
    Index maxProcessed() const { return _max_processed; }
213
214
    void add(std::vector<const TDigest*> digests) { add(digests.cbegin(), digests.cend()); }
215
216
    // merge in a vector of tdigests in the most efficient manner possible
217
    // in constant space
218
    // works for any value of kHighWater
219
    void add(std::vector<const TDigest*>::const_iterator iter,
220
426
             std::vector<const TDigest*>::const_iterator end) {
221
426
        if (iter != end) {
222
426
            auto size = std::distance(iter, end);
223
426
            TDigestQueue pq(TDigestComparator {});
224
852
            for (; iter != end; iter++) {
225
426
                pq.push((*iter));
226
426
            }
227
426
            std::vector<const TDigest*> batch;
228
426
            batch.reserve(size);
229
230
426
            size_t totalSize = 0;
231
852
            while (!pq.empty()) {
232
426
                const auto* td = pq.top();
233
426
                batch.push_back(td);
234
426
                pq.pop();
235
426
                totalSize += td->totalSize();
236
426
                if (totalSize >= kHighWater || pq.empty()) {
237
426
                    mergeProcessed(batch);
238
426
                    mergeUnprocessed(batch);
239
426
                    processIfNecessary();
240
426
                    batch.clear();
241
426
                    totalSize = 0;
242
426
                }
243
426
            }
244
426
            updateCumulative();
245
426
        }
246
426
    }
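A short sketch of how this batched merge is typically driven (illustrative only): build a vector of digest pointers and hand it to add(), which folds them in size order and flushes whenever the accumulated centroid count reaches kHighWater, keeping memory bounded.

// Illustration only: combine per-shard digests before querying.
doris::Value merged_p95(const doris::TDigest& d1, const doris::TDigest& d2) {
    std::vector<const doris::TDigest*> parts {&d1, &d2};
    doris::TDigest merged(100);
    merged.add(parts); // batches by totalSize() against kHighWater, then processes
    return merged.quantile(0.95);
}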
247
248
0
    Weight processedWeight() const { return _processed_weight; }
249
250
0
    Weight unprocessedWeight() const { return _unprocessed_weight; }
251
252
549
    bool haveUnprocessed() const { return _unprocessed.size() > 0; }
253
254
426
    size_t totalSize() const { return _processed.size() + _unprocessed.size(); }
255
256
    long totalWeight() const { return static_cast<long>(_processed_weight + _unprocessed_weight); }
257
258
    // return the cdf on the t-digest
259
    Value cdf(Value x) {
260
        if (haveUnprocessed() || isDirty()) {
261
            process();
262
        }
263
        return cdfProcessed(x);
264
    }
265
266
32.8k
    bool isDirty() {
267
32.8k
        return _processed.size() > _max_processed || _unprocessed.size() > _max_unprocessed;
268
32.8k
    }
269
270
    // return the cdf on the processed values
271
0
    Value cdfProcessed(Value x) const {
272
0
        VLOG_CRITICAL << "cdf value " << x;
273
0
        VLOG_CRITICAL << "processed size " << _processed.size();
274
0
        if (_processed.size() == 0) {
275
0
            // no data to examine
276
0
            VLOG_CRITICAL << "no processed values";
277
0
278
0
            return 0.0;
279
0
        } else if (_processed.size() == 1) {
280
0
            VLOG_CRITICAL << "one processed value "
281
0
                          << " _min " << _min << " _max " << _max;
282
0
            // exactly one centroid, should have _max==_min
283
0
            auto width = _max - _min;
284
0
            if (x < _min) {
285
0
                return 0.0;
286
0
            } else if (x > _max) {
287
0
                return 1.0;
288
0
            } else if (x - _min <= width) {
289
0
                // _min and _max are too close together to do any viable interpolation
290
0
                return 0.5;
291
0
            } else {
292
0
                // interpolate if somehow we have weight > 0 and _max != _min
293
0
                return (x - _min) / (_max - _min);
294
0
            }
295
0
        } else {
296
0
            auto n = _processed.size();
297
0
            if (x <= _min) {
298
0
                VLOG_CRITICAL << "below _min "
299
0
                              << " _min " << _min << " x " << x;
300
0
                return 0;
301
0
            }
302
0
303
0
            if (x >= _max) {
304
0
                VLOG_CRITICAL << "above _max "
305
0
                              << " _max " << _max << " x " << x;
306
0
                return 1;
307
0
            }
308
0
309
0
            // check for the left tail
310
0
            if (x <= mean(0)) {
311
0
                VLOG_CRITICAL << "left tail "
312
0
                              << " _min " << _min << " mean(0) " << mean(0) << " x " << x;
313
0
314
0
                // note that this is different than mean(0) > _min ... this guarantees interpolation works
315
0
                if (mean(0) - _min > 0) {
316
0
                    return static_cast<Value>((x - _min) / (mean(0) - _min) * weight(0) /
317
0
                                              _processed_weight / 2.0);
318
0
                } else {
319
0
                    return 0;
320
0
                }
321
0
            }
322
0
323
0
            // and the right tail
324
0
            if (x >= mean(n - 1)) {
325
0
                VLOG_CRITICAL << "right tail"
326
0
                              << " _max " << _max << " mean(n - 1) " << mean(n - 1) << " x " << x;
327
0
328
0
                if (_max - mean(n - 1) > 0) {
329
0
                    return static_cast<Value>(1.0 - (_max - x) / (_max - mean(n - 1)) *
330
0
                                                            weight(n - 1) / _processed_weight /
331
0
                                                            2.0);
332
0
                } else {
333
0
                    return 1;
334
0
                }
335
0
            }
336
0
337
0
            CentroidComparator cc;
338
0
            auto iter =
339
0
                    std::upper_bound(_processed.cbegin(), _processed.cend(), Centroid(x, 0), cc);
340
0
341
0
            auto i = std::distance(_processed.cbegin(), iter);
342
0
            auto z1 = x - (iter - 1)->mean();
343
0
            auto z2 = (iter)->mean() - x;
344
0
            DCHECK_LE(0.0, z1);
345
0
            DCHECK_LE(0.0, z2);
346
0
            VLOG_CRITICAL << "middle "
347
0
                          << " z1 " << z1 << " z2 " << z2 << " x " << x;
348
0
349
0
            return weightedAverage(_cumulative[i - 1], z2, _cumulative[i], z1) / _processed_weight;
350
0
        }
351
0
    }
352
353
    // this returns a quantile on the t-digest
354
538
    Value quantile(Value q) {
355
538
        if (haveUnprocessed() || isDirty()) {
356
466
            process();
357
466
        }
358
538
        return quantileProcessed(q);
359
538
    }
360
361
    // this returns a quantile on the currently processed values without changing the t-digest
362
    // the value will not represent the unprocessed values
363
538
    Value quantileProcessed(Value q) const {
364
538
        if (q < 0 || q > 1) {
365
0
            VLOG_CRITICAL << "q should be in [0,1], got " << q;
366
0
            return NAN;
367
0
        }
368
369
538
        if (_processed.size() == 0) {
370
            // no centroids means no data, no way to get a quantile
371
0
            return NAN;
372
538
        } else if (_processed.size() == 1) {
373
            // with one data point, all quantiles lead to Rome
374
375
179
            return mean(0);
376
179
        }
377
378
        // we know that there are at least two centroids now
379
359
        auto n = _processed.size();
380
381
        // if values were stored in a sorted array, index would be the offset we are interested in
382
359
        const auto index = q * _processed_weight;
383
384
        // at the boundaries, we return _min or _max
385
359
        if (index <= weight(0) / 2.0) {
386
10
            DCHECK_GT(weight(0), 0);
387
10
            return static_cast<Value>(_min + 2.0 * index / weight(0) * (mean(0) - _min));
388
10
        }
389
390
349
        auto iter = std::lower_bound(_cumulative.cbegin(), _cumulative.cend(), index);
391
392
349
        if (iter != _cumulative.cend() && iter != _cumulative.cbegin() &&
393
349
            iter + 1 != _cumulative.cend()) {
394
294
            auto i = std::distance(_cumulative.cbegin(), iter);
395
294
            auto z1 = index - *(iter - 1);
396
294
            auto z2 = *(iter)-index;
397
            // VLOG_CRITICAL << "z2 " << z2 << " index " << index << " z1 " << z1;
398
294
            return weightedAverage(mean(i - 1), z2, mean(i), z1);
399
294
        }
400
401
349
        DCHECK_LE(index, _processed_weight);
402
55
        DCHECK_GE(index, _processed_weight - weight(n - 1) / 2.0);
403
404
55
        auto z1 = static_cast<Value>(index - _processed_weight - weight(n - 1) / 2.0);
405
55
        auto z2 = static_cast<Value>(weight(n - 1) / 2 - z1);
406
55
        return weightedAverage(mean(n - 1), z1, _max, z2);
407
349
    }
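To make the interpolation above concrete, a worked example with made-up numbers: suppose two processed centroids with means 10 and 20 and weights 4 and 4, so _processed_weight = 8 and _cumulative = [2, 6, 8]. For q = 0.5, index = 0.5 * 8 = 4; lower_bound finds the entry 6 (i = 1), giving z1 = 4 - 2 = 2 and z2 = 6 - 4 = 2, so the result is weightedAverage(mean(0) = 10, 2, mean(1) = 20, 2) = 15, the midpoint of the two centroid means.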
408
409
0
    Value compression() const { return _compression; }
410
411
22.1k
    void add(Value x) { add(x, 1); }
412
413
    void compress() { process(); }
414
415
    // add a single centroid to the unprocessed vector, processing previously unprocessed centroids if our limit has
416
    // been reached.
417
32.3k
    bool add(Value x, Weight w) {
418
32.3k
        if (std::isnan(x)) {
419
0
            return false;
420
0
        }
421
32.3k
        _unprocessed.emplace_back(x, w);
422
32.3k
        _unprocessed_weight += w;
423
32.3k
        processIfNecessary();
424
32.3k
        return true;
425
32.3k
    }
426
427
    void add(std::vector<Centroid>::const_iterator iter,
428
0
             std::vector<Centroid>::const_iterator end) {
429
0
        while (iter != end) {
430
0
            const size_t diff = std::distance(iter, end);
431
0
            const size_t room = _max_unprocessed - _unprocessed.size();
432
0
            auto mid = iter + std::min(diff, room);
433
0
            while (iter != mid) {
434
0
                _unprocessed.push_back(*(iter++));
435
0
            }
436
0
            if (_unprocessed.size() >= _max_unprocessed) {
437
0
                process();
438
0
            }
439
0
        }
440
0
    }
441
442
913
    uint32_t serialized_size() {
443
913
        return static_cast<uint32_t>(sizeof(uint32_t) + sizeof(Value) * 5 + sizeof(Index) * 2 +
444
913
                                     sizeof(uint32_t) * 3 + _processed.size() * sizeof(Centroid) +
445
913
                                     _unprocessed.size() * sizeof(Centroid) +
446
913
                                     _cumulative.size() * sizeof(Weight));
447
913
    }
448
449
454
    size_t serialize(uint8_t* writer) {
450
454
        uint8_t* dst = writer;
451
454
        uint32_t total_size = serialized_size();
452
454
        memcpy(writer, &total_size, sizeof(uint32_t));
453
454
        writer += sizeof(uint32_t);
454
454
        memcpy(writer, &_compression, sizeof(Value));
455
454
        writer += sizeof(Value);
456
454
        memcpy(writer, &_min, sizeof(Value));
457
454
        writer += sizeof(Value);
458
454
        memcpy(writer, &_max, sizeof(Value));
459
454
        writer += sizeof(Value);
460
454
        memcpy(writer, &_max_processed, sizeof(Index));
461
454
        writer += sizeof(Index);
462
454
        memcpy(writer, &_max_unprocessed, sizeof(Index));
463
454
        writer += sizeof(Index);
464
454
        memcpy(writer, &_processed_weight, sizeof(Value));
465
454
        writer += sizeof(Value);
466
454
        memcpy(writer, &_unprocessed_weight, sizeof(Value));
467
454
        writer += sizeof(Value);
468
469
454
        auto size = static_cast<uint32_t>(_processed.size());
470
454
        memcpy(writer, &size, sizeof(uint32_t));
471
454
        writer += sizeof(uint32_t);
472
454
        for (int i = 0; i < size; i++) {
473
0
            memcpy(writer, &_processed[i], sizeof(Centroid));
474
0
            writer += sizeof(Centroid);
475
0
        }
476
477
454
        size = static_cast<uint32_t>(_unprocessed.size());
478
454
        memcpy(writer, &size, sizeof(uint32_t));
479
454
        writer += sizeof(uint32_t);
480
        // TODO(weixiang): maybe a single memcpy would be enough here
481
21.2k
        for (int i = 0; i < size; i++) {
482
20.8k
            memcpy(writer, &_unprocessed[i], sizeof(Centroid));
483
20.8k
            writer += sizeof(Centroid);
484
20.8k
        }
485
486
454
        size = static_cast<uint32_t>(_cumulative.size());
487
454
        memcpy(writer, &size, sizeof(uint32_t));
488
454
        writer += sizeof(uint32_t);
489
601
        for (int i = 0; i < size; i++) {
490
147
            memcpy(writer, &_cumulative[i], sizeof(Weight));
491
147
            writer += sizeof(Weight);
492
147
        }
493
454
        return writer - dst;
494
454
    }
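As a reading aid, the byte layout written by serialize() above and read back by unserialize() below is (summarized from the code, not a separate format spec):

total_size            uint32_t
_compression          Value
_min                  Value
_max                  Value
_max_processed        Index
_max_unprocessed      Index
_processed_weight     Value
_unprocessed_weight   Value
processed count       uint32_t, then that many Centroid structs (raw memcpy)
unprocessed count     uint32_t, then that many Centroid structs
cumulative count      uint32_t, then that many Weight values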
495
496
432
    void unserialize(const uint8_t* type_reader) {
497
432
        uint32_t total_length = 0;
498
432
        memcpy(&total_length, type_reader, sizeof(uint32_t));
499
432
        type_reader += sizeof(uint32_t);
500
432
        memcpy(&_compression, type_reader, sizeof(Value));
501
432
        type_reader += sizeof(Value);
502
432
        memcpy(&_min, type_reader, sizeof(Value));
503
432
        type_reader += sizeof(Value);
504
432
        memcpy(&_max, type_reader, sizeof(Value));
505
432
        type_reader += sizeof(Value);
506
507
432
        memcpy(&_max_processed, type_reader, sizeof(Index));
508
432
        type_reader += sizeof(Index);
509
432
        memcpy(&_max_unprocessed, type_reader, sizeof(Index));
510
432
        type_reader += sizeof(Index);
511
432
        memcpy(&_processed_weight, type_reader, sizeof(Value));
512
432
        type_reader += sizeof(Value);
513
432
        memcpy(&_unprocessed_weight, type_reader, sizeof(Value));
514
432
        type_reader += sizeof(Value);
515
516
432
        uint32_t size;
517
432
        memcpy(&size, type_reader, sizeof(uint32_t));
518
432
        type_reader += sizeof(uint32_t);
519
432
        _processed.resize(size);
520
432
        for (int i = 0; i < size; i++) {
521
0
            memcpy(&_processed[i], type_reader, sizeof(Centroid));
522
0
            type_reader += sizeof(Centroid);
523
0
        }
524
432
        memcpy(&size, type_reader, sizeof(uint32_t));
525
432
        type_reader += sizeof(uint32_t);
526
432
        _unprocessed.resize(size);
527
41.1k
        for (int i = 0; i < size; i++) {
528
40.7k
            memcpy(&_unprocessed[i], type_reader, sizeof(Centroid));
529
40.7k
            type_reader += sizeof(Centroid);
530
40.7k
        }
531
432
        memcpy(&size, type_reader, sizeof(uint32_t));
532
432
        type_reader += sizeof(uint32_t);
533
432
        _cumulative.resize(size);
534
553
        for (int i = 0; i < size; i++) {
535
121
            memcpy(&_cumulative[i], type_reader, sizeof(Weight));
536
121
            type_reader += sizeof(Weight);
537
121
        }
538
432
    }
539
540
private:
541
    Value _compression;
542
543
    Value _min = std::numeric_limits<Value>::max();
544
545
    Value _max = std::numeric_limits<Value>::min();
546
547
    Index _max_processed;
548
549
    Index _max_unprocessed;
550
551
    Value _processed_weight = 0.0;
552
553
    Value _unprocessed_weight = 0.0;
554
555
    std::vector<Centroid> _processed;
556
557
    std::vector<Centroid> _unprocessed;
558
559
    std::vector<Weight> _cumulative;
560
561
    // return mean of i-th centroid
562
848
    Value mean(int64_t i) const noexcept { return _processed[i].mean(); }
563
564
    // return weight of i-th centroid
565
20.8k
    Weight weight(int64_t i) const noexcept { return _processed[i].weight(); }
566
567
    // append all unprocessed centroids into current unprocessed vector
568
426
    void mergeUnprocessed(const std::vector<const TDigest*>& tdigests) {
569
426
        if (tdigests.size() == 0) {
570
0
            return;
571
0
        }
572
573
426
        size_t total = _unprocessed.size();
574
426
        for (const auto& td : tdigests) {
575
426
            total += td->_unprocessed.size();
576
426
        }
577
578
426
        _unprocessed.reserve(total);
579
426
        for (const auto& td : tdigests) {
580
426
            _unprocessed.insert(_unprocessed.end(), td->_unprocessed.cbegin(),
581
426
                                td->_unprocessed.cend());
582
426
            _unprocessed_weight += td->_unprocessed_weight;
583
426
        }
584
426
    }
585
586
    // merge all processed centroids together into a single sorted vector
587
426
    void mergeProcessed(const std::vector<const TDigest*>& tdigests) {
588
426
        if (tdigests.size() == 0) {
589
0
            return;
590
0
        }
591
592
426
        size_t total = 0;
593
426
        CentroidListQueue pq(CentroidListComparator {});
594
426
        for (const auto& td : tdigests) {
595
426
            const auto& sorted = td->_processed;
596
426
            auto size = sorted.size();
597
426
            if (size > 0) {
598
1
                pq.push(CentroidList(sorted));
599
1
                total += size;
600
1
                _processed_weight += td->_processed_weight;
601
1
            }
602
426
        }
603
426
        if (total == 0) {
604
425
            return;
605
425
        }
606
607
1
        if (_processed.size() > 0) {
608
0
            pq.push(CentroidList(_processed));
609
0
            total += _processed.size();
610
0
        }
611
612
1
        std::vector<Centroid> sorted;
613
1
        VLOG_CRITICAL << "total " << total;
614
1
        sorted.reserve(total);
615
616
101
        while (!pq.empty()) {
617
100
            auto best = pq.top();
618
100
            pq.pop();
619
100
            sorted.push_back(*(best.iter));
620
100
            if (best.advance()) {
621
99
                pq.push(best);
622
99
            }
623
100
        }
624
1
        _processed = std::move(sorted);
625
1
        if (_processed.size() > 0) {
626
1
            _min = std::min(_min, _processed[0].mean());
627
1
            _max = std::max(_max, (_processed.cend() - 1)->mean());
628
1
        }
629
1
    }
630
631
32.7k
    void processIfNecessary() {
632
32.7k
        if (isDirty()) {
633
1
            process();
634
1
        }
635
32.7k
    }
636
637
896
    void updateCumulative() {
638
896
        const auto n = _processed.size();
639
896
        _cumulative.clear();
640
896
        _cumulative.reserve(n + 1);
641
896
        Weight previous = 0.0;
642
21.2k
        for (Index i = 0; i < n; i++) {
643
20.3k
            Weight current = weight(i);
644
20.3k
            auto halfCurrent = static_cast<Weight>(current / 2.0);
645
20.3k
            _cumulative.push_back(previous + halfCurrent);
646
20.3k
            previous = previous + current;
647
20.3k
        }
648
896
        _cumulative.push_back(previous);
649
896
    }
650
651
    // merges _unprocessed centroids and _processed centroids together and processes them
652
    // when complete, _unprocessed will be empty and _processed will have at most _max_processed centroids
653
470
    void process() {
654
470
        CentroidComparator cc;
655
        // select percentile_approx(lo_orderkey,0.5) from lineorder;
656
        // We tested both pdqsort and RadixSort; pdqsort performs better here because the data is a struct (Centroid).
657
        // But when sorting plain types such as int/float in a std::vector<T>, RadixSort is faster.
658
470
        pdqsort(_unprocessed.begin(), _unprocessed.end(), cc);
659
470
        auto count = _unprocessed.size();
660
470
        _unprocessed.insert(_unprocessed.end(), _processed.cbegin(), _processed.cend());
661
470
        std::inplace_merge(_unprocessed.begin(), _unprocessed.begin() + count, _unprocessed.end(),
662
470
                           cc);
663
664
470
        _processed_weight += _unprocessed_weight;
665
470
        _unprocessed_weight = 0;
666
470
        _processed.clear();
667
668
470
        _processed.push_back(_unprocessed[0]);
669
470
        Weight wSoFar = _unprocessed[0].weight();
670
470
        Weight wLimit = _processed_weight * integratedQ(1.0);
671
672
470
        auto end = _unprocessed.end();
673
51.6k
        for (auto iter = _unprocessed.cbegin() + 1; iter < end; iter++) {
674
51.1k
            const auto& centroid = *iter;
675
51.1k
            Weight projectedW = wSoFar + centroid.weight();
676
51.1k
            if (projectedW <= wLimit) {
677
31.4k
                wSoFar = projectedW;
678
31.4k
                (_processed.end() - 1)->add(centroid);
679
31.4k
            } else {
680
19.7k
                auto k1 = integratedLocation(wSoFar / _processed_weight);
681
19.7k
                wLimit = _processed_weight * integratedQ(static_cast<Value>(k1 + 1.0));
682
19.7k
                wSoFar += centroid.weight();
683
19.7k
                _processed.emplace_back(centroid);
684
19.7k
            }
685
51.1k
        }
686
470
        _unprocessed.clear();
687
470
        _min = std::min(_min, _processed[0].mean());
688
470
        VLOG_CRITICAL << "new _min " << _min;
689
470
        _max = std::max(_max, (_processed.cend() - 1)->mean());
690
470
        VLOG_CRITICAL << "new _max " << _max;
691
470
        updateCumulative();
692
470
    }
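In other words, the loop above is the compression step: centroids are swept in mean order and folded into the current output centroid while the running weight stays within wLimit = _processed_weight * integratedQ(k1 + 1), where k1 = integratedLocation(wSoFar / _processed_weight); otherwise a new output centroid is started and the limit advances. Equivalently, each merged centroid is allowed to span at most about one unit on the k scale, which keeps centroids small (and quantile estimates tight) near the tails.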
693
694
0
    size_t checkWeights(const std::vector<Centroid>& sorted, Value total) {
695
0
        size_t badWeight = 0;
696
0
        auto k1 = 0.0;
697
0
        auto q = 0.0;
698
0
        for (auto iter = sorted.cbegin(); iter != sorted.cend(); iter++) {
699
0
            auto w = iter->weight();
700
0
            auto dq = w / total;
701
0
            auto k2 = integratedLocation(static_cast<Value>(q + dq));
702
0
            if (k2 - k1 > 1 && w != 1) {
703
0
                VLOG_CRITICAL << "Oversize centroid at " << std::distance(sorted.cbegin(), iter)
704
0
                              << " k1 " << k1 << " k2 " << k2 << " dk " << (k2 - k1) << " w " << w
705
0
                              << " q " << q;
706
0
                badWeight++;
707
0
            }
708
0
            if (k2 - k1 > 1.5 && w != 1) {
709
0
                VLOG_CRITICAL << "Egregiously Oversize centroid at "
710
0
                              << std::distance(sorted.cbegin(), iter) << " k1 " << k1 << " k2 "
711
0
                              << k2 << " dk " << (k2 - k1) << " w " << w << " q " << q;
712
0
                badWeight++;
713
0
            }
714
0
            q += dq;
715
0
            k1 = k2;
716
0
        }
717
0
718
0
        return badWeight;
719
0
    }
720
721
    /**
722
    * Converts a quantile into a centroid scale value.  The centroid scale is nominally
723
    * the number k of the centroid that a quantile point q should belong to.  Due to
724
    * round-offs, however, we can't align things perfectly without splitting points
725
    * and centroids.  We don't want to do that, so we have to allow for offsets.
726
    * In the end, the criterion is that any quantile range that spans a centroid
727
    * scale range more than one should be split across more than one centroid if
728
    * possible.  This won't be possible if the quantile range refers to a single point
729
    * or an already existing centroid.
730
    * <p/>
731
    * This mapping is steep near q=0 or q=1 so each centroid there will correspond to
732
    * less q range.  Near q=0.5, the mapping is flatter so that centroids there will
733
    * represent a larger chunk of quantiles.
734
    *
735
    * @param q The quantile scale value to be mapped.
736
    * @return The centroid scale value corresponding to q.
737
    */
738
19.7k
    Value integratedLocation(Value q) const {
739
19.7k
        return static_cast<Value>(_compression * (std::asin(2.0 * q - 1.0) + M_PI / 2) / M_PI);
740
19.7k
    }
741
742
20.2k
    Value integratedQ(Value k) const {
743
20.2k
        return static_cast<Value>(
744
20.2k
                (std::sin(std::min(k, _compression) * M_PI / _compression - M_PI / 2) + 1) / 2);
745
20.2k
    }
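Written out, the two scale functions above are inverses of each other (up to the clamp at k = _compression):

    k(q) = compression * (asin(2*q - 1) + pi/2) / pi
    q(k) = (sin(min(k, compression) * pi / compression - pi/2) + 1) / 2

The mapping is steep near q = 0 and q = 1, so centroids near the tails cover only a narrow quantile range, which is what gives t-digest its accuracy for extreme quantiles.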
746
747
    /**
748
     * Same as {@link #weightedAverageSorted(Value, Value, Value, Value)} but flips
749
     * the order of the variables if <code>x1</code> is greater than
750
     * <code>x2</code>.
751
    */
752
357
    static Value weightedAverage(Value x1, Value w1, Value x2, Value w2) {
753
357
        return (x1 <= x2) ? weightedAverageSorted(x1, w1, x2, w2)
754
357
                          : weightedAverageSorted(x2, w2, x1, w1);
755
357
    }
756
757
    /**
758
    * Compute the weighted average between <code>x1</code> with a weight of
759
    * <code>w1</code> and <code>x2</code> with a weight of <code>w2</code>.
760
    * This expects <code>x1</code> to be less than or equal to <code>x2</code>
761
    * and is guaranteed to return a number between <code>x1</code> and
762
    * <code>x2</code>.
763
    */
764
357
    static Value weightedAverageSorted(Value x1, Value w1, Value x2, Value w2) {
765
357
        DCHECK_LE(x1, x2);
766
357
        const Value x = (x1 * w1 + x2 * w2) / (w1 + w2);
767
357
        return std::max(x1, std::min(x, x2));
768
357
    }
769
770
0
    static Value interpolate(Value x, Value x0, Value x1) { return (x - x0) / (x1 - x0); }
771
772
    /**
773
    * Computes an interpolated value of a quantile that is between two centroids.
774
    *
775
    * Index is the quantile desired multiplied by the total number of samples - 1.
776
    *
777
    * @param index              Denormalized quantile desired
778
    * @param previousIndex      The denormalized quantile corresponding to the center of the previous centroid.
779
    * @param nextIndex          The denormalized quantile corresponding to the center of the following centroid.
780
    * @param previousMean       The mean of the previous centroid.
781
    * @param nextMean           The mean of the following centroid.
782
    * @return  The interpolated mean.
783
    */
784
    static Value quantile(Value index, Value previousIndex, Value nextIndex, Value previousMean,
785
0
                          Value nextMean) {
786
0
        const auto delta = nextIndex - previousIndex;
787
0
        const auto previousWeight = (nextIndex - index) / delta;
788
0
        const auto nextWeight = (index - previousIndex) / delta;
789
0
        return previousMean * previousWeight + nextMean * nextWeight;
790
0
    }
791
};
792
#include "common/compile_check_end.h"
793
} // namespace doris