Coverage Report

Created: 2026-05-26 17:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_hll.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <algorithm>
19
#include <cstddef>
20
#include <cstdint>
21
#include <memory>
22
#include <utility>
23
#include <vector>
24
25
#include "common/cast_set.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/block/column_numbers.h"
29
#include "core/block/column_with_type_and_name.h"
30
#include "core/column/column.h"
31
#include "core/column/column_complex.h"
32
#include "core/column/column_nullable.h"
33
#include "core/column/column_string.h"
34
#include "core/column/column_vector.h"
35
#include "core/data_type/data_type.h"
36
#include "core/data_type/data_type_hll.h"
37
#include "core/data_type/data_type_number.h"
38
#include "core/data_type/data_type_string.h"
39
#include "core/types.h"
40
#include "core/value/hll.h"
41
#include "exprs/function/function.h"
42
#include "exprs/function/function_always_not_nullable.h"
43
#include "exprs/function/function_const.h"
44
#include "exprs/function/function_totype.h"
45
#include "exprs/function/simple_function_factory.h"
46
#include "util/hash_util.hpp"
47
#include "util/url_coding.h"
48
49
namespace doris {
50
51
struct HLLCardinality {
52
    static constexpr auto name = "hll_cardinality";
53
54
69
    static void vector(const std::vector<HyperLogLog>& data, ColumnInt64::MutablePtr& col_res) {
55
69
        auto& res = col_res->get_data();
56
69
        auto size = res.size();
57
198
        for (int i = 0; i < size; ++i) {
58
129
            res[i] = data[i].estimate_cardinality();
59
129
        }
60
69
    }
61
62
    static void vector_nullable(const std::vector<HyperLogLog>& data, const NullMap& nullmap,
63
30
                                ColumnInt64::MutablePtr& col_res) {
64
30
        auto& res = col_res->get_data();
65
30
        auto size = res.size();
66
103
        for (int i = 0; i < size; ++i) {
67
73
            if (nullmap[i]) {
68
2
                res[i] = 0;
69
71
            } else {
70
71
                res[i] = data[i].estimate_cardinality();
71
71
            }
72
73
        }
73
30
    }
74
};
75
76
template <typename Function>
77
class FunctionHLL : public IFunction {
78
public:
79
    static constexpr auto name = Function::name;
80
81
54
    static FunctionPtr create() { return std::make_shared<FunctionHLL>(); }
82
83
1
    String get_name() const override { return Function::name; }
84
85
45
    size_t get_number_of_arguments() const override { return 1; }
86
87
45
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
88
45
        return std::make_shared<DataTypeInt64>();
89
45
    }
90
91
144
    bool use_default_implementation_for_nulls() const override { return false; }
92
93
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
94
99
                        uint32_t result, size_t input_rows_count) const override {
95
99
        auto column = block.get_by_position(arguments[0]).column;
96
97
99
        auto column_result = ColumnInt64::create(input_rows_count);
98
99
        if (const ColumnNullable* col_nullable =
99
99
                    check_and_get_column<ColumnNullable>(column.get())) {
100
30
            const ColumnHLL* col =
101
30
                    check_and_get_column<ColumnHLL>(col_nullable->get_nested_column_ptr().get());
102
30
            const ColumnUInt8* col_nullmap = col_nullable->get_null_map_column_ptr().get();
103
104
30
            if (col != nullptr) {
105
30
                Function::vector_nullable(col->get_data(), col_nullmap->get_data(), column_result);
106
30
                block.replace_by_position(result, std::move(column_result));
107
30
                return Status::OK();
108
30
            }
109
69
        } else if (const ColumnHLL* col = check_and_get_column<ColumnHLL>(column.get())) {
110
69
            Function::vector(col->get_data(), column_result);
111
69
            block.replace_by_position(result, std::move(column_result));
112
69
            return Status::OK();
113
69
        } else {
114
0
            return Status::RuntimeError("Illegal column {} of argument of function {}",
115
0
                                        block.get_by_position(arguments[0]).column->get_name(),
116
0
                                        get_name());
117
0
        }
118
119
0
        block.replace_by_position(result, std::move(column_result));
120
0
        return Status::OK();
121
99
    }
122
};
123
124
struct HLLEmptyImpl {
125
    static constexpr auto name = "hll_empty";
126
    using ReturnColVec = ColumnHLL;
127
15
    static auto get_return_type() { return std::make_shared<DataTypeHLL>(); }
128
15
    static HyperLogLog init_value() { return HyperLogLog {}; }
129
};
130
131
class FunctionHllFromBase64 : public IFunction {
132
public:
133
    static constexpr auto name = "hll_from_base64";
134
135
1
    String get_name() const override { return name; }
136
137
26
    static FunctionPtr create() { return std::make_shared<FunctionHllFromBase64>(); }
138
139
17
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
140
17
        return make_nullable(std::make_shared<DataTypeHLL>());
141
17
    }
142
143
17
    size_t get_number_of_arguments() const override { return 1; }
144
145
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
146
41
                        uint32_t result, size_t input_rows_count) const override {
147
41
        auto res_null_map = ColumnUInt8::create(input_rows_count, 0);
148
41
        auto res_data_column = ColumnHLL::create();
149
41
        auto& null_map = res_null_map->get_data();
150
41
        auto& res = res_data_column->get_data();
151
152
41
        auto& argument_column = block.get_by_position(arguments[0]).column;
153
41
        const auto& str_column = static_cast<const ColumnString&>(*argument_column);
154
41
        const ColumnString::Chars& data = str_column.get_chars();
155
41
        const ColumnString::Offsets& offsets = str_column.get_offsets();
156
157
41
        res.reserve(input_rows_count);
158
159
41
        std::string decode_buff;
160
41
        int64_t last_decode_buff_len = 0;
161
41
        int64_t curr_decode_buff_len = 0;
162
150
        for (size_t i = 0; i < input_rows_count; ++i) {
163
110
            const char* src_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
164
110
            int64_t src_size = offsets[i] - offsets[i - 1];
165
166
            // Base64 encoding has a characteristic where every 4 characters represent 3 bytes of data.
167
            // Here, we check if the length of the input string is a multiple of 4 to ensure it's a valid base64 encoded string.
168
110
            if (0 != src_size % 4) {
169
0
                res.emplace_back();
170
0
                null_map[i] = 1;
171
0
                continue;
172
0
            }
173
174
            // Allocate sufficient space for the decoded data.
175
            // The number 3 here represents the number of bytes in the decoded data for each group of 4 base64 characters.
176
            // We set the size of the decoding buffer to be 'src_size + 3' to ensure there is enough space to store the decoded data.
177
110
            curr_decode_buff_len = src_size + 3;
178
110
            if (curr_decode_buff_len > last_decode_buff_len) {
179
42
                decode_buff.resize(curr_decode_buff_len);
180
42
                last_decode_buff_len = curr_decode_buff_len;
181
42
            }
182
110
            auto outlen = base64_decode(src_str, src_size, decode_buff.data());
183
110
            if (outlen < 0) {
184
0
                res.emplace_back();
185
0
                null_map[i] = 1;
186
110
            } else {
187
110
                doris::Slice decoded_slice(decode_buff.data(), outlen);
188
110
                doris::HyperLogLog hll;
189
110
                if (!hll.deserialize(decoded_slice)) {
190
1
                    return Status::RuntimeError(
191
1
                            fmt::format("hll_from_base64 decode failed: base64: {}",
192
1
                                        StringRef(src_str, src_size).to_string()));
193
109
                } else {
194
109
                    res.emplace_back(std::move(hll));
195
109
                }
196
110
            }
197
110
        }
198
199
40
        block.get_by_position(result).column =
200
40
                ColumnNullable::create(std::move(res_data_column), std::move(res_null_map));
201
40
        return Status::OK();
202
41
    }
203
};
204
205
struct HLLHash {
206
    static constexpr auto name = "hll_hash";
207
208
    using ReturnType = DataTypeHLL;
209
    template <typename ColumnType>
210
334
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
211
334
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
212
334
            const ColumnString::Chars& data = col->get_chars();
213
334
            const ColumnString::Offsets& offsets = col->get_offsets();
214
334
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
215
334
            auto& res_data = res_column->get_data();
216
334
            size_t size = offsets.size();
217
218
5.07k
            for (size_t i = 0; i < size; ++i) {
219
4.73k
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
220
4.73k
                size_t str_size = offsets[i] - offsets[i - 1];
221
4.73k
                uint64_t hash_value =
222
4.73k
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
223
4.73k
                res_data[i].update(hash_value);
224
4.73k
            }
225
334
        }
226
334
    }
_ZN5doris7HLLHash6vectorINS_9ColumnStrIjEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS8_EE
Line
Count
Source
210
334
    static void vector(const ColumnType* col, MutableColumnPtr& col_res) {
211
334
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
212
334
            const ColumnString::Chars& data = col->get_chars();
213
334
            const ColumnString::Offsets& offsets = col->get_offsets();
214
334
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
215
334
            auto& res_data = res_column->get_data();
216
334
            size_t size = offsets.size();
217
218
5.07k
            for (size_t i = 0; i < size; ++i) {
219
4.73k
                const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
220
4.73k
                size_t str_size = offsets[i] - offsets[i - 1];
221
4.73k
                uint64_t hash_value =
222
4.73k
                        HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
223
4.73k
                res_data[i].update(hash_value);
224
4.73k
            }
225
334
        }
226
334
    }
Unexecuted instantiation: _ZN5doris7HLLHash6vectorINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RNS_3COWINS_7IColumnEE11mutable_ptrIS9_EE
227
228
    template <typename ColumnType>
229
    static void vector_nullable(const ColumnType* col, const NullMap& nullmap,
230
147
                                MutableColumnPtr& col_res) {
231
147
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
232
147
            const ColumnString::Chars& data = col->get_chars();
233
147
            const ColumnString::Offsets& offsets = col->get_offsets();
234
147
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
235
147
            auto& res_data = res_column->get_data();
236
147
            size_t size = offsets.size();
237
238
4.70k
            for (size_t i = 0; i < size; ++i) {
239
4.55k
                if (nullmap[i]) {
240
83
                    continue;
241
4.47k
                } else {
242
4.47k
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
243
4.47k
                    size_t str_size = offsets[i] - offsets[i - 1];
244
4.47k
                    uint64_t hash_value =
245
4.47k
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
246
4.47k
                    res_data[i].update(hash_value);
247
4.47k
                }
248
4.55k
            }
249
147
        }
250
147
    }
_ZN5doris7HLLHash15vector_nullableINS_9ColumnStrIjEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISF_EE
Line
Count
Source
230
147
                                MutableColumnPtr& col_res) {
231
147
        if constexpr (std::is_same_v<ColumnType, ColumnString>) {
232
147
            const ColumnString::Chars& data = col->get_chars();
233
147
            const ColumnString::Offsets& offsets = col->get_offsets();
234
147
            auto* res_column = reinterpret_cast<ColumnHLL*>(col_res.get());
235
147
            auto& res_data = res_column->get_data();
236
147
            size_t size = offsets.size();
237
238
4.70k
            for (size_t i = 0; i < size; ++i) {
239
4.55k
                if (nullmap[i]) {
240
83
                    continue;
241
4.47k
                } else {
242
4.47k
                    const char* raw_str = reinterpret_cast<const char*>(&data[offsets[i - 1]]);
243
4.47k
                    size_t str_size = offsets[i] - offsets[i - 1];
244
4.47k
                    uint64_t hash_value =
245
4.47k
                            HashUtil::murmur_hash64A(raw_str, str_size, HashUtil::MURMUR_SEED);
246
4.47k
                    res_data[i].update(hash_value);
247
4.47k
                }
248
4.55k
            }
249
147
        }
250
147
    }
Unexecuted instantiation: _ZN5doris7HLLHash15vector_nullableINS_12ColumnVectorILNS_13PrimitiveTypeE6EEEEEvPKT_RKNS_8PODArrayIhLm4096ENS_9AllocatorILb0ELb0ELb0ENS_22DefaultMemoryAllocatorELb1EEELm16ELm15EEERNS_3COWINS_7IColumnEE11mutable_ptrISG_EE
251
};
252
253
struct NameHllToBase64 {
254
    static constexpr auto name = "hll_to_base64";
255
};
256
257
struct HllToBase64 {
258
    using ReturnType = DataTypeString;
259
    static constexpr auto PrimitiveTypeImpl = PrimitiveType::TYPE_HLL;
260
    using Type = DataTypeHLL::FieldType;
261
    using ReturnColumnType = ColumnString;
262
    using Chars = ColumnString::Chars;
263
    using Offsets = ColumnString::Offsets;
264
265
59
    static Status vector(const std::vector<HyperLogLog>& data, Chars& chars, Offsets& offsets) {
266
59
        size_t size = data.size();
267
59
        offsets.resize(size);
268
59
        size_t output_char_size = 0;
269
203
        for (size_t i = 0; i < size; ++i) {
270
144
            auto& hll_val = data[i];
271
144
            auto ser_size = hll_val.max_serialized_size();
272
144
            output_char_size += (int)(4.0 * ceil((double)ser_size / 3.0));
273
144
        }
274
59
        ColumnString::check_chars_length(output_char_size, size);
275
59
        chars.resize(output_char_size);
276
59
        auto* chars_data = chars.data();
277
278
59
        size_t cur_ser_size = 0;
279
59
        size_t last_ser_size = 0;
280
59
        std::string ser_buff;
281
59
        size_t encoded_offset = 0;
282
203
        for (size_t i = 0; i < size; ++i) {
283
144
            auto& hll_val = data[i];
284
285
144
            cur_ser_size = hll_val.max_serialized_size();
286
144
            if (cur_ser_size > last_ser_size) {
287
60
                last_ser_size = cur_ser_size;
288
60
                ser_buff.resize(cur_ser_size);
289
60
            }
290
144
            size_t real_size = hll_val.serialize(reinterpret_cast<uint8_t*>(ser_buff.data()));
291
144
            auto outlen = base64_encode((const unsigned char*)ser_buff.data(), real_size,
292
144
                                        chars_data + encoded_offset);
293
144
            DCHECK(outlen > 0);
294
295
144
            encoded_offset += outlen;
296
144
            offsets[i] = cast_set<uint32_t>(encoded_offset);
297
144
        }
298
        // chars was sized using max_serialized_size(); the actual encoded length
299
        // can be smaller (e.g. sparse HLL), so shrink chars to keep
300
        // offsets.back() == chars.size() and satisfy ColumnString::sanity_check.
301
59
        chars.resize(encoded_offset);
302
59
        return Status::OK();
303
59
    }
304
};
305
306
using FunctionHLLCardinality = FunctionHLL<HLLCardinality>;
307
using FunctionHLLEmpty = FunctionConst<HLLEmptyImpl, false>;
308
using FunctionHLLHash = FunctionAlwaysNotNullable<HLLHash>;
309
using FunctionHllToBase64 = FunctionUnaryToType<HllToBase64, NameHllToBase64>;
310
311
8
void register_function_hll(SimpleFunctionFactory& factory) {
312
8
    factory.register_function<FunctionHLLCardinality>();
313
8
    factory.register_function<FunctionHLLEmpty>();
314
8
    factory.register_function<FunctionHllFromBase64>();
315
8
    factory.register_function<FunctionHLLHash>();
316
8
    factory.register_function<FunctionHllToBase64>();
317
8
}
318
319
} // namespace doris