Dictionary.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.dictionary;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.trees.plans.commands.info.CreateDictionaryInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DictionaryColumnDefinition;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TDictionaryTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Dictionary metadata: its structure (columns, layout) and data source information (source catalog,
 * database and table). Persisted as Gson JSON, see {@link #write(DataOutput)} and {@link #read(DataInput)}.
 */
public class Dictionary extends Table {
    private static final Logger LOG = LogManager.getLogger(Dictionary.class);

    // TODO: maybe dictionary should also be able to be created in external catalog.
    @SerializedName(value = "dbName")
    private final String dbName;

    // The dictionary's own name is stored in the base class. These 3 are expected not to be null.
    @SerializedName(value = "sourceCtlName")
    private final String sourceCtlName;
    @SerializedName(value = "sourceDbName")
    private final String sourceDbName;
    @SerializedName(value = "sourceTableName")
    private final String sourceTableName;

    @SerializedName(value = "columns")
    private final List<DictionaryColumnDefinition> columns;

    // createTime saved in base class

    // lastUpdateTime in milliseconds
    @SerializedName(value = "lastUpdateTime")
    private long lastUpdateTime;

    // when longer than lastUpdateTime + dataLifetimeSecs, data is out of date.
    @SerializedName(value = "dataLifetimeSecs")
    private long dataLifetimeSecs;

    @SerializedName(value = "memoryLimit")
    private long memoryLimit;

    public enum DictionaryStatus {
        NORMAL, // normal status
        LOADING, // waiting for the load task to finish
        OUT_OF_DATE // waiting for a load task to be scheduled
    }

    // Not serialized by gson; every constructor (including the gson one) resets it to OUT_OF_DATE.
    private final AtomicReference<DictionaryStatus> status = new AtomicReference<>();

    @SerializedName(value = "layout")
    private final LayoutType layout;
    @SerializedName(value = "version")
    private long version; // every time dictionary is updated, version will increase by 1

    @SerializedName(value = "skipNullKey")
    private boolean skipNullKey;

    // not record this. whenever restart FE or switch Master, it will be reset. then lead to reload dictionary.
    private long srcVersion = 0;
    // if srcVersion same with this, we could skip automatically update.
    private long latestInvalidVersion = 0;

    private List<DictionaryDistribution> dataDistributions; // every time update, reset with a new list
    private String lastUpdateResult;

    // we need this to call Table's constructor with no args which construct new rwLock and more.
    // for gson only and it will set variables soon. so no need to set them.
    public Dictionary() {
        super(TableType.DICTIONARY);
        this.dbName = null;
        this.sourceCtlName = null;
        this.sourceDbName = null;
        this.sourceTableName = null;
        this.columns = null;
        this.lastUpdateTime = 0;
        this.dataLifetimeSecs = 0;
        this.status.set(DictionaryStatus.OUT_OF_DATE); // not replay by gson
        this.layout = null;
        this.version = 0;
        this.skipNullKey = false;
        this.memoryLimit = 0;
        resetDataDistributions(); // not replay by gson
        this.lastUpdateResult = "";
    }

    /**
     * Creates a new dictionary from a CREATE DICTIONARY definition.
     *
     * @param info parsed creation info
     * @param uniqueId id assigned to this dictionary
     */
    public Dictionary(CreateDictionaryInfo info, long uniqueId) {
        super(uniqueId, info.getDictName(), TableType.DICTIONARY, /* source table's schema as dict's FullSchema */info
                .getColumns().stream().map(DictionaryColumnDefinition::getOriginColumn).collect(Collectors.toList()));
        this.dbName = info.getDbName();
        this.sourceCtlName = info.getSourceCtlName();
        this.sourceDbName = info.getSourceDbName();
        this.sourceTableName = info.getSourceTableName();
        this.columns = info.getColumns();
        this.lastUpdateTime = createTime;
        this.dataLifetimeSecs = info.getDataLifetime();
        // no data in the beginning so it's OUT_OF_DATE
        this.status.set(DictionaryStatus.OUT_OF_DATE);
        this.layout = info.getLayout();
        this.version = 1;
        this.skipNullKey = info.skipNullKey();
        this.memoryLimit = info.getMemoryLimit();
        resetDataDistributions();
        this.lastUpdateResult = "";
    }

    public String getDbName() {
        return dbName;
    }

    public String getSourceCtlName() {
        return sourceCtlName;
    }

    public String getSourceDbName() {
        return sourceDbName;
    }

    public String getSourceTableName() {
        return sourceTableName;
    }

    public List<DictionaryColumnDefinition> getDicColumns() {
        return columns;
    }

    /**
     * Finds the dictionary column definition with the given name (case-insensitive).
     *
     * @throws IllegalArgumentException if no such column exists
     */
    private DictionaryColumnDefinition findColumn(String name) {
        for (DictionaryColumnDefinition column : columns) {
            if (column.getName().equalsIgnoreCase(name)) {
                return column;
            }
        }
        throw new IllegalArgumentException("Column " + name + " not found in dictionary " + getName());
    }

    /**
     * Returns the source-table column backing the dictionary column {@code name} (case-insensitive).
     *
     * @throws IllegalArgumentException if no such column exists
     */
    public Column getOriginColumn(String name) {
        return findColumn(name).getOriginColumn();
    }

    /**
     * Returns [catalog, database, table] of the source table as a new mutable list.
     *
     * @throws IllegalArgumentException if any of the three name parts is null or empty
     */
    public List<String> getSourceQualifiedName() {
        if (Strings.isNullOrEmpty(sourceCtlName) || Strings.isNullOrEmpty(sourceDbName)
                || Strings.isNullOrEmpty(sourceTableName)) {
            // report the actual (possibly null/empty) parts so the broken one is visible
            throw new IllegalArgumentException("dictionary's source name " + sourceCtlName + "." + sourceDbName
                    + "." + sourceTableName + " is not completed");
        }
        return Lists.newArrayList(sourceCtlName, sourceDbName, sourceTableName);
    }

    @Override
    public Database getDatabase() {
        return Env.getCurrentInternalCatalog().getDbNullable(dbName);
    }

    @Override
    public List<String> getFullQualifiers() {
        return ImmutableList.of(Env.getCurrentEnv().getInternalCatalog().getName(), dbName, getName());
    }

    public List<String> getColumnNames() {
        return columns.stream().map(DictionaryColumnDefinition::getName).collect(Collectors.toList());
    }

    /**
     * Returns the time (in milliseconds) after which data is considered out of date:
     * lastUpdateTime + dataLifetimeSecs * 1000.
     */
    public long getNextRefreshTime() {
        return lastUpdateTime + dataLifetimeSecs * 1000;
    }

    public long getDataLifetimeSecs() {
        return dataLifetimeSecs;
    }

    /**
     * Returns the Nereids data type of the dictionary column {@code columnName} (case-insensitive).
     *
     * @throws IllegalArgumentException if no such column exists
     */
    public DataType getColumnType(String columnName) {
        return DataType.fromCatalogType(findColumn(columnName).getType());
    }

    /**
     * Returns the data types of all key columns, in column order.
     *
     * @throws IllegalArgumentException if the dictionary has no key column
     */
    public List<DataType> getKeyColumnTypes() {
        List<DataType> keyTypes = Lists.newArrayList();
        for (DictionaryColumnDefinition column : columns) {
            if (column.isKey()) {
                keyTypes.add(DataType.fromCatalogType(column.getType()));
            }
        }
        if (keyTypes.isEmpty()) {
            throw new IllegalArgumentException("Key column not found in dictionary " + getName());
        }
        return keyTypes;
    }

    public void increaseVersion() {
        this.version++;
    }

    public void decreaseVersion() {
        this.version--;
    }

    public long getVersion() {
        return version;
    }

    public boolean skipNullKey() {
        return skipNullKey;
    }

    public long getMemoryLimit() {
        return memoryLimit;
    }

    public long getLastUpdateTime() {
        return lastUpdateTime;
    }

    /** Sets lastUpdateTime to the current wall-clock time in milliseconds. */
    public void updateLastUpdateTime() {
        this.lastUpdateTime = System.currentTimeMillis();
    }

    /**
     * Resolves the source table from {@link #getSourceQualifiedName()}.
     *
     * @throws RuntimeException if the source table cannot be found
     */
    private TableIf getSourceTable() {
        TableIf tableIf = RelationUtil.getTable(getSourceQualifiedName(), Env.getCurrentEnv());
        if (tableIf == null) {
            throw new RuntimeException(getName() + "'s source table not found");
        }
        return tableIf;
    }

    /**
     * @return true if source table's version is newer than this dictionary's version(need update dictionary).
     *         Tables without version information always return true.
     * @throws RuntimeException if the source table is missing or its version went backwards
     */
    public boolean hasNewerSourceVersion() {
        TableIf tableIf = getSourceTable();
        if (!(tableIf instanceof MTMVRelatedTableIf)) { // MTMVRelatedTableIf includes OlapTable, some externals
            // just update for tables without version information.
            return true;
        }
        long tableVersionNow = ((MTMVRelatedTableIf) tableIf).getNewestUpdateVersionOrTime();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dictionary {} src's now version is {}, old is {}", getName(), tableVersionNow, srcVersion);
        }
        if (tableVersionNow < srcVersion) {
            // maybe drop and recreate. but if so, this dictionary should be dropped as well.
            // so should not happen.
            throw new RuntimeException("Dictionary " + getName() + "'s source table's version " + tableVersionNow
                    + " is smaller than dictionary's " + srcVersion);
        }
        // newer than what we loaded; a version already known to be invalid can be skipped.
        return tableVersionNow > srcVersion && tableVersionNow != latestInvalidVersion;
    }

    // when refresh success, update srcVersion.
    public void updateSrcVersion(long value) {
        srcVersion = value;
    }

    public long getSrcVersion() {
        return srcVersion;
    }

    /**
     * Records {@code value} as the latest known invalid source version.
     *
     * @throws RuntimeException if {@code value} is smaller than the current latestInvalidVersion
     */
    public void updateLatestInvalidVersion(long value) {
        if (value < latestInvalidVersion) {
            throw new RuntimeException("latestInvalidVersion of " + getName() + " should not be less than "
                    + latestInvalidVersion + ", but got " + value);
        }
        latestInvalidVersion = value;
    }

    /**
     * if has latestInvalidVersion and the base table's data not changed, we can skip update.
     *
     * @return false only when the source table is versioned and its current version equals
     *         latestInvalidVersion; true otherwise
     * @throws RuntimeException if the source table cannot be found
     */
    public boolean checkBaseDataValid() {
        TableIf tableIf = getSourceTable();
        if (!(tableIf instanceof MTMVRelatedTableIf)) { // MTMVRelatedTableIf includes OlapTable, some externals
            return true;
        }
        long tableVersionNow = ((MTMVRelatedTableIf) tableIf).getNewestUpdateVersionOrTime();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dictionary {}: src's now version is {}, old is {}", getName(), tableVersionNow, srcVersion);
        }
        // if not the known invalid version, maybe valid. so return true. otherwise we skip it.
        return tableVersionNow != latestInvalidVersion;
    }

    public DictionaryStatus getStatus() {
        return status.get();
    }

    /** Atomically sets the status to {@code newStatus} only if it currently equals {@code currentStatus}. */
    public boolean trySetStatusIf(DictionaryStatus currentStatus, DictionaryStatus newStatus) {
        return status.compareAndSet(currentStatus, newStatus);
    }

    /**
     * Atomically moves the status to {@code newStatus} if that transition is allowed from the current status.
     * Allowed transitions: NORMAL -> anything; LOADING -> NORMAL/OUT_OF_DATE; OUT_OF_DATE -> LOADING.
     *
     * @return true if the status was changed, false if the transition is not allowed
     */
    public boolean trySetStatus(DictionaryStatus newStatus) {
        // CAS retry loop: if another thread changes the status between get() and compareAndSet(),
        // re-read it and re-check the transition against the fresh value.
        while (true) {
            DictionaryStatus currentStatus = status.get();
            if (!isTransitionAllowed(currentStatus, newStatus)) {
                return false;
            }
            if (status.compareAndSet(currentStatus, newStatus)) {
                return true;
            }
        }
    }

    /** Returns whether the state machine permits moving from {@code from} to {@code to}. */
    private static boolean isTransitionAllowed(DictionaryStatus from, DictionaryStatus to) {
        if (from == null) {
            return false; // unknown status
        }
        switch (from) {
            case NORMAL:
                return true;
            case LOADING:
                // may success or failed, cannot accept another loading
                return to == DictionaryStatus.NORMAL || to == DictionaryStatus.OUT_OF_DATE;
            case OUT_OF_DATE:
                // only a new load may start from here
                return to == DictionaryStatus.LOADING;
            default:
                return false; // unknown status
        }
    }

    public LayoutType getLayout() {
        return layout;
    }

    /** Returns the data distributions formatted as {@code {d1, d2, ...}}, or "" if there are none recorded. */
    public String prettyPrintDistributions() {
        if (dataDistributions == null) {
            return ""; // no records
        }
        return "{" + StringUtils.join(dataDistributions, ", ") + "}";
    }

    public List<DictionaryDistribution> getDataDistributions() {
        return dataDistributions;
    }

    public void resetDataDistributions() {
        this.dataDistributions = Lists.newArrayList();
    }

    /**
     * Returns true if every alive backend has a distribution record and no record's version is
     * older than this dictionary's current version.
     */
    public boolean dataCompleted() {
        List<Long> aliveBEs = Env.getCurrentSystemInfo().getAllBackendIds(true);
        if (dataDistributions.size() < aliveBEs.size()) {
            // greater is OK. may be BEs down.
            return false;
        }
        // if only there's alive BE not find in dataDistributions, return false
        Set<Long> beIdsHasData = Sets.newHashSet();
        for (DictionaryDistribution distribution : dataDistributions) {
            if (distribution.getVersion() < version) {
                return false;
            }
            beIdsHasData.add(distribution.getBackendId());
        }
        for (Long backendId : aliveBEs) {
            if (!beIdsHasData.contains(backendId)) {
                // some of BE does not have data
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the backends from {@code backends} that do not hold data at exactly the current version,
     * i.e. outdated ones and ones absent from dataDistributions. Returns {@code backends} itself when
     * there are no distribution records.
     */
    public List<Backend> filterOutdatedBEs(List<Backend> backends) {
        if (dataDistributions == null || dataDistributions.isEmpty()) {
            // only called when do partial load. it bases on collection of data distributions.
            // so dataDistributions should not be null.
            LOG.warn("dataDistributions of {} is null or empty. should not happen", getName());
            return backends;
        }
        Set<Long> validBEs = Sets.newHashSet();
        for (DictionaryDistribution distribution : dataDistributions) {
            if (distribution.getVersion() == version) {
                validBEs.add(distribution.getBackendId());
            }
        }
        // get those not valid. maybe: 1. version not match(outdated) 2. not in dataDistributions(newcomers)
        return backends.stream().filter(backend -> !validBEs.contains(backend.getId())).collect(Collectors.toList());
    }

    /** Stores {@code lastUpdateResult} prefixed with the current formatted time. */
    public void setLastUpdateResult(String lastUpdateResult) {
        // get readable time
        String now = TimeUtils.getCurrentFormatTime();
        this.lastUpdateResult = now + ": " + lastUpdateResult;
    }

    public String getLastUpdateResult() {
        return lastUpdateResult;
    }

    @Override
    public TTableDescriptor toThrift() {
        TDictionaryTable tDictionaryTable = new TDictionaryTable();
        TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.DICTIONARY_TABLE, fullSchema.size(), 0,
                getName(), dbName);
        tTableDescriptor.setDictionaryTable(tDictionaryTable);
        return tTableDescriptor;
    }

    /** Serializes this dictionary as a Gson JSON string. */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /** Deserializes a dictionary written by {@link #write(DataOutput)}. */
    public static Dictionary read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, Dictionary.class);
    }

    @Override
    public String toString() {
        return "Dictionary{" + getName() + "}";
    }
}