Coverage Report

Created: 2026-03-16 08:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_hll.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <algorithm>
19
#include <cstddef>
20
#include <cstdint>
21
#include <memory>
22
#include <utility>
23
#include <vector>
24
25
#include "common/cast_set.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/block/column_numbers.h"
29
#include "core/block/column_with_type_and_name.h"
30
#include "core/column/column.h"
31
#include "core/column/column_complex.h"
32
#include "core/column/column_nullable.h"
33
#include "core/column/column_string.h"
34
#include "core/column/column_vector.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_hll.h"
37
#include "core/data_type/data_type_number.h"
38
#include "core/data_type/data_type_string.h"
39
#include "core/types.h"
40
#include "core/value/hll.h"
41
#include "exprs/function/function.h"
42
#include "exprs/function/function_always_not_nullable.h"
43
#include "exprs/function/function_const.h"
44
#include "exprs/function/function_totype.h"
45
#include "exprs/function/simple_function_factory.h"
46
#include "util/hash_util.hpp"
47
#include "util/url_coding.h"
48
49
namespace doris {
50
#include "common/compile_check_begin.h"
51
52
struct HLLCardinality {
53
    static constexpr auto name = "hll_cardinality";
54
55
    using ReturnType = DataTypeInt64;
56
57
63
    static void vector(const std::vector<HyperLogLog>& data, MutableColumnPtr& col_res) {
58
63
        typename ColumnInt64::Container& res =
59
63
                reinterpret_cast<ColumnInt64*>(col_res.get())->get_data();
60
61
63
        auto size = res.size();
62
202
        for (int i = 0; i < size; ++i) {
63
139
            res[i] = data[i].estimate_cardinality();
64
139
        }
65
63
    }
66
67
    static void vector_nullable(const std::vector<HyperLogLog>& data, const NullMap& nullmap,
68
29
                                MutableColumnPtr& col_res) {
69
29
        typename ColumnInt64::Container& res =
70
29
                reinterpret_cast<ColumnInt64*>(col_res.get())->get_data();
71
72
29
        auto size = res.size();
73
101
        for (int i = 0; i < size; ++i) {
74
72
            if (nullmap[i]) {
75
2
                res[i] = 0;
76
70
            } else {
77
70
                res[i] = data[i].estimate_cardinality();
78
70
            }
79
72
        }
80
29
    }
81
};
82
83
template <typename Function>
84
class FunctionHLL : public IFunction {
85
public:
86
    static constexpr auto name = Function::name;
87
88
55
    static FunctionPtr create() { return std::make_shared<FunctionHLL>(); }
89
90
1
    String get_name() const override { return Function::name; }
91
92
46
    size_t get_number_of_arguments() const override { return 1; }
93
94
138
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
95
138
        return std::make_shared<typename Function::ReturnType>();
96
138
    }
97
98
138
    bool use_default_implementation_for_nulls() const override { return false; }
99
100
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
101
92
                        uint32_t result, size_t input_rows_count) const override {
102
92
        auto column = block.get_by_position(arguments[0]).column;
103
104
92
        MutableColumnPtr column_result = get_return_type_impl({})->create_column();
105
92
        column_result->resize(input_rows_count);
106
92
        if (const ColumnNullable* col_nullable =
107
92
                    check_and_get_column<ColumnNullable>(column.get())) {
108
29
            const ColumnHLL* col =
109
29
                    check_and_get_column<ColumnHLL>(col_nullable->get_nested_column_ptr().get());
110
29
            const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
111
29
                    col_nullable->get_null_map_column_ptr().get());
112
113
29
            if (col != nullptr && col_nullmap != nullptr) {
114
29
                Function::vector_nullable(col->get_data(), col_nullmap->get_data(), column_result);
115
29
                block.replace_by_position(result, std::move(column_result));
116
29
                return Status::OK();
117
29
            }
118
63
        } else if (const ColumnHLL* col = check_and_get_column<ColumnHLL>(column.get())) {
119
63
            Function::vector(col->get_data(), column_result);
120
63
            block.replace_by_position(result, std::move(column_result));
121
63
            return Status::OK();
122
63
        } else {
123
0
            return Status::RuntimeError("Illegal column {} of argument of function {}",
124
0
                                        block.get_by_position(arguments[0]).column->get_name(),
125
0
                                        get_name());
126
0
        }
127
128
0
        block.replace_by_position(result, std::move(column_result));
129
0
        return Status::OK();
130
92
    }
131
};
132
133
struct HLLEmptyImpl {
134
    static constexpr auto name = "hll_empty";
135
    using ReturnColVec = ColumnHLL;
136
15
    static auto get_return_type() { return std::make_shared<DataTypeHLL>(); }
137
15
    static HyperLogLog init_value() { return HyperLogLog {}; }
138
};
139
140
class FunctionHllFromBase64 : public IFunction {
141
public:
142
    static constexpr auto name = "hll_from_base64";
143
144
1
    String get_name() const override { return name; }
145
146
25
    static FunctionPtr create() { return std::make_shared<FunctionHllFromBase64>(); }
147
148
16
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
149
16
        return make_nullable(std::make_shared<DataTypeHLL>());
150
16
    }
151
152
16
    size_t get_number_of_arguments() const override { return 1; }
153
154
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
155
39
                        uint32_t result, size_t input_rows_count) const override {
156
39
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
157
39
        auto res_data_column = ColumnHLL::create();
158
39
        auto& null_map = res_null_map->get_data();
159
39
        auto& res = res_data_column->get_data();
160
161
39
        auto& argument_column = block.get_by_position(arguments[0]).column;
162
39
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
163
39
        const ColumnString::Chars& data = str_column.get_chars();
164
39
        const ColumnString::Offsets& offsets = str_column.get_offsets();
165
166
39
        res.reserve(input_rows_count);
167
168
39
        std::string decode_buff;
169
39
        int64_t last_decode_buff_len = 0;
170
39
        int64_t curr_decode_buff_len = 0;
171
147
        for (size_t i = 0; i < input_rows_count; ++i) {
172
109
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
173
109
            int64_t src_size = offsets[i] - offsets[i - 1];
174
175
            // Base64 encoding has a characteristic where every 4 characters represent 3 bytes of data.
176
            // Here, we check if the length of the input string is a multiple of 4 to ensure it's a valid base64 encoded string.
177
109
            if (0 != src_size % 4) {
178
0
                res.emplace_back();
179
0
                null_map[i] = 1;
180
0
                continue;
181
0
            }
182
183
            // Allocate sufficient space for the decoded data.
184
            // The number 3 here represents the number of bytes in the decoded data for each group of 4 base64 characters.
185
            // We set the size of the decoding buffer to be 'src_size + 3' to ensure there is enough space to store the decoded data.
186
109
            curr_decode_buff_len = src_size + 3;
187
109
            if (curr_decode_buff_len > last_decode_buff_len) {
188
42
                decode_buff.resize(curr_decode_buff_len);
189
42
                last_decode_buff_len = curr_decode_buff_len;
190
42
            }
191
109
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
192
109
            if (outlen < 0) {
193
0
                res.emplace_back();
194
0
                null_map[i] = 1;
195
109
            } else {
196
109
                doris::Slice decoded_slice(decode_buff.data(), outlen);
197
109
                doris::HyperLogLog hll;
198
109
                if (!hll.deserialize(decoded_slice)) {
199
1
                    return Status::RuntimeError(
200
1
                            fmt::format("hll_from_base64 decode failed: base64: {}",
201
1
                                        StringRef(src_str, src_size).to_string()));
202
108
                } else {
203
108
                    res.emplace_back(std::move(hll));
204
108
                }
205
109
            }
206
109
        }
207
208
38
        block.get_by_position(result).column =
209
38
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
210
38
        return Status::OK();
211
39
    }
212
};
213
214
struct HLLHash {
215
    static constexpr auto name = "hll_hash";
216
217
    using ReturnType = DataTypeHLL;
218
    template <typename ColumnType>
219
335
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
220
335
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
221
335
            const ColumnString::Chars& data = col->get_chars();
222
335
            const ColumnString::Offsets& offsets = col->get_offsets();
223
335
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
224
335
            auto& res_data = res_column->get_data();
225
335
            size_t size = offsets.size();
226
227
4.07k
            for (size_t i = 0; i < size; ++i) {
228
3.74k
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
229
3.74k
                size_t str_size = offsets[i] - offsets[i - 1];
230
3.74k
                uint64_t hash_value =
231
3.74k
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
232
3.74k
                res_data[i].update(hash_value);
233
3.74k
            }
234
335
        }
235
335
    }
_ZN5doris7HLLHash6vectorINS_9ColumnStrIjEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS8_EE
Line
Count
Source
219
335
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
220
335
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
221
335
            const ColumnString::Chars& data = col->get_chars();
222
335
            const ColumnString::Offsets& offsets = col->get_offsets();
223
335
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
224
335
            auto& res_data = res_column->get_data();
225
335
            size_t size = offsets.size();
226
227
4.07k
            for (size_t i = 0; i < size; ++i) {
228
3.74k
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
229
3.74k
                size_t str_size = offsets[i] - offsets[i - 1];
230
3.74k
                uint64_t hash_value =
231
3.74k
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
232
3.74k
                res_data[i].update(hash_value);
233
3.74k
            }
234
335
        }
235
335
    }
Unexecuted instantiation: _ZN5doris7HLLHash6vectorINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS9_EE
236
237
    template <typename ColumnType>
238
    static void vector_nullable(const ColumnType* col, const NullMap& nullmap,
239
144
                                MutableColumnPtr& col_res) {
240
144
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
241
144
            const ColumnString::Chars& data = col->get_chars();
242
144
            const ColumnString::Offsets& offsets = col->get_offsets();
243
144
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
244
144
            auto& res_data = res_column->get_data();
245
144
            size_t size = offsets.size();
246
247
4.70k
            for (size_t i = 0; i < size; ++i) {
248
4.55k
                if (nullmap[i]) {
249
80
                    continue;
250
4.47k
                } else {
251
4.47k
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
252
4.47k
                    size_t str_size = offsets[i] - offsets[i - 1];
253
4.47k
                    uint64_t hash_value =
254
4.47k
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
255
4.47k
                    res_data[i].update(hash_value);
256
4.47k
                }
257
4.55k
            }
258
144
        }
259
144
    }
_ZN5doris7HLLHash15vector_nullableINS_9ColumnStrIjEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISF_EE
Line
Count
Source
239
144
                                MutableColumnPtr& col_res) {
240
144
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
241
144
            const ColumnString::Chars& data = col->get_chars();
242
144
            const ColumnString::Offsets& offsets = col->get_offsets();
243
144
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
244
144
            auto& res_data = res_column->get_data();
245
144
            size_t size = offsets.size();
246
247
4.70k
            for (size_t i = 0; i < size; ++i) {
248
4.55k
                if (nullmap[i]) {
249
80
                    continue;
250
4.47k
                } else {
251
4.47k
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
252
4.47k
                    size_t str_size = offsets[i] - offsets[i - 1];
253
4.47k
                    uint64_t hash_value =
254
4.47k
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
255
4.47k
                    res_data[i].update(hash_value);
256
4.47k
                }
257
4.55k
            }
258
144
        }
259
144
    }
Unexecuted instantiation: _ZN5doris7HLLHash15vector_nullableINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISG_EE
260
};
261
262
struct NameHllToBase64 {
263
    static constexpr auto name = "hll_to_base64";
264
};
265
266
struct HllToBase64 {
267
    using ReturnType = DataTypeString;
268
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_HLL;
269
    using Type = DataTypeHLL::FieldType;
270
    using ReturnColumnType = ColumnString;
271
    using Chars = ColumnString::Chars;
272
    using Offsets = ColumnString::Offsets;
273
274
57
    static Status vector(const std::vector<HyperLogLog>& data, Chars& chars, Offsets& offsets) {
275
57
        size_t size = data.size();
276
57
        offsets.resize(size);
277
57
        size_t output_char_size = 0;
278
199
        for (size_t i = 0; i < size; ++i) {
279
142
            auto& hll_val = data[i];
280
142
            auto ser_size = hll_val.max_serialized_size();
281
142
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
282
142
        }
283
57
        ColumnString::check_chars_length(output_char_size, size);
284
57
        chars.resize(output_char_size);
285
57
        auto* chars_data = chars.data();
286
287
57
        size_t cur_ser_size = 0;
288
57
        size_t last_ser_size = 0;
289
57
        std::string ser_buff;
290
57
        size_t encoded_offset = 0;
291
199
        for (size_t i = 0; i < size; ++i) {
292
142
            auto& hll_val = data[i];
293
294
142
            cur_ser_size = hll_val.max_serialized_size();
295
142
            if (cur_ser_size > last_ser_size) {
296
58
                last_ser_size = cur_ser_size;
297
58
                ser_buff.resize(cur_ser_size);
298
58
            }
299
142
            size_t real_size = hll_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
300
142
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
301
142
                                        chars_data + encoded_offset);
302
142
            DCHECK(outlen > 0);
303
304
142
            encoded_offset += outlen;
305
142
            offsets[i] = cast_set<uint32_t>(encoded_offset);
306
142
        }
307
57
        return Status::OK();
308
57
    }
309
};
310
311
using FunctionHLLCardinality = FunctionHLL<HLLCardinality>;
312
using FunctionHLLEmpty = FunctionConst<HLLEmptyImpl, false>;
313
using FunctionHLLHash = FunctionAlwaysNotNullable<HLLHash>;
314
using FunctionHllToBase64 = FunctionUnaryToType<HllToBase64, NameHllToBase64>;
315
316
8
void register_function_hll(SimpleFunctionFactory& factory) {
317
8
    factory.register_function<FunctionHLLCardinality>();
318
8
    factory.register_function<FunctionHLLEmpty>();
319
8
    factory.register_function<FunctionHllFromBase64>();
320
8
    factory.register_function<FunctionHLLHash>();
321
8
    factory.register_function<FunctionHllToBase64>();
322
8
}
323
324
} // namespace doris