Coverage Report

Created: 2026-05-27 00:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_quantile_state.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Functions/FunctionBitmap.h
19
// and modified by Doris
20
21
#include <fmt/format.h>
22
#include <glog/logging.h>
23
24
#include <boost/iterator/iterator_facade.hpp>
25
#include <cstddef>
26
#include <memory>
27
#include <utility>
28
29
#include "common/compiler_util.h" // IWYU pragma: keep
30
#include "common/status.h"
31
#include "core/assert_cast.h"
32
#include "core/block/block.h"
33
#include "core/block/column_numbers.h"
34
#include "core/block/column_with_type_and_name.h"
35
#include "core/column/column.h"
36
#include "core/column/column_complex.h"
37
#include "core/column/column_const.h"
38
#include "core/column/column_nullable.h"
39
#include "core/column/column_string.h"
40
#include "core/column/column_vector.h"
41
#include "core/data_type/data_type.h"
42
#include "core/data_type/data_type_nullable.h"
43
#include "core/data_type/data_type_number.h"
44
#include "core/data_type/data_type_quantilestate.h" // IWYU pragma: keep
45
#include "core/data_type/data_type_string.h"
46
#include "core/types.h"
47
#include "core/value/quantile_state.h"
48
#include "exec/common/util.hpp"
49
#include "exprs/aggregate/aggregate_function.h"
50
#include "exprs/function/function.h"
51
#include "exprs/function/function_const.h"
52
#include "exprs/function/function_helpers.h"
53
#include "exprs/function/function_totype.h"
54
#include "exprs/function/simple_function_factory.h"
55
#include "util/url_coding.h"
56
57
namespace doris {
58
class FunctionContext;
59
} // namespace doris
60
61
namespace doris {
62
63
struct QuantileStateEmpty {
64
    static constexpr auto name = "quantile_state_empty";
65
    using ReturnColVec = ColumnQuantileState;
66
0
    static DataTypePtr get_return_type() { return std::make_shared<DataTypeQuantileState>(); }
67
0
    static auto init_value() { return QuantileState {}; }
68
};
69
70
class FunctionToQuantileState : public IFunction {
71
public:
72
    static constexpr auto name = "to_quantile_state";
73
1
    String get_name() const override { return name; }
74
75
2
    static FunctionPtr create() { return std::make_shared<FunctionToQuantileState>(); }
76
77
0
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
78
0
        return std::make_shared<DataTypeQuantileState>();
79
0
    }
80
81
0
    size_t get_number_of_arguments() const override { return 2; }
82
83
0
    bool use_default_implementation_for_nulls() const override { return false; }
84
85
    template <bool is_nullable>
86
    Status execute_internal(const ColumnPtr& column, const DataTypePtr& data_type,
87
0
                            MutableColumnPtr& column_result, float compression) const {
88
0
        const ColumnNullable* col_nullable = nullptr;
89
0
        const ColumnUInt8* col_nullmap = nullptr;
90
0
        const ColumnFloat64* col = nullptr;
91
0
        const NullMap* nullmap = nullptr;
92
0
        if constexpr (is_nullable) {
93
0
            col_nullable = assert_cast<const ColumnNullable*>(column.get());
94
0
            col_nullmap = col_nullable->get_null_map_column_ptr().get();
95
0
            col = assert_cast<const ColumnFloat64*>(col_nullable->get_nested_column_ptr().get());
96
97
0
            nullmap = &col_nullmap->get_data();
98
0
        } else {
99
0
            col = assert_cast<const ColumnFloat64*>(column.get());
100
0
        }
101
0
        auto* res_column = reinterpret_cast<ColumnQuantileState*>(column_result.get());
102
0
        auto& res_data = res_column->get_data();
103
104
0
        size_t size = col->size();
105
0
        for (size_t i = 0; i < size; ++i) {
106
0
            if constexpr (is_nullable) {
107
0
                if ((*nullmap)[i]) {
108
0
                    res_data[i].clear();
109
0
                    continue;
110
0
                }
111
0
            }
112
0
            auto value = (double)col->get_data()[i];
113
0
            res_data[i].set_compression(compression);
114
0
            res_data[i].add_value(value);
115
0
        }
116
0
        return Status::OK();
117
0
    }
Unexecuted instantiation: _ZNK5doris23FunctionToQuantileState16execute_internalILb1EEENS_6StatusERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EERKSt10shared_ptrIKNS_9IDataTypeEERNS5_11mutable_ptrIS4_EEf
Unexecuted instantiation: _ZNK5doris23FunctionToQuantileState16execute_internalILb0EEENS_6StatusERKNS_3COWINS_7IColumnEE13immutable_ptrIS4_EERKSt10shared_ptrIKNS_9IDataTypeEERNS5_11mutable_ptrIS4_EEf
118
119
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
120
0
                        uint32_t result, size_t input_rows_count) const override {
121
0
        const ColumnPtr& column = block.get_by_position(arguments[0]).column;
122
0
        const DataTypePtr& data_type = block.get_by_position(arguments[0]).type;
123
0
        const auto* compression_arg = check_and_get_column_const<ColumnFloat32>(
124
0
                block.get_by_position(arguments.back()).column.get());
125
0
        float compression = 2048;
126
0
        if (compression_arg) {
127
0
            auto compression_arg_val = compression_arg->get_value<TYPE_FLOAT>();
128
0
            if (compression_arg_val >= QUANTILE_STATE_COMPRESSION_MIN &&
129
0
                compression_arg_val <= QUANTILE_STATE_COMPRESSION_MAX) {
130
0
                compression = compression_arg_val;
131
0
            }
132
0
        }
133
0
        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
134
0
        column_result->resize(input_rows_count);
135
136
0
        Status status = Status::OK();
137
0
        if (data_type->is_nullable()) {
138
0
            RETURN_IF_ERROR(execute_internal<true>(column, data_type, column_result, compression));
139
0
        } else {
140
0
            RETURN_IF_ERROR(execute_internal<false>(column, data_type, column_result, compression));
141
0
        }
142
0
        if (status.ok()) {
143
0
            block.replace_by_position(result, std::move(column_result));
144
0
        }
145
0
        return status;
146
0
    }
147
};
148
149
class FunctionQuantileStatePercent : public IFunction {
150
public:
151
    static constexpr auto name = "quantile_percent";
152
1
    String get_name() const override { return name; }
153
154
2
    static FunctionPtr create() { return std::make_shared<FunctionQuantileStatePercent>(); }
155
156
0
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
157
0
        return std::make_shared<DataTypeFloat64>();
158
0
    }
159
160
0
    size_t get_number_of_arguments() const override { return 2; }
161
162
0
    bool use_default_implementation_for_nulls() const override { return false; }
163
164
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
165
0
                        uint32_t result, size_t input_rows_count) const override {
166
0
        auto res_data_column = ColumnFloat64::create();
167
0
        auto& res = res_data_column->get_data();
168
0
        auto data_null_map = ColumnUInt8::create(input_rows_count, 0);
169
0
        auto& null_map = data_null_map->get_data();
170
171
0
        auto column = block.get_by_position(arguments[0]).column->convert_to_full_column_if_const();
172
0
        if (const auto* nullable = check_and_get_column<const ColumnNullable>(*column)) {
173
0
            VectorizedUtils::update_null_map(null_map, nullable->get_null_map_data());
174
0
            column = nullable->get_nested_column_ptr();
175
0
        }
176
0
        const auto* str_col = assert_cast<const ColumnQuantileState*>(column.get());
177
0
        const auto& col_data = str_col->get_data();
178
0
        const auto* percent_arg = check_and_get_column_const<ColumnFloat32>(
179
0
                block.get_by_position(arguments.back()).column.get());
180
181
0
        if (!percent_arg) {
182
0
            return Status::InvalidArgument(
183
0
                    "Second argument to {} must be a constant float describing type", get_name());
184
0
        }
185
0
        auto percent_arg_value = percent_arg->get_value<TYPE_FLOAT>();
186
0
        if (percent_arg_value < 0 || percent_arg_value > 1) {
187
0
            return Status::InvalidArgument(
188
0
                    "the input argument of percentage: {} is not valid, must be in range [0,1] ",
189
0
                    percent_arg_value);
190
0
        }
191
192
0
        res.reserve(input_rows_count);
193
0
        for (size_t i = 0; i < input_rows_count; ++i) {
194
0
            if (null_map[i]) {
195
                // if null push_back meaningless result to make sure idxs can be matched
196
0
                res.push_back(0);
197
0
                continue;
198
0
            }
199
200
0
            res.push_back(col_data[i].get_value_by_percentile(percent_arg_value));
201
0
        }
202
203
0
        block.replace_by_position(result, std::move(res_data_column));
204
0
        return Status::OK();
205
0
    }
206
};
207
208
class FunctionQuantileStateFromBase64 : public IFunction {
209
public:
210
    static constexpr auto name = "quantile_state_from_base64";
211
1
    String get_name() const override { return name; }
212
213
2
    static FunctionPtr create() { return std::make_shared<FunctionQuantileStateFromBase64>(); }
214
215
0
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
216
0
        return std::make_shared<DataTypeNullable>(std::make_shared<DataTypeQuantileState>());
217
0
    }
218
219
0
    size_t get_number_of_arguments() const override { return 1; }
220
221
0
    bool use_default_implementation_for_nulls() const override { return true; }
222
223
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
224
0
                        uint32_t result, size_t input_rows_count) const override {
225
0
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
226
0
        auto res_data_column = ColumnQuantileState::create();
227
0
        auto& null_map = res_null_map->get_data();
228
0
        auto& res = res_data_column->get_data();
229
230
0
        auto& argument_column = block.get_by_position(arguments[0]).column;
231
0
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
232
0
        const ColumnString::Chars& data = str_column.get_chars();
233
0
        const ColumnString::Offsets& offsets = str_column.get_offsets();
234
235
0
        res.reserve(input_rows_count);
236
237
0
        std::string decode_buff;
238
0
        int64_t last_decode_buff_len = 0;
239
0
        int64_t curr_decode_buff_len = 0;
240
0
        for (size_t i = 0; i < input_rows_count; ++i) {
241
0
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
242
0
            int64_t src_size = offsets[i] - offsets[i - 1];
243
244
0
            if (src_size == 0 || 0 != src_size % 4) {
245
0
                res.emplace_back();
246
0
                null_map[i] = 1;
247
0
                continue;
248
0
            }
249
250
0
            curr_decode_buff_len = src_size + 3;
251
0
            if (curr_decode_buff_len > last_decode_buff_len) {
252
0
                decode_buff.resize(curr_decode_buff_len);
253
0
                last_decode_buff_len = curr_decode_buff_len;
254
0
            }
255
0
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
256
0
            if (outlen < 0) {
257
0
                res.emplace_back();
258
0
                null_map[i] = 1;
259
0
            } else {
260
0
                doris::Slice decoded_slice(decode_buff.data(), outlen);
261
0
                doris::QuantileState quantile_state;
262
0
                if (!quantile_state.deserialize(decoded_slice)) {
263
0
                    return Status::RuntimeError(fmt::format(
264
0
                            "quantile_state_from_base64 decode failed: base64: {}", src_str));
265
0
                } else {
266
0
                    res.emplace_back(std::move(quantile_state));
267
0
                }
268
0
            }
269
0
        }
270
271
0
        block.get_by_position(result).column =
272
0
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
273
0
        return Status::OK();
274
0
    }
275
};
276
277
// Name tag carrying the SQL-visible name of the quantile-state-to-base64 function.
struct NameQuantileStateToBase64 {
    static constexpr const char* name = "quantile_state_to_base64";
};
280
281
struct QuantileStateToBase64 {
282
    using ReturnType = DataTypeString;
283
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_QUANTILE_STATE;
284
    using Type = DataTypeQuantileState::FieldType;
285
    using ReturnColumnType = ColumnString;
286
    using Chars = ColumnString::Chars;
287
    using Offsets = ColumnString::Offsets;
288
289
1
    static Status vector(const std::vector<QuantileState>& data, Chars& chars, Offsets& offsets) {
290
1
        size_t size = data.size();
291
1
        offsets.resize(size);
292
1
        size_t output_char_size = 0;
293
6
        for (size_t i = 0; i < size; ++i) {
294
5
            auto& quantile_state_val = const_cast<QuantileState&>(data[i]);
295
5
            auto ser_size = quantile_state_val.get_serialized_size();
296
5
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
297
5
        }
298
1
        ColumnString::check_chars_length(output_char_size, size);
299
1
        chars.resize(output_char_size);
300
1
        auto* chars_data = chars.data();
301
302
1
        size_t cur_ser_size = 0;
303
1
        size_t last_ser_size = 0;
304
1
        std::string ser_buff;
305
1
        size_t encoded_offset = 0;
306
6
        for (size_t i = 0; i < size; ++i) {
307
5
            auto& quantile_state_val = const_cast<QuantileState&>(data[i]);
308
309
5
            cur_ser_size = quantile_state_val.get_serialized_size();
310
5
            if (cur_ser_size > last_ser_size) {
311
5
                last_ser_size = cur_ser_size;
312
5
                ser_buff.resize(cur_ser_size);
313
5
            }
314
5
            size_t real_size =
315
5
                    quantile_state_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
316
5
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
317
5
                                        chars_data + encoded_offset);
318
5
            DCHECK(outlen > 0);
319
320
5
            encoded_offset += outlen;
321
5
            offsets[i] = cast_set<uint32_t>(encoded_offset);
322
5
        }
323
1
        return Status::OK();
324
1
    }
325
};
326
327
using FunctionQuantileStateToBase64 =
328
        FunctionUnaryToType<QuantileStateToBase64, NameQuantileStateToBase64>;
329
330
1
// Registers every quantile-state scalar function defined in this file with `factory`.
void register_function_quantile_state(SimpleFunctionFactory& factory) {
    // quantile_state_empty is a constant function built from the QuantileStateEmpty traits.
    using FunctionQuantileStateEmpty = FunctionConst<QuantileStateEmpty, false>;

    factory.register_function<FunctionQuantileStateEmpty>();
    factory.register_function<FunctionQuantileStatePercent>();
    factory.register_function<FunctionToQuantileState>();
    factory.register_function<FunctionQuantileStateFromBase64>();
    factory.register_function<FunctionQuantileStateToBase64>();
}
337
338
} // namespace doris