Coverage Report

Created: 2026-03-13 09:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_quantile_state.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionBitmap.h
19
// and modified by Doris
20
21
#include <fmt/format.h>
22
#include <glog/logging.h>
23
24
#include <boost/iterator/iterator_facade.hpp>
25
#include <cstddef>
26
#include <memory>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/status.h"
31
#include "core/assert_cast.h"
32
#include "core/block/block.h"
33
#include "core/block/column_numbers.h"
34
#include "core/block/column_with_type_and_name.h"
35
#include "core/column/column.h"
36
#include "core/column/column_complex.h"
37
#include "core/column/column_const.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_vector.h"
41
#include "core/data_type/data_type.h"
42
#include "core/data_type/data_type_nullable.h"
43
#include "core/data_type/data_type_number.h"
44
#include "core/data_type/data_type_quantilestate.h" // IWYU pragma: keep
45
#include "core/data_type/data_type_string.h"
46
#include "core/types.h"
47
#include "core/value/quantile_state.h"
48
#include "exec/common/util.hpp"
49
#include "exprs/aggregate/aggregate_function.h"
50
#include "exprs/function/function.h"
51
#include "exprs/function/function_const.h"
52
#include "exprs/function/function_helpers.h"
53
#include "exprs/function/function_totype.h"
54
#include "exprs/function/simple_function_factory.h"
55
#include "util/url_coding.h"
56
57
namespace doris {
58
class FunctionContext;
59
} // namespace doris
60
61
namespace doris {
62
63
struct QuantileStateEmpty {
64
    static constexpr auto name = "quantile_state_empty";
65
    using ReturnColVec = ColumnQuantileState;
66
4
    static DataTypePtr get_return_type() { return std::make_shared<DataTypeQuantileState>(); }
67
4
    static auto init_value() { return QuantileState {}; }
68
};
69
70
class FunctionToQuantileState : public IFunction {
71
public:
72
    static constexpr auto name = "to_quantile_state";
73
1
    String get_name() const override { return name; }
74
75
174
    static FunctionPtr create() { return std::make_shared<FunctionToQuantileState>(); }
76
77
335
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
78
335
        return std::make_shared<DataTypeQuantileState>();
79
335
    }
80
81
165
    size_t get_number_of_arguments() const override { return 2; }
82
83
335
    bool use_default_implementation_for_nulls() const override { return false; }
84
85
    template <bool is_nullable>
86
    Status execute_internal(const ColumnPtr& column, const DataTypePtr& data_type,
87
170
                            MutableColumnPtr& column_result, float compression) const {
88
170
        auto type_error = [&]() {
89
0
            return Status::RuntimeError("Illegal column {} of argument of function {}",
90
0
                                        column->get_name(), get_name());
91
0
        };
92
170
        const ColumnNullable* col_nullable = nullptr;
93
170
        const ColumnUInt8* col_nullmap = nullptr;
94
170
        const ColumnFloat64* col = nullptr;
95
170
        const NullMap* nullmap = nullptr;
96
170
        if constexpr (is_nullable) {
97
82
            col_nullable = check_and_get_column<ColumnNullable>(column.get());
98
82
            col_nullmap = check_and_get_column<ColumnUInt8>(
99
82
                    col_nullable->get_null_map_column_ptr().get());
100
82
            col = check_and_get_column<ColumnFloat64>(col_nullable->get_nested_column_ptr().get());
101
82
            if (col == nullptr || col_nullmap == nullptr) {
102
0
                return type_error();
103
0
            }
104
105
82
            nullmap = &col_nullmap->get_data();
106
88
        } else {
107
88
            col = check_and_get_column<ColumnFloat64>(column.get());
108
88
        }
109
0
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
110
170
        auto& res_data = res_column->get_data();
111
112
170
        size_t size = col->size();
113
19.6k
        for (size_t i = 0; i < size; ++i) {
114
19.3k
            if constexpr (is_nullable) {
115
11.3k
                if ((*nullmap)[i]) {
116
60
                    res_data[i].clear();
117
60
                    continue;
118
60
                }
119
11.3k
            }
120
11.2k
            auto value = (double)col->get_data()[i];
121
19.3k
            res_data[i].set_compression(compression);
122
19.3k
            res_data[i].add_value(value);
123
19.3k
        }
124
142
        return Status::OK();
125
170
    }
_ZNK5doris23FunctionToQuantileState16execute_internalILb1EEENS_6StatusERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EERKSt10shared_ptrIKNS_9IDataTypeEERNS5_11mutable_ptrIS4_EEf
Line
Count
Source
87
82
                            MutableColumnPtr& column_result, float compression) const {
88
82
        auto type_error = [&]() {
89
82
            return Status::RuntimeError("Illegal column {} of argument of function {}",
90
82
                                        column->get_name(), get_name());
91
82
        };
92
82
        const ColumnNullable* col_nullable = nullptr;
93
82
        const ColumnUInt8* col_nullmap = nullptr;
94
82
        const ColumnFloat64* col = nullptr;
95
82
        const NullMap* nullmap = nullptr;
96
82
        if constexpr (is_nullable) {
97
82
            col_nullable = check_and_get_column<ColumnNullable>(column.get());
98
82
            col_nullmap = check_and_get_column<ColumnUInt8>(
99
82
                    col_nullable->get_null_map_column_ptr().get());
100
82
            col = check_and_get_column<ColumnFloat64>(col_nullable->get_nested_column_ptr().get());
101
82
            if (col == nullptr || col_nullmap == nullptr) {
102
0
                return type_error();
103
0
            }
104
105
82
            nullmap = &col_nullmap->get_data();
106
        } else {
107
            col = check_and_get_column<ColumnFloat64>(column.get());
108
        }
109
0
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
110
82
        auto& res_data = res_column->get_data();
111
112
82
        size_t size = col->size();
113
11.4k
        for (size_t i = 0; i < size; ++i) {
114
11.3k
            if constexpr (is_nullable) {
115
11.3k
                if ((*nullmap)[i]) {
116
60
                    res_data[i].clear();
117
60
                    continue;
118
60
                }
119
11.3k
            }
120
11.2k
            auto value = (double)col->get_data()[i];
121
11.3k
            res_data[i].set_compression(compression);
122
11.3k
            res_data[i].add_value(value);
123
11.3k
        }
124
142
        return Status::OK();
125
82
    }
_ZNK5doris23FunctionToQuantileState16execute_internalILb0EEENS_6StatusERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EERKSt10shared_ptrIKNS_9IDataTypeEERNS5_11mutable_ptrIS4_EEf
Line
Count
Source
87
88
                            MutableColumnPtr& column_result, float compression) const {
88
88
        auto type_error = [&]() {
89
88
            return Status::RuntimeError("Illegal column {} of argument of function {}",
90
88
                                        column->get_name(), get_name());
91
88
        };
92
88
        const ColumnNullable* col_nullable = nullptr;
93
88
        const ColumnUInt8* col_nullmap = nullptr;
94
88
        const ColumnFloat64* col = nullptr;
95
88
        const NullMap* nullmap = nullptr;
96
        if constexpr (is_nullable) {
97
            col_nullable = check_and_get_column<ColumnNullable>(column.get());
98
            col_nullmap = check_and_get_column<ColumnUInt8>(
99
                    col_nullable->get_null_map_column_ptr().get());
100
            col = check_and_get_column<ColumnFloat64>(col_nullable->get_nested_column_ptr().get());
101
            if (col == nullptr || col_nullmap == nullptr) {
102
                return type_error();
103
            }
104
105
            nullmap = &col_nullmap->get_data();
106
88
        } else {
107
88
            col = check_and_get_column<ColumnFloat64>(column.get());
108
88
        }
109
88
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
110
88
        auto& res_data = res_column->get_data();
111
112
88
        size_t size = col->size();
113
8.17k
        for (size_t i = 0; i < size; ++i) {
114
            if constexpr (is_nullable) {
115
                if ((*nullmap)[i]) {
116
                    res_data[i].clear();
117
                    continue;
118
                }
119
            }
120
8.08k
            auto value = (double)col->get_data()[i];
121
8.08k
            res_data[i].set_compression(compression);
122
8.08k
            res_data[i].add_value(value);
123
8.08k
        }
124
88
        return Status::OK();
125
88
    }
126
127
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
128
170
                        uint32_t result, size_t input_rows_count) const override {
129
170
        const ColumnPtr& column = block.get_by_position(arguments[0]).column;
130
170
        const DataTypePtr& data_type = block.get_by_position(arguments[0]).type;
131
170
        const auto* compression_arg = check_and_get_column_const<ColumnFloat32>(
132
170
                block.get_by_position(arguments.back()).column.get());
133
170
        float compression = 2048;
134
170
        if (compression_arg) {
135
85
            auto compression_arg_val = compression_arg->get_value<TYPE_FLOAT>();
136
85
            if (compression_arg_val >= QUANTILE_STATE_COMPRESSION_MIN &&
137
85
                compression_arg_val <= QUANTILE_STATE_COMPRESSION_MAX) {
138
27
                compression = compression_arg_val;
139
27
            }
140
85
        }
141
170
        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
142
170
        column_result->resize(input_rows_count);
143
144
170
        Status status = Status::OK();
145
170
        if (data_type->is_nullable()) {
146
82
            RETURN_IF_ERROR(execute_internal<true>(column, data_type, column_result, compression));
147
88
        } else {
148
88
            RETURN_IF_ERROR(execute_internal<false>(column, data_type, column_result, compression));
149
88
        }
150
170
        if (status.ok()) {
151
170
            block.replace_by_position(result, std::move(column_result));
152
170
        }
153
170
        return status;
154
170
    }
155
};
156
157
class FunctionQuantileStatePercent : public IFunction {
158
public:
159
    static constexpr auto name = "quantile_percent";
160
1
    String get_name() const override { return name; }
161
162
85
    static FunctionPtr create() { return std::make_shared<FunctionQuantileStatePercent>(); }
163
164
76
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
165
76
        return std::make_shared<DataTypeFloat64>();
166
76
    }
167
168
76
    size_t get_number_of_arguments() const override { return 2; }
169
170
176
    bool use_default_implementation_for_nulls() const override { return false; }
171
172
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
173
100
                        uint32_t result, size_t input_rows_count) const override {
174
100
        auto res_data_column = ColumnFloat64::create();
175
100
        auto& res = res_data_column->get_data();
176
100
        auto data_null_map = ColumnUInt8::create(input_rows_count, 0);
177
100
        auto& null_map = data_null_map->get_data();
178
179
100
        auto column = block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
180
100
        if (const auto* nullable = check_and_get_column<const ColumnNullable>(*column)) {
181
3
            VectorizedUtils::update_null_map(null_map, nullable->get_null_map_data());
182
3
            column = nullable->get_nested_column_ptr();
183
3
        }
184
100
        const auto* str_col = assert_cast<const ColumnQuantileState*>(column.get());
185
100
        const auto& col_data = str_col->get_data();
186
100
        const auto* percent_arg = check_and_get_column_const<ColumnFloat32>(
187
100
                block.get_by_position(arguments.back()).column.get());
188
189
100
        if (!percent_arg) {
190
0
            return Status::InvalidArgument(
191
0
                    "Second argument to {} must be a constant float describing type", get_name());
192
0
        }
193
100
        auto percent_arg_value = percent_arg->get_value<TYPE_FLOAT>();
194
100
        if (percent_arg_value < 0 || percent_arg_value > 1) {
195
0
            return Status::InvalidArgument(
196
0
                    "the input argument of percentage: {} is not valid, must be in range [0,1] ",
197
0
                    percent_arg_value);
198
0
        }
199
200
100
        res.reserve(input_rows_count);
201
351
        for (size_t i = 0; i < input_rows_count; ++i) {
202
251
            if (null_map[i]) {
203
                // if null push_back meaningless result to make sure idxs can be matched
204
1
                res.push_back(0);
205
1
                continue;
206
1
            }
207
208
250
            res.push_back(col_data[i].get_value_by_percentile(percent_arg_value));
209
250
        }
210
211
100
        block.replace_by_position(result, std::move(res_data_column));
212
100
        return Status::OK();
213
100
    }
214
};
215
216
class FunctionQuantileStateFromBase64 : public IFunction {
217
public:
218
    static constexpr auto name = "quantile_state_from_base64";
219
1
    String get_name() const override { return name; }
220
221
18
    static FunctionPtr create() { return std::make_shared<FunctionQuantileStateFromBase64>(); }
222
223
9
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
224
9
        return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeQuantileState>());
225
9
    }
226
227
9
    size_t get_number_of_arguments() const override { return 1; }
228
229
19
    bool use_default_implementation_for_nulls() const override { return true; }
230
231
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
232
7
                        uint32_t result, size_t input_rows_count) const override {
233
7
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
234
7
        auto res_data_column = ColumnQuantileState::create();
235
7
        auto& null_map = res_null_map->get_data();
236
7
        auto& res = res_data_column->get_data();
237
238
7
        auto& argument_column = block.get_by_position(arguments[0]).column;
239
7
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
240
7
        const ColumnString::Chars& data = str_column.get_chars();
241
7
        const ColumnString::Offsets& offsets = str_column.get_offsets();
242
243
7
        res.reserve(input_rows_count);
244
245
7
        std::string decode_buff;
246
7
        int64_t last_decode_buff_len = 0;
247
7
        int64_t curr_decode_buff_len = 0;
248
14
        for (size_t i = 0; i < input_rows_count; ++i) {
249
7
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
250
7
            int64_t src_size = offsets[i] - offsets[i - 1];
251
252
7
            if (src_size == 0 || 0 != src_size % 4) {
253
3
                res.emplace_back();
254
3
                null_map[i] = 1;
255
3
                continue;
256
3
            }
257
258
4
            curr_decode_buff_len = src_size + 3;
259
4
            if (curr_decode_buff_len > last_decode_buff_len) {
260
4
                decode_buff.resize(curr_decode_buff_len);
261
4
                last_decode_buff_len = curr_decode_buff_len;
262
4
            }
263
4
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
264
4
            if (outlen < 0) {
265
0
                res.emplace_back();
266
0
                null_map[i] = 1;
267
4
            } else {
268
4
                doris::Slice decoded_slice(decode_buff.data(), outlen);
269
4
                doris::QuantileState quantile_state;
270
4
                if (!quantile_state.deserialize(decoded_slice)) {
271
0
                    return Status::RuntimeError(fmt::format(
272
0
                            "quantile_state_from_base64 decode failed: base64: {}", src_str));
273
4
                } else {
274
4
                    res.emplace_back(std::move(quantile_state));
275
4
                }
276
4
            }
277
4
        }
278
279
7
        block.get_by_position(result).column =
280
7
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
281
7
        return Status::OK();
282
7
    }
283
};
284
285
// Name tag for `quantile_state_to_base64`; passed as the name parameter of
// FunctionUnaryToType in the FunctionQuantileStateToBase64 alias.
struct NameQuantileStateToBase64 {
    static constexpr const char* name = "quantile_state_to_base64";
};
288
289
struct QuantileStateToBase64 {
290
    using ReturnType = DataTypeString;
291
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_QUANTILE_STATE;
292
    using Type = DataTypeQuantileState::FieldType;
293
    using ReturnColumnType = ColumnString;
294
    using Chars = ColumnString::Chars;
295
    using Offsets = ColumnString::Offsets;
296
297
15
    static Status vector(const std::vector<QuantileState>& data, Chars& chars, Offsets& offsets) {
298
15
        size_t size = data.size();
299
15
        offsets.resize(size);
300
15
        size_t output_char_size = 0;
301
34
        for (size_t i = 0; i < size; ++i) {
302
19
            auto& quantile_state_val = const_cast<QuantileState&>(data[i]);
303
19
            auto ser_size = quantile_state_val.get_serialized_size();
304
19
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
305
19
        }
306
15
        ColumnString::check_chars_length(output_char_size, size);
307
15
        chars.resize(output_char_size);
308
15
        auto* chars_data = chars.data();
309
310
15
        size_t cur_ser_size = 0;
311
15
        size_t last_ser_size = 0;
312
15
        std::string ser_buff;
313
15
        size_t encoded_offset = 0;
314
34
        for (size_t i = 0; i < size; ++i) {
315
19
            auto& quantile_state_val = const_cast<QuantileState&>(data[i]);
316
317
19
            cur_ser_size = quantile_state_val.get_serialized_size();
318
19
            if (cur_ser_size > last_ser_size) {
319
19
                last_ser_size = cur_ser_size;
320
19
                ser_buff.resize(cur_ser_size);
321
19
            }
322
19
            size_t real_size =
323
19
                    quantile_state_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
324
19
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
325
19
                                        chars_data + encoded_offset);
326
19
            DCHECK(outlen > 0);
327
328
19
            encoded_offset += outlen;
329
19
            offsets[i] = cast_set<uint32_t>(encoded_offset);
330
19
        }
331
15
        return Status::OK();
332
15
    }
333
};
334
335
using FunctionQuantileStateToBase64 =
336
        FunctionUnaryToType<QuantileStateToBase64, NameQuantileStateToBase64>;
337
338
8
// Registers every quantile-state scalar function defined in this file with
// the given factory.
void register_function_quantile_state(SimpleFunctionFactory& factory) {
    // quantile_state_empty
    factory.register_function<FunctionConst<QuantileStateEmpty, false>>();
    // quantile_percent
    factory.register_function<FunctionQuantileStatePercent>();
    // to_quantile_state
    factory.register_function<FunctionToQuantileState>();
    // quantile_state_from_base64
    factory.register_function<FunctionQuantileStateFromBase64>();
    // quantile_state_to_base64
    factory.register_function<FunctionQuantileStateToBase64>();
}
345
346
} // namespace doris