be/src/core/value/hll.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "core/value/hll.h" |
19 | | |
20 | | #include <cmath> |
21 | | #include <map> |
22 | | #include <ostream> |
23 | | |
24 | | #include "common/logging.h" |
25 | | #include "util/coding.h" |
26 | | #include "util/slice.h" |
27 | | |
28 | | using std::string; |
29 | | using std::stringstream; |
30 | | |
31 | | namespace doris { |
32 | 1.03k | HyperLogLog::HyperLogLog(const Slice& src) { |
33 | | // When deserialize return false, we make this object a empty |
34 | 1.03k | if (!deserialize(src)) { |
35 | 3 | _type = HLL_DATA_EMPTY; |
36 | 3 | } |
37 | 1.03k | } |
38 | | |
39 | | // Convert explicit values to register format, and clear explicit values. |
40 | | // NOTE: this function won't modify _type. |
41 | 5 | void HyperLogLog::_convert_explicit_to_register() { |
42 | 5 | DCHECK(_type == HLL_DATA_EXPLICIT) |
43 | 0 | << "_type(" << _type << ") should be explicit(" << HLL_DATA_EXPLICIT << ")"; |
44 | 5 | _registers = new uint8_t[HLL_REGISTERS_COUNT]; |
45 | 5 | memset(_registers, 0, HLL_REGISTERS_COUNT); |
46 | 780 | for (auto value : _hash_set) { |
47 | 780 | _update_registers(value); |
48 | 780 | } |
49 | | // clear _hash_set |
50 | 5 | flat_hash_set<uint64_t>().swap(_hash_set); |
51 | 5 | } |
52 | | |
53 | | // Change HLL_DATA_EXPLICIT to HLL_DATA_FULL directly, because HLL_DATA_SPARSE |
54 | | // is implemented in the same way in memory with HLL_DATA_FULL. |
55 | 81.3k | void HyperLogLog::update(uint64_t hash_value) { |
56 | 81.3k | switch (_type) { |
57 | 3.35k | case HLL_DATA_EMPTY: |
58 | 3.35k | _hash_set.insert(hash_value); |
59 | 3.35k | _type = HLL_DATA_EXPLICIT; |
60 | 3.35k | break; |
61 | 10.8k | case HLL_DATA_EXPLICIT: |
62 | 10.8k | if (_hash_set.size() < HLL_EXPLICIT_INT64_NUM) { |
63 | 10.8k | _hash_set.insert(hash_value); |
64 | 10.8k | break; |
65 | 10.8k | } |
66 | 3 | _convert_explicit_to_register(); |
67 | 3 | _type = HLL_DATA_FULL; |
68 | | // fall through |
69 | 4 | case HLL_DATA_SPARSE: |
70 | 67.1k | case HLL_DATA_FULL: |
71 | 67.1k | _update_registers(hash_value); |
72 | 67.1k | break; |
73 | 81.3k | } |
74 | 81.3k | } |
75 | | |
76 | 8 | void HyperLogLog::merge(const HyperLogLog& other) { |
77 | | // fast path |
78 | 8 | if (other._type == HLL_DATA_EMPTY) { |
79 | 0 | return; |
80 | 0 | } |
81 | 8 | switch (_type) { |
82 | 2 | case HLL_DATA_EMPTY: { |
83 | | // _type must change |
84 | 2 | _type = other._type; |
85 | 2 | switch (other._type) { |
86 | 1 | case HLL_DATA_EXPLICIT: |
87 | 1 | _hash_set = other._hash_set; |
88 | 1 | break; |
89 | 0 | case HLL_DATA_SPARSE: |
90 | 1 | case HLL_DATA_FULL: |
91 | 1 | _registers = new uint8_t[HLL_REGISTERS_COUNT]; |
92 | 1 | memcpy(_registers, other._registers, HLL_REGISTERS_COUNT); |
93 | 1 | break; |
94 | 0 | default: |
95 | 0 | break; |
96 | 2 | } |
97 | 2 | break; |
98 | 2 | } |
99 | 3 | case HLL_DATA_EXPLICIT: { |
100 | 3 | switch (other._type) { |
101 | 2 | case HLL_DATA_EXPLICIT: { |
102 | | // Merge other's explicit values first, then check if the number is exceed |
103 | | // HLL_EXPLICIT_INT64_NUM. This is OK because the max value is 2 * 160. |
104 | 2 | _hash_set.insert(other._hash_set.begin(), other._hash_set.end()); |
105 | 2 | if (_hash_set.size() > HLL_EXPLICIT_INT64_NUM) { |
106 | 1 | _convert_explicit_to_register(); |
107 | 1 | _type = HLL_DATA_FULL; |
108 | 1 | } |
109 | 2 | } break; |
110 | 0 | case HLL_DATA_SPARSE: |
111 | 1 | case HLL_DATA_FULL: |
112 | 1 | _convert_explicit_to_register(); |
113 | 1 | _merge_registers(other._registers); |
114 | 1 | _type = HLL_DATA_FULL; |
115 | 1 | break; |
116 | 0 | default: |
117 | 0 | break; |
118 | 3 | } |
119 | 3 | break; |
120 | 3 | } |
121 | 3 | case HLL_DATA_SPARSE: |
122 | 3 | case HLL_DATA_FULL: { |
123 | 3 | switch (other._type) { |
124 | 1 | case HLL_DATA_EXPLICIT: |
125 | 100 | for (auto hash_value : other._hash_set) { |
126 | 100 | _update_registers(hash_value); |
127 | 100 | } |
128 | 1 | break; |
129 | 0 | case HLL_DATA_SPARSE: |
130 | 2 | case HLL_DATA_FULL: |
131 | 2 | _merge_registers(other._registers); |
132 | 2 | break; |
133 | 0 | default: |
134 | 0 | break; |
135 | 3 | } |
136 | 3 | break; |
137 | 3 | } |
138 | 8 | } |
139 | 8 | } |
140 | | |
141 | 6.50k | size_t HyperLogLog::max_serialized_size() const { |
142 | 6.50k | switch (_type) { |
143 | 10 | case HLL_DATA_EMPTY: |
144 | 10 | default: |
145 | 10 | return 1; |
146 | 6.49k | case HLL_DATA_EXPLICIT: |
147 | 6.49k | return 2 + _hash_set.size() * 8; |
148 | 0 | case HLL_DATA_SPARSE: |
149 | 0 | case HLL_DATA_FULL: |
150 | 0 | return 1 + HLL_REGISTERS_COUNT; |
151 | 6.50k | } |
152 | 6.50k | } |
153 | | |
154 | 6.50k | size_t HyperLogLog::serialize(uint8_t* dst) const { |
155 | 6.50k | uint8_t* ptr = dst; |
156 | 6.50k | switch (_type) { |
157 | 9 | case HLL_DATA_EMPTY: |
158 | 9 | default: { |
159 | | // When the _type is unknown, which may not happen, we encode it as |
160 | | // Empty HyperLogLog object. |
161 | 9 | *ptr++ = HLL_DATA_EMPTY; |
162 | 9 | break; |
163 | 9 | } |
164 | 6.49k | case HLL_DATA_EXPLICIT: { |
165 | 6.49k | DCHECK(_hash_set.size() <= HLL_EXPLICIT_INT64_NUM) |
166 | 0 | << "Number of explicit elements(" << _hash_set.size() |
167 | 0 | << ") should be less or equal than " << HLL_EXPLICIT_INT64_NUM; |
168 | 6.49k | *ptr++ = _type; |
169 | 6.49k | *ptr++ = (uint8_t)_hash_set.size(); |
170 | 31.7k | for (auto hash_value : _hash_set) { |
171 | 31.7k | encode_fixed64_le(ptr, hash_value); |
172 | 31.7k | ptr += 8; |
173 | 31.7k | } |
174 | 6.49k | break; |
175 | 9 | } |
176 | 0 | case HLL_DATA_SPARSE: |
177 | 2 | case HLL_DATA_FULL: { |
178 | 2 | uint32_t num_non_zero_registers = 0; |
179 | 32.7k | for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { |
180 | 32.7k | num_non_zero_registers += (_registers[i] != 0); |
181 | 32.7k | } |
182 | | |
183 | | // each register in sparse format will occupy 3bytes, 2 for index and |
184 | | // 1 for register value. So if num_non_zero_registers is greater than |
185 | | // 4K we use full encode format. |
186 | 2 | if (num_non_zero_registers > HLL_SPARSE_THRESHOLD) { |
187 | 1 | *ptr++ = HLL_DATA_FULL; |
188 | 1 | memcpy(ptr, _registers, HLL_REGISTERS_COUNT); |
189 | 1 | ptr += HLL_REGISTERS_COUNT; |
190 | 1 | } else { |
191 | 1 | *ptr++ = HLL_DATA_SPARSE; |
192 | | // 2-5(4 byte): number of registers |
193 | 1 | encode_fixed32_le(ptr, num_non_zero_registers); |
194 | 1 | ptr += 4; |
195 | | |
196 | 16.3k | for (uint16_t i = 0; i < HLL_REGISTERS_COUNT; ++i) { |
197 | 16.3k | if (_registers[i] == 0) { |
198 | 15.4k | continue; |
199 | 15.4k | } |
200 | | // 2 bytes: register index |
201 | | // 1 byte: register value |
202 | 984 | encode_fixed16_le(ptr, i); |
203 | 984 | ptr += 2; |
204 | 984 | *ptr++ = _registers[i]; |
205 | 984 | } |
206 | 1 | } |
207 | 2 | break; |
208 | 0 | } |
209 | 6.50k | } |
210 | 6.50k | return ptr - dst; |
211 | 6.50k | } |
212 | | |
213 | 3.44k | bool HyperLogLog::is_valid(const Slice& slice) { |
214 | 3.44k | if (slice.size < 1) { |
215 | 1 | return false; |
216 | 1 | } |
217 | 3.44k | const uint8_t* ptr = (uint8_t*)slice.data; |
218 | 3.44k | const uint8_t* end = (uint8_t*)slice.data + slice.size; |
219 | 3.44k | auto type = (HllDataType)*ptr++; |
220 | 3.44k | switch (type) { |
221 | 9 | case HLL_DATA_EMPTY: |
222 | 9 | break; |
223 | 3.42k | case HLL_DATA_EXPLICIT: { |
224 | 3.42k | if ((ptr + 1) > end) { |
225 | 1 | return false; |
226 | 1 | } |
227 | 3.42k | uint8_t num_explicits = *ptr++; |
228 | 3.42k | ptr += num_explicits * 8; |
229 | 3.42k | break; |
230 | 3.42k | } |
231 | 3 | case HLL_DATA_SPARSE: { |
232 | 3 | if ((ptr + 4) > end) { |
233 | 1 | return false; |
234 | 1 | } |
235 | 2 | uint32_t num_registers = decode_fixed32_le(ptr); |
236 | 2 | ptr += 4 + 3 * num_registers; |
237 | 2 | break; |
238 | 3 | } |
239 | 3 | case HLL_DATA_FULL: { |
240 | 3 | ptr += HLL_REGISTERS_COUNT; |
241 | 3 | break; |
242 | 3 | } |
243 | 3 | default: |
244 | 3 | return false; |
245 | 3.44k | } |
246 | 3.43k | return ptr == end; |
247 | 3.44k | } |
248 | | |
249 | | // TODO(zc): check input string's length |
250 | 3.43k | bool HyperLogLog::deserialize(const Slice& slice) { |
251 | | // can be called only when type is empty |
252 | 3.43k | DCHECK(_type == HLL_DATA_EMPTY); |
253 | | |
254 | | // NOTE(zc): Don't remove this check unless you known what |
255 | | // you are doing. Because of history bug, we ingest some |
256 | | // invalid HLL data in storage, which ptr is nullptr. |
257 | | // we must handle this case to avoid process crash. |
258 | | // This bug is in release 0.10, I think we can remove this |
259 | | // in release 0.12 or later. |
260 | 3.43k | if (slice.data == nullptr || slice.size <= 0) { |
261 | 1 | return false; |
262 | 1 | } |
263 | | // check if input length is valid |
264 | 3.43k | if (!is_valid(slice)) { |
265 | 2 | return false; |
266 | 2 | } |
267 | | |
268 | 3.43k | const uint8_t* ptr = (uint8_t*)slice.data; |
269 | | // first byte : type |
270 | 3.43k | _type = (HllDataType)*ptr++; |
271 | 3.43k | switch (_type) { |
272 | 7 | case HLL_DATA_EMPTY: |
273 | 7 | break; |
274 | 3.42k | case HLL_DATA_EXPLICIT: { |
275 | | // 2: number of explicit values |
276 | | // make sure that num_explicit is positive |
277 | 3.42k | uint8_t num_explicits = *ptr++; |
278 | | // 3+: 8 bytes hash value |
279 | 22.2k | for (int i = 0; i < num_explicits; ++i) { |
280 | 18.8k | _hash_set.insert(decode_fixed64_le(ptr)); |
281 | 18.8k | ptr += 8; |
282 | 18.8k | } |
283 | 3.42k | break; |
284 | 0 | } |
285 | 1 | case HLL_DATA_SPARSE: { |
286 | 1 | _registers = new uint8_t[HLL_REGISTERS_COUNT]; |
287 | 1 | memset(_registers, 0, HLL_REGISTERS_COUNT); |
288 | | // 2-5(4 byte): number of registers |
289 | 1 | uint32_t num_registers = decode_fixed32_le(ptr); |
290 | 1 | ptr += 4; |
291 | 985 | for (uint32_t i = 0; i < num_registers; ++i) { |
292 | | // 2 bytes: register index |
293 | | // 1 byte: register value |
294 | 984 | uint16_t register_idx = decode_fixed16_le(ptr); |
295 | 984 | ptr += 2; |
296 | 984 | _registers[register_idx] = *ptr++; |
297 | 984 | } |
298 | 1 | break; |
299 | 0 | } |
300 | 1 | case HLL_DATA_FULL: { |
301 | 1 | _registers = new uint8_t[HLL_REGISTERS_COUNT]; |
302 | | // 2+ : hll register value |
303 | 1 | memcpy(_registers, ptr, HLL_REGISTERS_COUNT); |
304 | 1 | break; |
305 | 0 | } |
306 | 0 | default: |
307 | | // revert type to EMPTY |
308 | 0 | _type = HLL_DATA_EMPTY; |
309 | 0 | return false; |
310 | 3.43k | } |
311 | 3.43k | return true; |
312 | 3.43k | } |
313 | | |
314 | 506 | int64_t HyperLogLog::estimate_cardinality() const { |
315 | 506 | if (_type == HLL_DATA_EMPTY) { |
316 | 4 | return 0; |
317 | 4 | } |
318 | 502 | if (_type == HLL_DATA_EXPLICIT) { |
319 | 489 | return _hash_set.size(); |
320 | 489 | } |
321 | | |
322 | 13 | const int num_streams = HLL_REGISTERS_COUNT; |
323 | | // Empirical constants for the algorithm. |
324 | 13 | float alpha = 0; |
325 | | |
326 | 13 | if (num_streams == 16) { |
327 | 0 | alpha = 0.673F; |
328 | 13 | } else if (num_streams == 32) { |
329 | 0 | alpha = 0.697F; |
330 | 13 | } else if (num_streams == 64) { |
331 | 0 | alpha = 0.709F; |
332 | 13 | } else { |
333 | 13 | alpha = 0.7213F / (1 + 1.079F / num_streams); |
334 | 13 | } |
335 | | |
336 | 13 | float harmonic_mean = 0; |
337 | 13 | int num_zero_registers = 0; |
338 | | |
339 | 213k | for (int i = 0; i < HLL_REGISTERS_COUNT; ++i) { |
340 | 212k | harmonic_mean += powf(2.0F, -_registers[i]); |
341 | | |
342 | 212k | if (_registers[i] == 0) { |
343 | 110k | ++num_zero_registers; |
344 | 110k | } |
345 | 212k | } |
346 | | |
347 | 13 | harmonic_mean = 1.0F / harmonic_mean; |
348 | 13 | double estimate = alpha * num_streams * num_streams * harmonic_mean; |
349 | | // according to HyperLogLog current correction, if E is cardinal |
350 | | // E =< num_streams * 2.5 , LC has higher accuracy. |
351 | | // num_streams * 2.5 < E , HyperLogLog has higher accuracy. |
352 | | // Generally , we can use HyperLogLog to produce value as E. |
353 | 13 | if (estimate <= num_streams * 2.5 && num_zero_registers != 0) { |
354 | | // Estimated cardinality is too low. Hll is too inaccurate here, instead use |
355 | | // linear counting. |
356 | 7 | estimate = num_streams * |
357 | 7 | log(static_cast<double>(num_streams) / static_cast<double>(num_zero_registers)); |
358 | 7 | } else if (num_streams == 16384 && estimate < 72000) { |
359 | | // when Linear Couint change to HyperLogLog according to HyperLogLog Correction, |
360 | | // there are relatively large fluctuations, we fixed the problem refer to redis. |
361 | 6 | double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate) - |
362 | 6 | 1.4253 * 1.0e-12 * (estimate * estimate * estimate) + |
363 | 6 | 1.2940 * 1.0e-7 * (estimate * estimate) - 5.2921 * 1.0e-3 * estimate + |
364 | 6 | 83.3216; |
365 | 6 | estimate -= estimate * (bias / 100); |
366 | 6 | } |
367 | 13 | return (int64_t)(estimate + 0.5); |
368 | 502 | } |
369 | | } // namespace doris |