Coverage Report

Created: 2026-04-15 19:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_hll.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <algorithm>
19
#include <cstddef>
20
#include <cstdint>
21
#include <memory>
22
#include <utility>
23
#include <vector>
24
25
#include "common/cast_set.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/block/column_numbers.h"
29
#include "core/block/column_with_type_and_name.h"
30
#include "core/column/column.h"
31
#include "core/column/column_complex.h"
32
#include "core/column/column_nullable.h"
33
#include "core/column/column_string.h"
34
#include "core/column/column_vector.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_hll.h"
37
#include "core/data_type/data_type_number.h"
38
#include "core/data_type/data_type_string.h"
39
#include "core/types.h"
40
#include "core/value/hll.h"
41
#include "exprs/function/function.h"
42
#include "exprs/function/function_always_not_nullable.h"
43
#include "exprs/function/function_const.h"
44
#include "exprs/function/function_totype.h"
45
#include "exprs/function/simple_function_factory.h"
46
#include "util/hash_util.hpp"
47
#include "util/url_coding.h"
48
49
namespace doris {
50
51
struct HLLCardinality {
52
    static constexpr auto name = "hll_cardinality";
53
54
    using ReturnType = DataTypeInt64;
55
56
0
    static void vector(const std::vector<HyperLogLog>& data, MutableColumnPtr& col_res) {
57
0
        typename ColumnInt64::Container& res =
58
0
                reinterpret_cast<ColumnInt64*>(col_res.get())->get_data();
59
60
0
        auto size = res.size();
61
0
        for (int i = 0; i < size; ++i) {
62
0
            res[i] = data[i].estimate_cardinality();
63
0
        }
64
0
    }
65
66
    static void vector_nullable(const std::vector<HyperLogLog>& data, const NullMap& nullmap,
67
1
                                MutableColumnPtr& col_res) {
68
1
        typename ColumnInt64::Container& res =
69
1
                reinterpret_cast<ColumnInt64*>(col_res.get())->get_data();
70
71
1
        auto size = res.size();
72
7
        for (int i = 0; i < size; ++i) {
73
6
            if (nullmap[i]) {
74
1
                res[i] = 0;
75
5
            } else {
76
5
                res[i] = data[i].estimate_cardinality();
77
5
            }
78
6
        }
79
1
    }
80
};
81
82
template <typename Function>
83
class FunctionHLL : public IFunction {
84
public:
85
    static constexpr auto name = Function::name;
86
87
3
    static FunctionPtr create() { return std::make_shared<FunctionHLL>(); }
88
89
1
    String get_name() const override { return Function::name; }
90
91
1
    size_t get_number_of_arguments() const override { return 1; }
92
93
2
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
94
2
        return std::make_shared<typename Function::ReturnType>();
95
2
    }
96
97
2
    bool use_default_implementation_for_nulls() const override { return false; }
98
99
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
100
1
                        uint32_t result, size_t input_rows_count) const override {
101
1
        auto column = block.get_by_position(arguments[0]).column;
102
103
1
        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
104
1
        column_result->resize(input_rows_count);
105
1
        if (const ColumnNullable* col_nullable =
106
1
                    check_and_get_column<ColumnNullable>(column.get())) {
107
1
            const ColumnHLL* col =
108
1
                    check_and_get_column<ColumnHLL>(col_nullable->get_nested_column_ptr().get());
109
1
            const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
110
1
                    col_nullable->get_null_map_column_ptr().get());
111
112
1
            if (col != nullptr && col_nullmap != nullptr) {
113
1
                Function::vector_nullable(col->get_data(), col_nullmap->get_data(), column_result);
114
1
                block.replace_by_position(result, std::move(column_result));
115
1
                return Status::OK();
116
1
            }
117
1
        } else if (const ColumnHLL* col = check_and_get_column<ColumnHLL>(column.get())) {
118
0
            Function::vector(col->get_data(), column_result);
119
0
            block.replace_by_position(result, std::move(column_result));
120
0
            return Status::OK();
121
0
        } else {
122
0
            return Status::RuntimeError("Illegal column {} of argument of function {}",
123
0
                                        block.get_by_position(arguments[0]).column->get_name(),
124
0
                                        get_name());
125
0
        }
126
127
0
        block.replace_by_position(result, std::move(column_result));
128
0
        return Status::OK();
129
1
    }
130
};
131
132
struct HLLEmptyImpl {
133
    static constexpr auto name = "hll_empty";
134
    using ReturnColVec = ColumnHLL;
135
0
    static auto get_return_type() { return std::make_shared<DataTypeHLL>(); }
136
0
    static HyperLogLog init_value() { return HyperLogLog {}; }
137
};
138
139
class FunctionHllFromBase64 : public IFunction {
140
public:
141
    static constexpr auto name = "hll_from_base64";
142
143
1
    String get_name() const override { return name; }
144
145
3
    static FunctionPtr create() { return std::make_shared<FunctionHllFromBase64>(); }
146
147
1
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
148
1
        return make_nullable(std::make_shared<DataTypeHLL>());
149
1
    }
150
151
1
    size_t get_number_of_arguments() const override { return 1; }
152
153
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
154
1
                        uint32_t result, size_t input_rows_count) const override {
155
1
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
156
1
        auto res_data_column = ColumnHLL::create();
157
1
        auto& null_map = res_null_map->get_data();
158
1
        auto& res = res_data_column->get_data();
159
160
1
        auto& argument_column = block.get_by_position(arguments[0]).column;
161
1
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
162
1
        const ColumnString::Chars& data = str_column.get_chars();
163
1
        const ColumnString::Offsets& offsets = str_column.get_offsets();
164
165
1
        res.reserve(input_rows_count);
166
167
1
        std::string decode_buff;
168
1
        int64_t last_decode_buff_len = 0;
169
1
        int64_t curr_decode_buff_len = 0;
170
6
        for (size_t i = 0; i < input_rows_count; ++i) {
171
5
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
172
5
            int64_t src_size = offsets[i] - offsets[i - 1];
173
174
            // Base64 encoding has a characteristic where every 4 characters represent 3 bytes of data.
175
            // Here, we check if the length of the input string is a multiple of 4 to ensure it's a valid base64 encoded string.
176
5
            if (0 != src_size % 4) {
177
0
                res.emplace_back();
178
0
                null_map[i] = 1;
179
0
                continue;
180
0
            }
181
182
            // Allocate sufficient space for the decoded data.
183
            // The number 3 here represents the number of bytes in the decoded data for each group of 4 base64 characters.
184
            // We set the size of the decoding buffer to be 'src_size + 3' to ensure there is enough space to store the decoded data.
185
5
            curr_decode_buff_len = src_size + 3;
186
5
            if (curr_decode_buff_len > last_decode_buff_len) {
187
2
                decode_buff.resize(curr_decode_buff_len);
188
2
                last_decode_buff_len = curr_decode_buff_len;
189
2
            }
190
5
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
191
5
            if (outlen < 0) {
192
0
                res.emplace_back();
193
0
                null_map[i] = 1;
194
5
            } else {
195
5
                doris::Slice decoded_slice(decode_buff.data(), outlen);
196
5
                doris::HyperLogLog hll;
197
5
                if (!hll.deserialize(decoded_slice)) {
198
0
                    return Status::RuntimeError(
199
0
                            fmt::format("hll_from_base64 decode failed: base64: {}",
200
0
                                        StringRef(src_str, src_size).to_string()));
201
5
                } else {
202
5
                    res.emplace_back(std::move(hll));
203
5
                }
204
5
            }
205
5
        }
206
207
1
        block.get_by_position(result).column =
208
1
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
209
1
        return Status::OK();
210
1
    }
211
};
212
213
struct HLLHash {
214
    static constexpr auto name = "hll_hash";
215
216
    using ReturnType = DataTypeHLL;
217
    template <typename ColumnType>
218
0
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
219
0
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
220
0
            const ColumnString::Chars& data = col->get_chars();
221
0
            const ColumnString::Offsets& offsets = col->get_offsets();
222
0
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
223
0
            auto& res_data = res_column->get_data();
224
0
            size_t size = offsets.size();
225
226
0
            for (size_t i = 0; i < size; ++i) {
227
0
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
228
0
                size_t str_size = offsets[i] - offsets[i - 1];
229
0
                uint64_t hash_value =
230
0
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
231
0
                res_data[i].update(hash_value);
232
0
            }
233
0
        }
234
0
    }
Unexecuted instantiation: _ZN5doris7HLLHash6vectorINS_9ColumnStrIjEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS8_EE
Unexecuted instantiation: _ZN5doris7HLLHash6vectorINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS9_EE
235
236
    template <typename ColumnType>
237
    static void vector_nullable(const ColumnType* col, const NullMap& nullmap,
238
0
                                MutableColumnPtr& col_res) {
239
0
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
240
0
            const ColumnString::Chars& data = col->get_chars();
241
0
            const ColumnString::Offsets& offsets = col->get_offsets();
242
0
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
243
0
            auto& res_data = res_column->get_data();
244
0
            size_t size = offsets.size();
245
246
0
            for (size_t i = 0; i < size; ++i) {
247
0
                if (nullmap[i]) {
248
0
                    continue;
249
0
                } else {
250
0
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
251
0
                    size_t str_size = offsets[i] - offsets[i - 1];
252
0
                    uint64_t hash_value =
253
0
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
254
0
                    res_data[i].update(hash_value);
255
0
                }
256
0
            }
257
0
        }
258
0
    }
Unexecuted instantiation: _ZN5doris7HLLHash15vector_nullableINS_9ColumnStrIjEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISF_EE
Unexecuted instantiation: _ZN5doris7HLLHash15vector_nullableINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISG_EE
259
};
260
261
struct NameHllToBase64 {
262
    static constexpr auto name = "hll_to_base64";
263
};
264
265
struct HllToBase64 {
266
    using ReturnType = DataTypeString;
267
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_HLL;
268
    using Type = DataTypeHLL::FieldType;
269
    using ReturnColumnType = ColumnString;
270
    using Chars = ColumnString::Chars;
271
    using Offsets = ColumnString::Offsets;
272
273
1
    static Status vector(const std::vector<HyperLogLog>& data, Chars& chars, Offsets& offsets) {
274
1
        size_t size = data.size();
275
1
        offsets.resize(size);
276
1
        size_t output_char_size = 0;
277
7
        for (size_t i = 0; i < size; ++i) {
278
6
            auto& hll_val = data[i];
279
6
            auto ser_size = hll_val.max_serialized_size();
280
6
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
281
6
        }
282
1
        ColumnString::check_chars_length(output_char_size, size);
283
1
        chars.resize(output_char_size);
284
1
        auto* chars_data = chars.data();
285
286
1
        size_t cur_ser_size = 0;
287
1
        size_t last_ser_size = 0;
288
1
        std::string ser_buff;
289
1
        size_t encoded_offset = 0;
290
7
        for (size_t i = 0; i < size; ++i) {
291
6
            auto& hll_val = data[i];
292
293
6
            cur_ser_size = hll_val.max_serialized_size();
294
6
            if (cur_ser_size > last_ser_size) {
295
2
                last_ser_size = cur_ser_size;
296
2
                ser_buff.resize(cur_ser_size);
297
2
            }
298
6
            size_t real_size = hll_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
299
6
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
300
6
                                        chars_data + encoded_offset);
301
6
            DCHECK(outlen > 0);
302
303
6
            encoded_offset += outlen;
304
6
            offsets[i] = cast_set<uint32_t>(encoded_offset);
305
6
        }
306
1
        return Status::OK();
307
1
    }
308
};
309
310
using FunctionHLLCardinality = FunctionHLL<HLLCardinality>;
311
using FunctionHLLEmpty = FunctionConst<HLLEmptyImpl, false>;
312
using FunctionHLLHash = FunctionAlwaysNotNullable<HLLHash>;
313
using FunctionHllToBase64 = FunctionUnaryToType<HllToBase64, NameHllToBase64>;
314
315
1
void register_function_hll(SimpleFunctionFactory& factory) {
316
1
    factory.register_function<FunctionHLLCardinality>();
317
1
    factory.register_function<FunctionHLLEmpty>();
318
1
    factory.register_function<FunctionHllFromBase64>();
319
1
    factory.register_function<FunctionHLLHash>();
320
1
    factory.register_function<FunctionHllToBase64>();
321
1
}
322
323
} // namespace doris