Hll.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.io;

import org.apache.commons.codec.binary.StringUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;
import java.util.HashSet;
import java.util.Set;

/**
 *  mainly used for Spark Load process to produce hll
 *  Try to keep consistent with be's C++ hll
 *  Whether method estimateCardinality can keep consistent with is a question to be studied
 *
 * use example:
 *  Hll hll = new hll();
 *  Hll.updateWithHash(value);
 *
 */

public class Hll {

    public static final byte HLL_DATA_EMPTY = 0;
    public static final byte HLL_DATA_EXPLICIT = 1;
    public static final byte HLL_DATA_SPARSE = 2;
    public static final byte HLL_DATA_FULL = 3;

    public static final int HLL_COLUMN_PRECISION = 14;
    public static final int HLL_ZERO_COUNT_BITS = (64 - HLL_COLUMN_PRECISION);
    public static final int HLL_EXPLICIT_INT64_NUM = 160;
    public static final int HLL_SPARSE_THRESHOLD = 4096;
    public static final int HLL_REGISTERS_COUNT = 16 * 1024;

    private int type;
    private Set<Long> hashSet;
    private byte[] registers;

    public Hll() {
        type = HLL_DATA_EMPTY;
        this.hashSet = new HashSet<>();
    }

    private void convertExplicitToRegister() {
        assert this.type == HLL_DATA_EXPLICIT;
        registers = new byte[HLL_REGISTERS_COUNT];
        for (Long value : hashSet) {
            updateRegisters(value);
        }
        hashSet.clear();
    }

    private void updateRegisters(long hashValue) {
        int idx;
        // hash value less than zero means we get a unsigned long
        // so need to transfer to BigInter to mod
        if (hashValue < 0) {
            BigInteger unint64HashValue = new BigInteger(Long.toUnsignedString(hashValue));
            unint64HashValue = unint64HashValue.mod(new BigInteger(Long.toUnsignedString(HLL_REGISTERS_COUNT)));
            idx = unint64HashValue.intValue();
        } else {
            idx = (int) (hashValue % HLL_REGISTERS_COUNT);
        }

        hashValue >>>= HLL_COLUMN_PRECISION;
        hashValue |= (1L << HLL_ZERO_COUNT_BITS);
        byte firstOneBit = (byte) (getLongTailZeroNum(hashValue) + 1);
        registers[idx] = registers[idx] > firstOneBit ? registers[idx] : firstOneBit;
    }

    private void mergeRegisters(byte[] other) {
        for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
            this.registers[i] = this.registers[i] > other[i] ? this.registers[i] : other[i];
        }
    }

    public static byte getLongTailZeroNum(long hashValue) {
        if (hashValue == 0) {
            return 0;
        }
        long value = 1L;
        byte idx = 0;
        for (;; idx++) {
            if ((value & hashValue) != 0) {
                return idx;
            }
            value = value << 1;
            if (idx == 62) {
                break;
            }
        }
        return idx;
    }

    public void updateWithHash(Object value) {
        byte[] v = StringUtils.getBytesUtf8(String.valueOf(value));
        update(hash64(v, v.length, SEED));
    }

    public void update(long hashValue) {
        switch (this.type) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
            case HLL_DATA_EMPTY:
                hashSet.add(hashValue);
                type = HLL_DATA_EXPLICIT;
                break;
            case HLL_DATA_EXPLICIT:
                if (hashSet.size() < HLL_EXPLICIT_INT64_NUM) {
                    hashSet.add(hashValue);
                    break;
                }
                convertExplicitToRegister();
                type = HLL_DATA_FULL;
            case HLL_DATA_SPARSE: // CHECKSTYLE IGNORE THIS LINE: fall through
            case HLL_DATA_FULL:
                updateRegisters(hashValue);
                break;
        }
    }

    public void merge(Hll other) {
        if (other.type == HLL_DATA_EMPTY) {
            return;
        }
        switch (this.type) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
            case HLL_DATA_EMPTY:
                this.type = other.type;
                switch (other.type) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
                    case HLL_DATA_EXPLICIT:
                        this.hashSet.addAll(other.hashSet);
                        break;
                    case HLL_DATA_SPARSE:
                    case HLL_DATA_FULL:
                        this.registers = new byte[HLL_REGISTERS_COUNT];
                        System.arraycopy(other.registers, 0, this.registers, 0, HLL_REGISTERS_COUNT);
                        break;
                }
                break;
            case HLL_DATA_EXPLICIT:
                switch (other.type) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
                    case HLL_DATA_EXPLICIT:
                        this.hashSet.addAll(other.hashSet);
                        if (this.hashSet.size() > HLL_EXPLICIT_INT64_NUM) {
                            convertExplicitToRegister();
                            this.type = HLL_DATA_FULL;
                        }
                        break;
                    case HLL_DATA_SPARSE:
                    case HLL_DATA_FULL:
                        convertExplicitToRegister();
                        mergeRegisters(other.registers);
                        this.type = HLL_DATA_FULL;
                        break;
                }
                break;
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                switch (other.type) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
                    case HLL_DATA_EXPLICIT:
                        for (long value : other.hashSet) {
                            update(value);
                        }
                        break;
                    case HLL_DATA_SPARSE:
                    case HLL_DATA_FULL:
                        mergeRegisters(other.registers);
                        break;
                }
                break;
        }
    }

    public void serialize(DataOutput output) throws IOException {
        switch (type) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
            case HLL_DATA_EMPTY:
                output.writeByte(type);
                break;
            case HLL_DATA_EXPLICIT:
                output.writeByte(type);
                output.writeByte(hashSet.size());
                for (long value : hashSet) {
                    output.writeLong(Long.reverseBytes(value));
                }
                break;
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                int nonZeroRegisterNum = 0;
                for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
                    if (registers[i] != 0) {
                        nonZeroRegisterNum++;
                    }
                }
                if (nonZeroRegisterNum > HLL_SPARSE_THRESHOLD) {
                    output.writeByte(HLL_DATA_FULL);
                    for (byte value : registers) {
                        output.writeByte(value);
                    }
                } else {
                    output.writeByte(HLL_DATA_SPARSE);
                    output.writeInt(Integer.reverseBytes(nonZeroRegisterNum));
                    for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
                        if (registers[i] != 0) {
                            output.writeShort(Short.reverseBytes((short) i));
                            output.writeByte(registers[i]);
                        }
                    }
                }
                break;
        }
    }

    public boolean deserialize(DataInput input) throws IOException {
        assert type == HLL_DATA_EMPTY;

        if (input == null) {
            return false;
        }

        this.type = input.readByte();
        switch (this.type) {
            case HLL_DATA_EMPTY:
                break;
            case HLL_DATA_EXPLICIT:
                int hashSetSize = input.readUnsignedByte();
                for (int i = 0; i < hashSetSize; i++) {
                    update(Long.reverseBytes(input.readLong()));
                }
                assert this.type == HLL_DATA_EXPLICIT;
                break;
            case HLL_DATA_SPARSE:
                int sparseDataSize = Integer.reverseBytes(input.readInt());
                this.registers = new byte[HLL_REGISTERS_COUNT];
                for (int i = 0; i < sparseDataSize; i++) {
                    int idx = Short.reverseBytes(input.readShort());
                    byte value = input.readByte();
                    registers[idx] = value;
                }
                break;
            case HLL_DATA_FULL:
                this.registers = new byte[HLL_REGISTERS_COUNT];
                for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
                    registers[i] = input.readByte();
                }
                break;
            default:
                return false;
        }

        return true;
    }

    // use strictfp to force java follow IEEE 754 to deal float point strictly
    public strictfp long estimateCardinality() {
        if (type == HLL_DATA_EMPTY) {
            return 0;
        }
        if (type == HLL_DATA_EXPLICIT) {
            return hashSet.size();
        }

        int numStreams = HLL_REGISTERS_COUNT;
        float alpha = 0;

        if (numStreams == 16) {
            alpha = 0.673f;
        } else if (numStreams == 32) {
            alpha = 0.697f;
        } else if (numStreams == 64) {
            alpha = 0.709f;
        } else {
            alpha = 0.7213f / (1 + 1.079f / numStreams);
        }

        float harmonicMean = 0;
        int numZeroRegisters = 0;

        for (int i = 0; i < HLL_REGISTERS_COUNT; i++) {
            harmonicMean += Math.pow(2.0f, -registers[i]);

            if (registers[i] == 0) {
                numZeroRegisters++;
            }
        }

        harmonicMean = 1.0f / harmonicMean;
        double estimate = alpha * numStreams * numStreams * harmonicMean;

        if (estimate <= numStreams * 2.5 && numZeroRegisters != 0) {
            estimate = numStreams * Math.log(((float) numStreams) / ((float) numZeroRegisters));
        } else if (numStreams == 16384 && estimate < 72000) {
            double bias = 5.9119 * 1.0e-18 * (estimate * estimate * estimate * estimate)
                    - 1.4253 * 1.0e-12 * (estimate * estimate * estimate)
                    + 1.2940 * 1.0e-7 * (estimate * estimate)
                    - 5.2921 * 1.0e-3 * estimate
                    + 83.3216;
            estimate -= estimate * (bias / 100);
        }

        return (long) (estimate + 0.5);
    }

    public int maxSerializedSize() {
        switch (type) {
            case HLL_DATA_EMPTY:
            default:
                return 1;
            case HLL_DATA_EXPLICIT:
                return 2 + hashSet.size() * 8;
            case HLL_DATA_SPARSE:
            case HLL_DATA_FULL:
                return 1 + HLL_REGISTERS_COUNT;
        }
    }

    public static final long M64 = 0xc6a4a7935bd1e995L;
    public static final int R64 = 47;
    public static final int SEED = 0xadc83b19;


    private static long getLittleEndianLong(final byte[] data, final int index) {
        return (((long) data[index    ] & 0xff))
                | (((long) data[index + 1] & 0xff) <<  8)
                | (((long) data[index + 2] & 0xff) << 16)
                | (((long) data[index + 3] & 0xff) << 24)
                | (((long) data[index + 4] & 0xff) << 32)
                | (((long) data[index + 5] & 0xff) << 40)
                | (((long) data[index + 6] & 0xff) << 48)
                | (((long) data[index + 7] & 0xff) << 56);
    }

    public static long hash64(final byte[] data, final int length, final int seed) {
        long h = (seed & 0xffffffffL) ^ (length * M64);
        final int nblocks = length >> 3;

        // body
        for (int i = 0; i < nblocks; i++) {
            final int index = (i << 3);
            long k = getLittleEndianLong(data, index);

            k *= M64;
            k ^= k >>> R64;
            k *= M64;

            h ^= k;
            h *= M64;
        }

        final int index = (nblocks << 3);
        switch (length - index) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
            case 7:
                h ^= ((long) data[index + 6] & 0xff) << 48;
            case 6: // CHECKSTYLE IGNORE THIS LINE: fall through
                h ^= ((long) data[index + 5] & 0xff) << 40;
            case 5: // CHECKSTYLE IGNORE THIS LINE: fall through
                h ^= ((long) data[index + 4] & 0xff) << 32;
            case 4: // CHECKSTYLE IGNORE THIS LINE: fall through
                h ^= ((long) data[index + 3] & 0xff) << 24;
            case 3: // CHECKSTYLE IGNORE THIS LINE: fall through
                h ^= ((long) data[index + 2] & 0xff) << 16;
            case 2: // CHECKSTYLE IGNORE THIS LINE: fall through
                h ^= ((long) data[index + 1] & 0xff) << 8;
            case 1: // CHECKSTYLE IGNORE THIS LINE: fall through
                h ^= ((long) data[index] & 0xff);
                h *= M64;
        }

        h ^= h >>> R64;
        h *= M64;
        h ^= h >>> R64;

        return h;
    }

    // just for ut
    public int getType() {
        return type;
    }

    // For convert to statistics used Hll128
    public byte[] getRegisters() {
        return registers;
    }

    // For convert to statistics used Hll128
    public Set<Long> getHashSet() {
        return hashSet;
    }
}