Coverage Report

Created: 2026-05-14 18:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_quantile_state.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionBitmap.h
19
// and modified by Doris
20
21
#include <fmt/format.h>
22
#include <glog/logging.h>
23
24
#include <boost/iterator/iterator_facade.hpp>
25
#include <cstddef>
26
#include <memory>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/status.h"
31
#include "core/assert_cast.h"
32
#include "core/block/block.h"
33
#include "core/block/column_numbers.h"
34
#include "core/block/column_with_type_and_name.h"
35
#include "core/column/column.h"
36
#include "core/column/column_complex.h"
37
#include "core/column/column_const.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_vector.h"
41
#include "core/data_type/data_type.h"
42
#include "core/data_type/data_type_nullable.h"
43
#include "core/data_type/data_type_number.h"
44
#include "core/data_type/data_type_quantilestate.h" // IWYU pragma: keep
45
#include "core/data_type/data_type_string.h"
46
#include "core/types.h"
47
#include "core/value/quantile_state.h"
48
#include "exec/common/util.hpp"
49
#include "exprs/aggregate/aggregate_function.h"
50
#include "exprs/function/function.h"
51
#include "exprs/function/function_const.h"
52
#include "exprs/function/function_helpers.h"
53
#include "exprs/function/function_totype.h"
54
#include "exprs/function/simple_function_factory.h"
55
#include "util/url_coding.h"
56
57
namespace doris {
58
class FunctionContext;
59
} // namespace doris
60
61
namespace doris {
62
63
struct QuantileStateEmpty {
64
    static constexpr auto name = "quantile_state_empty";
65
    using ReturnColVec = ColumnQuantileState;
66
4
    static DataTypePtr get_return_type() { return std::make_shared<DataTypeQuantileState>(); }
67
4
    static auto init_value() { return QuantileState {}; }
68
};
69
70
class FunctionToQuantileState : public IFunction {
71
public:
72
    static constexpr auto name = "to_quantile_state";
73
1
    String get_name() const override { return name; }
74
75
175
    static FunctionPtr create() { return std::make_shared<FunctionToQuantileState>(); }
76
77
345
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
78
345
        return std::make_shared<DataTypeQuantileState>();
79
345
    }
80
81
166
    size_t get_number_of_arguments() const override { return 2; }
82
83
345
    bool use_default_implementation_for_nulls() const override { return false; }
84
85
    template <bool is_nullable>
86
    Status execute_internal(const ColumnPtr& column, const DataTypePtr& data_type,
87
179
                            MutableColumnPtr& column_result, float compression) const {
88
179
        const ColumnNullable* col_nullable = nullptr;
89
179
        const ColumnUInt8* col_nullmap = nullptr;
90
179
        const ColumnFloat64* col = nullptr;
91
179
        const NullMap* nullmap = nullptr;
92
179
        if constexpr (is_nullable) {
93
89
            col_nullable = assert_cast<const ColumnNullable*>(column.get());
94
89
            col_nullmap =
95
89
                    assert_cast<const ColumnUInt8*>(col_nullable->get_null_map_column_ptr().get());
96
89
            col = assert_cast<const ColumnFloat64*>(col_nullable->get_nested_column_ptr().get());
97
98
89
            nullmap = &col_nullmap->get_data();
99
90
        } else {
100
90
            col = assert_cast<const ColumnFloat64*>(column.get());
101
90
        }
102
179
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
103
179
        auto& res_data = res_column->get_data();
104
105
179
        size_t size = col->size();
106
14.6k
        for (size_t i = 0; i < size; ++i) {
107
14.4k
            if constexpr (is_nullable) {
108
6.31k
                if ((*nullmap)[i]) {
109
60
                    res_data[i].clear();
110
60
                    continue;
111
60
                }
112
6.31k
            }
113
6.25k
            auto value = (double)col->get_data()[i];
114
14.4k
            res_data[i].set_compression(compression);
115
14.4k
            res_data[i].add_value(value);
116
14.4k
        }
117
149
        return Status::OK();
118
179
    }
_ZNK5doris23FunctionToQuantileState16execute_internalILb1EEENS_6StatusERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EERKSt10shared_ptrIKNS_9IDataTypeEERNS5_11mutable_ptrIS4_EEf
Line
Count
Source
87
89
                            MutableColumnPtr& column_result, float compression) const {
88
89
        const ColumnNullable* col_nullable = nullptr;
89
89
        const ColumnUInt8* col_nullmap = nullptr;
90
89
        const ColumnFloat64* col = nullptr;
91
89
        const NullMap* nullmap = nullptr;
92
89
        if constexpr (is_nullable) {
93
89
            col_nullable = assert_cast<const ColumnNullable*>(column.get());
94
89
            col_nullmap =
95
89
                    assert_cast<const ColumnUInt8*>(col_nullable->get_null_map_column_ptr().get());
96
89
            col = assert_cast<const ColumnFloat64*>(col_nullable->get_nested_column_ptr().get());
97
98
89
            nullmap = &col_nullmap->get_data();
99
        } else {
100
            col = assert_cast<const ColumnFloat64*>(column.get());
101
        }
102
89
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
103
89
        auto& res_data = res_column->get_data();
104
105
89
        size_t size = col->size();
106
6.46k
        for (size_t i = 0; i < size; ++i) {
107
6.31k
            if constexpr (is_nullable) {
108
6.31k
                if ((*nullmap)[i]) {
109
60
                    res_data[i].clear();
110
60
                    continue;
111
60
                }
112
6.31k
            }
113
6.25k
            auto value = (double)col->get_data()[i];
114
6.31k
            res_data[i].set_compression(compression);
115
6.31k
            res_data[i].add_value(value);
116
6.31k
        }
117
149
        return Status::OK();
118
89
    }
_ZNK5doris23FunctionToQuantileState16execute_internalILb0EEENS_6StatusERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EERKSt10shared_ptrIKNS_9IDataTypeEERNS5_11mutable_ptrIS4_EEf
Line
Count
Source
87
90
                            MutableColumnPtr& column_result, float compression) const {
88
90
        const ColumnNullable* col_nullable = nullptr;
89
90
        const ColumnUInt8* col_nullmap = nullptr;
90
90
        const ColumnFloat64* col = nullptr;
91
90
        const NullMap* nullmap = nullptr;
92
        if constexpr (is_nullable) {
93
            col_nullable = assert_cast<const ColumnNullable*>(column.get());
94
            col_nullmap =
95
                    assert_cast<const ColumnUInt8*>(col_nullable->get_null_map_column_ptr().get());
96
            col = assert_cast<const ColumnFloat64*>(col_nullable->get_nested_column_ptr().get());
97
98
            nullmap = &col_nullmap->get_data();
99
90
        } else {
100
90
            col = assert_cast<const ColumnFloat64*>(column.get());
101
90
        }
102
90
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
103
90
        auto& res_data = res_column->get_data();
104
105
90
        size_t size = col->size();
106
8.17k
        for (size_t i = 0; i < size; ++i) {
107
            if constexpr (is_nullable) {
108
                if ((*nullmap)[i]) {
109
                    res_data[i].clear();
110
                    continue;
111
                }
112
            }
113
8.08k
            auto value = (double)col->get_data()[i];
114
8.08k
            res_data[i].set_compression(compression);
115
8.08k
            res_data[i].add_value(value);
116
8.08k
        }
117
90
        return Status::OK();
118
90
    }
119
120
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
121
179
                        uint32_t result, size_t input_rows_count) const override {
122
179
        const ColumnPtr& column = block.get_by_position(arguments[0]).column;
123
179
        const DataTypePtr& data_type = block.get_by_position(arguments[0]).type;
124
179
        const auto* compression_arg = check_and_get_column_const<ColumnFloat32>(
125
179
                block.get_by_position(arguments.back()).column.get());
126
179
        float compression = 2048;
127
179
        if (compression_arg) {
128
92
            auto compression_arg_val = compression_arg->get_value<TYPE_FLOAT>();
129
92
            if (compression_arg_val >= QUANTILE_STATE_COMPRESSION_MIN &&
130
92
                compression_arg_val <= QUANTILE_STATE_COMPRESSION_MAX) {
131
34
                compression = compression_arg_val;
132
34
            }
133
92
        }
134
179
        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
135
179
        column_result->resize(input_rows_count);
136
137
179
        Status status = Status::OK();
138
179
        if (data_type->is_nullable()) {
139
89
            RETURN_IF_ERROR(execute_internal<true>(column, data_type, column_result, compression));
140
90
        } else {
141
90
            RETURN_IF_ERROR(execute_internal<false>(column, data_type, column_result, compression));
142
90
        }
143
179
        if (status.ok()) {
144
179
            block.replace_by_position(result, std::move(column_result));
145
179
        }
146
179
        return status;
147
179
    }
148
};
149
150
class FunctionQuantileStatePercent : public IFunction {
151
public:
152
    static constexpr auto name = "quantile_percent";
153
1
    String get_name() const override { return name; }
154
155
86
    static FunctionPtr create() { return std::make_shared<FunctionQuantileStatePercent>(); }
156
157
77
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
158
77
        return std::make_shared<DataTypeFloat64>();
159
77
    }
160
161
77
    size_t get_number_of_arguments() const override { return 2; }
162
163
176
    bool use_default_implementation_for_nulls() const override { return false; }
164
165
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
166
99
                        uint32_t result, size_t input_rows_count) const override {
167
99
        auto res_data_column = ColumnFloat64::create();
168
99
        auto& res = res_data_column->get_data();
169
99
        auto data_null_map = ColumnUInt8::create(input_rows_count, 0);
170
99
        auto& null_map = data_null_map->get_data();
171
172
99
        auto column = block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
173
99
        if (const auto* nullable = check_and_get_column<const ColumnNullable>(*column)) {
174
3
            VectorizedUtils::update_null_map(null_map, nullable->get_null_map_data());
175
3
            column = nullable->get_nested_column_ptr();
176
3
        }
177
99
        const auto* str_col = assert_cast<const ColumnQuantileState*>(column.get());
178
99
        const auto& col_data = str_col->get_data();
179
99
        const auto* percent_arg = check_and_get_column_const<ColumnFloat32>(
180
99
                block.get_by_position(arguments.back()).column.get());
181
182
99
        if (!percent_arg) {
183
0
            return Status::InvalidArgument(
184
0
                    "Second argument to {} must be a constant float describing type", get_name());
185
0
        }
186
99
        auto percent_arg_value = percent_arg->get_value<TYPE_FLOAT>();
187
99
        if (percent_arg_value < 0 || percent_arg_value > 1) {
188
0
            return Status::InvalidArgument(
189
0
                    "the input argument of percentage: {} is not valid, must be in range [0,1] ",
190
0
                    percent_arg_value);
191
0
        }
192
193
99
        res.reserve(input_rows_count);
194
353
        for (size_t i = 0; i < input_rows_count; ++i) {
195
254
            if (null_map[i]) {
196
                // if null push_back meaningless result to make sure idxs can be matched
197
1
                res.push_back(0);
198
1
                continue;
199
1
            }
200
201
253
            res.push_back(col_data[i].get_value_by_percentile(percent_arg_value));
202
253
        }
203
204
99
        block.replace_by_position(result, std::move(res_data_column));
205
99
        return Status::OK();
206
99
    }
207
};
208
209
class FunctionQuantileStateFromBase64 : public IFunction {
210
public:
211
    static constexpr auto name = "quantile_state_from_base64";
212
1
    String get_name() const override { return name; }
213
214
18
    static FunctionPtr create() { return std::make_shared<FunctionQuantileStateFromBase64>(); }
215
216
9
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
217
9
        return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeQuantileState>());
218
9
    }
219
220
9
    size_t get_number_of_arguments() const override { return 1; }
221
222
19
    bool use_default_implementation_for_nulls() const override { return true; }
223
224
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
225
7
                        uint32_t result, size_t input_rows_count) const override {
226
7
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
227
7
        auto res_data_column = ColumnQuantileState::create();
228
7
        auto& null_map = res_null_map->get_data();
229
7
        auto& res = res_data_column->get_data();
230
231
7
        auto& argument_column = block.get_by_position(arguments[0]).column;
232
7
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
233
7
        const ColumnString::Chars& data = str_column.get_chars();
234
7
        const ColumnString::Offsets& offsets = str_column.get_offsets();
235
236
7
        res.reserve(input_rows_count);
237
238
7
        std::string decode_buff;
239
7
        int64_t last_decode_buff_len = 0;
240
7
        int64_t curr_decode_buff_len = 0;
241
14
        for (size_t i = 0; i < input_rows_count; ++i) {
242
7
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
243
7
            int64_t src_size = offsets[i] - offsets[i - 1];
244
245
7
            if (src_size == 0 || 0 != src_size % 4) {
246
3
                res.emplace_back();
247
3
                null_map[i] = 1;
248
3
                continue;
249
3
            }
250
251
4
            curr_decode_buff_len = src_size + 3;
252
4
            if (curr_decode_buff_len > last_decode_buff_len) {
253
4
                decode_buff.resize(curr_decode_buff_len);
254
4
                last_decode_buff_len = curr_decode_buff_len;
255
4
            }
256
4
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
257
4
            if (outlen < 0) {
258
0
                res.emplace_back();
259
0
                null_map[i] = 1;
260
4
            } else {
261
4
                doris::Slice decoded_slice(decode_buff.data(), outlen);
262
4
                doris::QuantileState quantile_state;
263
4
                if (!quantile_state.deserialize(decoded_slice)) {
264
0
                    return Status::RuntimeError(fmt::format(
265
0
                            "quantile_state_from_base64 decode failed: base64: {}", src_str));
266
4
                } else {
267
4
                    res.emplace_back(std::move(quantile_state));
268
4
                }
269
4
            }
270
4
        }
271
272
7
        block.get_by_position(result).column =
273
7
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
274
7
        return Status::OK();
275
7
    }
276
};
277
278
/// Name tag supplying the SQL function name `quantile_state_to_base64`.
struct NameQuantileStateToBase64 {
    static constexpr const char* name = "quantile_state_to_base64";
};
281
282
struct QuantileStateToBase64 {
283
    using ReturnType = DataTypeString;
284
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_QUANTILE_STATE;
285
    using Type = DataTypeQuantileState::FieldType;
286
    using ReturnColumnType = ColumnString;
287
    using Chars = ColumnString::Chars;
288
    using Offsets = ColumnString::Offsets;
289
290
15
    static Status vector(const std::vector<QuantileState>& data, Chars& chars, Offsets& offsets) {
291
15
        size_t size = data.size();
292
15
        offsets.resize(size);
293
15
        size_t output_char_size = 0;
294
34
        for (size_t i = 0; i < size; ++i) {
295
19
            auto& quantile_state_val = const_cast<QuantileState&>(data[i]);
296
19
            auto ser_size = quantile_state_val.get_serialized_size();
297
19
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
298
19
        }
299
15
        ColumnString::check_chars_length(output_char_size, size);
300
15
        chars.resize(output_char_size);
301
15
        auto* chars_data = chars.data();
302
303
15
        size_t cur_ser_size = 0;
304
15
        size_t last_ser_size = 0;
305
15
        std::string ser_buff;
306
15
        size_t encoded_offset = 0;
307
34
        for (size_t i = 0; i < size; ++i) {
308
19
            auto& quantile_state_val = const_cast<QuantileState&>(data[i]);
309
310
19
            cur_ser_size = quantile_state_val.get_serialized_size();
311
19
            if (cur_ser_size > last_ser_size) {
312
19
                last_ser_size = cur_ser_size;
313
19
                ser_buff.resize(cur_ser_size);
314
19
            }
315
19
            size_t real_size =
316
19
                    quantile_state_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
317
19
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
318
19
                                        chars_data + encoded_offset);
319
19
            DCHECK(outlen > 0);
320
321
19
            encoded_offset += outlen;
322
19
            offsets[i] = cast_set<uint32_t>(encoded_offset);
323
19
        }
324
15
        return Status::OK();
325
15
    }
326
};
327
328
using FunctionQuantileStateToBase64 =
329
        FunctionUnaryToType<QuantileStateToBase64, NameQuantileStateToBase64>;
330
331
8
void register_function_quantile_state(SimpleFunctionFactory& factory) {
332
8
    factory.register_function<FunctionConst<QuantileStateEmpty, false>>();
333
8
    factory.register_function<FunctionQuantileStatePercent>();
334
8
    factory.register_function<FunctionToQuantileState>();
335
8
    factory.register_function<FunctionQuantileStateFromBase64>();
336
8
    factory.register_function<FunctionQuantileStateToBase64>();
337
8
}
338
339
} // namespace doris