Coverage Report

Created: 2026-05-11 13:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_hll.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <algorithm>
19
#include <cstddef>
20
#include <cstdint>
21
#include <memory>
22
#include <utility>
23
#include <vector>
24
25
#include "common/cast_set.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/block/column_numbers.h"
29
#include "core/block/column_with_type_and_name.h"
30
#include "core/column/column.h"
31
#include "core/column/column_complex.h"
32
#include "core/column/column_nullable.h"
33
#include "core/column/column_string.h"
34
#include "core/column/column_vector.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_hll.h"
37
#include "core/data_type/data_type_number.h"
38
#include "core/data_type/data_type_string.h"
39
#include "core/types.h"
40
#include "core/value/hll.h"
41
#include "exprs/function/function.h"
42
#include "exprs/function/function_always_not_nullable.h"
43
#include "exprs/function/function_const.h"
44
#include "exprs/function/function_totype.h"
45
#include "exprs/function/simple_function_factory.h"
46
#include "util/hash_util.hpp"
47
#include "util/url_coding.h"
48
49
namespace doris {
50
51
struct HLLCardinality {
52
    static constexpr auto name = "hll_cardinality";
53
54
68
    static void vector(const std::vector<HyperLogLog>& data, ColumnInt64::MutablePtr& col_res) {
55
68
        auto& res = col_res->get_data();
56
68
        auto size = res.size();
57
221
        for (int i = 0; i < size; ++i) {
58
153
            res[i] = data[i].estimate_cardinality();
59
153
        }
60
68
    }
61
62
    static void vector_nullable(const std::vector<HyperLogLog>& data, const NullMap& nullmap,
63
30
                                ColumnInt64::MutablePtr& col_res) {
64
30
        auto& res = col_res->get_data();
65
30
        auto size = res.size();
66
103
        for (int i = 0; i < size; ++i) {
67
73
            if (nullmap[i]) {
68
2
                res[i] = 0;
69
71
            } else {
70
71
                res[i] = data[i].estimate_cardinality();
71
71
            }
72
73
        }
73
30
    }
74
};
75
76
template <typename Function>
77
class FunctionHLL : public IFunction {
78
public:
79
    static constexpr auto name = Function::name;
80
81
62
    static FunctionPtr create() { return std::make_shared<FunctionHLL>(); }
82
83
1
    String get_name() const override { return Function::name; }
84
85
53
    size_t get_number_of_arguments() const override { return 1; }
86
87
53
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
88
53
        return std::make_shared<DataTypeInt64>();
89
53
    }
90
91
150
    bool use_default_implementation_for_nulls() const override { return false; }
92
93
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
94
96
                        uint32_t result, size_t input_rows_count) const override {
95
96
        auto column = block.get_by_position(arguments[0]).column;
96
97
96
        auto column_result = ColumnInt64::create(input_rows_count);
98
96
        if (const ColumnNullable* col_nullable =
99
96
                    check_and_get_column<ColumnNullable>(column.get())) {
100
30
            const ColumnHLL* col =
101
30
                    check_and_get_column<ColumnHLL>(col_nullable->get_nested_column_ptr().get());
102
30
            const ColumnUInt8* col_nullmap = check_and_get_column<ColumnUInt8>(
103
30
                    col_nullable->get_null_map_column_ptr().get());
104
105
30
            if (col != nullptr && col_nullmap != nullptr) {
106
30
                Function::vector_nullable(col->get_data(), col_nullmap->get_data(), column_result);
107
30
                block.replace_by_position(result, std::move(column_result));
108
30
                return Status::OK();
109
30
            }
110
68
        } else if (const ColumnHLL* col = check_and_get_column<ColumnHLL>(column.get())) {
111
68
            Function::vector(col->get_data(), column_result);
112
68
            block.replace_by_position(result, std::move(column_result));
113
68
            return Status::OK();
114
18.4E
        } else {
115
18.4E
            return Status::RuntimeError("Illegal column {} of argument of function {}",
116
18.4E
                                        block.get_by_position(arguments[0]).column->get_name(),
117
18.4E
                                        get_name());
118
18.4E
        }
119
120
0
        block.replace_by_position(result, std::move(column_result));
121
0
        return Status::OK();
122
96
    }
123
};
124
125
struct HLLEmptyImpl {
126
    static constexpr auto name = "hll_empty";
127
    using ReturnColVec = ColumnHLL;
128
19
    static auto get_return_type() { return std::make_shared<DataTypeHLL>(); }
129
19
    static HyperLogLog init_value() { return HyperLogLog {}; }
130
};
131
132
class FunctionHllFromBase64 : public IFunction {
133
public:
134
    static constexpr auto name = "hll_from_base64";
135
136
1
    String get_name() const override { return name; }
137
138
26
    static FunctionPtr create() { return std::make_shared<FunctionHllFromBase64>(); }
139
140
17
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
141
17
        return make_nullable(std::make_shared<DataTypeHLL>());
142
17
    }
143
144
17
    size_t get_number_of_arguments() const override { return 1; }
145
146
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
147
41
                        uint32_t result, size_t input_rows_count) const override {
148
41
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
149
41
        auto res_data_column = ColumnHLL::create();
150
41
        auto& null_map = res_null_map->get_data();
151
41
        auto& res = res_data_column->get_data();
152
153
41
        auto& argument_column = block.get_by_position(arguments[0]).column;
154
41
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
155
41
        const ColumnString::Chars& data = str_column.get_chars();
156
41
        const ColumnString::Offsets& offsets = str_column.get_offsets();
157
158
41
        res.reserve(input_rows_count);
159
160
41
        std::string decode_buff;
161
41
        int64_t last_decode_buff_len = 0;
162
41
        int64_t curr_decode_buff_len = 0;
163
150
        for (size_t i = 0; i < input_rows_count; ++i) {
164
110
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
165
110
            int64_t src_size = offsets[i] - offsets[i - 1];
166
167
            // Base64 encoding has a characteristic where every 4 characters represent 3 bytes of data.
168
            // Here, we check if the length of the input string is a multiple of 4 to ensure it's a valid base64 encoded string.
169
110
            if (0 != src_size % 4) {
170
0
                res.emplace_back();
171
0
                null_map[i] = 1;
172
0
                continue;
173
0
            }
174
175
            // Allocate sufficient space for the decoded data.
176
            // The number 3 here represents the number of bytes in the decoded data for each group of 4 base64 characters.
177
            // We set the size of the decoding buffer to be 'src_size + 3' to ensure there is enough space to store the decoded data.
178
110
            curr_decode_buff_len = src_size + 3;
179
110
            if (curr_decode_buff_len > last_decode_buff_len) {
180
42
                decode_buff.resize(curr_decode_buff_len);
181
42
                last_decode_buff_len = curr_decode_buff_len;
182
42
            }
183
110
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
184
110
            if (outlen < 0) {
185
0
                res.emplace_back();
186
0
                null_map[i] = 1;
187
110
            } else {
188
110
                doris::Slice decoded_slice(decode_buff.data(), outlen);
189
110
                doris::HyperLogLog hll;
190
110
                if (!hll.deserialize(decoded_slice)) {
191
1
                    return Status::RuntimeError(
192
1
                            fmt::format("hll_from_base64 decode failed: base64: {}",
193
1
                                        StringRef(src_str, src_size).to_string()));
194
109
                } else {
195
109
                    res.emplace_back(std::move(hll));
196
109
                }
197
110
            }
198
110
        }
199
200
40
        block.get_by_position(result).column =
201
40
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
202
40
        return Status::OK();
203
41
    }
204
};
205
206
struct HLLHash {
207
    static constexpr auto name = "hll_hash";
208
209
    using ReturnType = DataTypeHLL;
210
    template <typename ColumnType>
211
391
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
212
391
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
213
391
            const ColumnString::Chars& data = col->get_chars();
214
391
            const ColumnString::Offsets& offsets = col->get_offsets();
215
391
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
216
391
            auto& res_data = res_column->get_data();
217
391
            size_t size = offsets.size();
218
219
5.18k
            for (size_t i = 0; i < size; ++i) {
220
4.79k
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
221
4.79k
                size_t str_size = offsets[i] - offsets[i - 1];
222
4.79k
                uint64_t hash_value =
223
4.79k
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
224
4.79k
                res_data[i].update(hash_value);
225
4.79k
            }
226
391
        }
227
391
    }
_ZN5doris7HLLHash6vectorINS_9ColumnStrIjEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS8_EE
Line
Count
Source
211
391
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
212
391
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
213
391
            const ColumnString::Chars& data = col->get_chars();
214
391
            const ColumnString::Offsets& offsets = col->get_offsets();
215
391
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
216
391
            auto& res_data = res_column->get_data();
217
391
            size_t size = offsets.size();
218
219
5.18k
            for (size_t i = 0; i < size; ++i) {
220
4.79k
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
221
4.79k
                size_t str_size = offsets[i] - offsets[i - 1];
222
4.79k
                uint64_t hash_value =
223
4.79k
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
224
4.79k
                res_data[i].update(hash_value);
225
4.79k
            }
226
391
        }
227
391
    }
Unexecuted instantiation: _ZN5doris7HLLHash6vectorINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS9_EE
228
229
    template <typename ColumnType>
230
    static void vector_nullable(const ColumnType* col, const NullMap& nullmap,
231
135
                                MutableColumnPtr& col_res) {
232
135
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
233
135
            const ColumnString::Chars& data = col->get_chars();
234
135
            const ColumnString::Offsets& offsets = col->get_offsets();
235
135
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
236
135
            auto& res_data = res_column->get_data();
237
135
            size_t size = offsets.size();
238
239
4.69k
            for (size_t i = 0; i < size; ++i) {
240
4.55k
                if (nullmap[i]) {
241
83
                    continue;
242
4.47k
                } else {
243
4.47k
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
244
4.47k
                    size_t str_size = offsets[i] - offsets[i - 1];
245
4.47k
                    uint64_t hash_value =
246
4.47k
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
247
4.47k
                    res_data[i].update(hash_value);
248
4.47k
                }
249
4.55k
            }
250
135
        }
251
135
    }
_ZN5doris7HLLHash15vector_nullableINS_9ColumnStrIjEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISF_EE
Line
Count
Source
231
135
                                MutableColumnPtr& col_res) {
232
135
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
233
135
            const ColumnString::Chars& data = col->get_chars();
234
135
            const ColumnString::Offsets& offsets = col->get_offsets();
235
135
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
236
135
            auto& res_data = res_column->get_data();
237
135
            size_t size = offsets.size();
238
239
4.69k
            for (size_t i = 0; i < size; ++i) {
240
4.55k
                if (nullmap[i]) {
241
83
                    continue;
242
4.47k
                } else {
243
4.47k
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
244
4.47k
                    size_t str_size = offsets[i] - offsets[i - 1];
245
4.47k
                    uint64_t hash_value =
246
4.47k
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
247
4.47k
                    res_data[i].update(hash_value);
248
4.47k
                }
249
4.55k
            }
250
135
        }
251
135
    }
Unexecuted instantiation: _ZN5doris7HLLHash15vector_nullableINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISG_EE
252
};
253
254
struct NameHllToBase64 {
255
    static constexpr auto name = "hll_to_base64";
256
};
257
258
struct HllToBase64 {
259
    using ReturnType = DataTypeString;
260
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_HLL;
261
    using Type = DataTypeHLL::FieldType;
262
    using ReturnColumnType = ColumnString;
263
    using Chars = ColumnString::Chars;
264
    using Offsets = ColumnString::Offsets;
265
266
59
    static Status vector(const std::vector<HyperLogLog>& data, Chars& chars, Offsets& offsets) {
267
59
        size_t size = data.size();
268
59
        offsets.resize(size);
269
59
        size_t output_char_size = 0;
270
203
        for (size_t i = 0; i < size; ++i) {
271
144
            auto& hll_val = data[i];
272
144
            auto ser_size = hll_val.max_serialized_size();
273
144
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
274
144
        }
275
59
        ColumnString::check_chars_length(output_char_size, size);
276
59
        chars.resize(output_char_size);
277
59
        auto* chars_data = chars.data();
278
279
59
        size_t cur_ser_size = 0;
280
59
        size_t last_ser_size = 0;
281
59
        std::string ser_buff;
282
59
        size_t encoded_offset = 0;
283
203
        for (size_t i = 0; i < size; ++i) {
284
144
            auto& hll_val = data[i];
285
286
144
            cur_ser_size = hll_val.max_serialized_size();
287
144
            if (cur_ser_size > last_ser_size) {
288
60
                last_ser_size = cur_ser_size;
289
60
                ser_buff.resize(cur_ser_size);
290
60
            }
291
144
            size_t real_size = hll_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
292
144
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
293
144
                                        chars_data + encoded_offset);
294
144
            DCHECK(outlen > 0);
295
296
144
            encoded_offset += outlen;
297
144
            offsets[i] = cast_set<uint32_t>(encoded_offset);
298
144
        }
299
        // chars was sized using max_serialized_size(); the actual encoded length
300
        // can be smaller (e.g. sparse HLL), so shrink chars to keep
301
        // offsets.back() == chars.size() and satisfy ColumnString::sanity_check.
302
59
        chars.resize(encoded_offset);
303
59
        return Status::OK();
304
59
    }
305
};
306
307
using FunctionHLLCardinality = FunctionHLL<HLLCardinality>;
308
using FunctionHLLEmpty = FunctionConst<HLLEmptyImpl, false>;
309
using FunctionHLLHash = FunctionAlwaysNotNullable<HLLHash>;
310
using FunctionHllToBase64 = FunctionUnaryToType<HllToBase64, NameHllToBase64>;
311
312
8
void register_function_hll(SimpleFunctionFactory& factory) {
313
8
    factory.register_function<FunctionHLLCardinality>();
314
8
    factory.register_function<FunctionHLLEmpty>();
315
8
    factory.register_function<FunctionHllFromBase64>();
316
8
    factory.register_function<FunctionHLLHash>();
317
8
    factory.register_function<FunctionHllToBase64>();
318
8
}
319
320
} // namespace doris