ExternalTable.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableAttributes;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIndexes;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.base.Objects;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * An external table represents a table that is not self-managed by Doris,
 * such as a table from Hive, Iceberg, Elasticsearch, etc.
 *
 * <p>This base class keeps the identity of the table (id, name, remote name, owning catalog/database),
 * reads its schema from the catalog's {@link ExternalSchemaCache}, and provides defaults that
 * subclasses override for schema loading ({@link #initSchema()}), row counts ({@link #fetchRowCount()})
 * and partition information ({@link #getNameToPartitionItems(Optional)}).
 */
@Getter
public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
    private static final Logger LOG = LogManager.getLogger(ExternalTable.class);

    @SerializedName(value = "id")
    protected long id;
    @SerializedName(value = "name")
    protected String name;
    @SerializedName(value = "remoteName")
    protected String remoteName;
    @SerializedName(value = "type")
    protected TableType type = null;
    @SerializedName(value = "timestamp")
    protected long timestamp;
    // dbName is temporarily retained and will be deleted later. To use dbName, please use db.getFullName()
    @SerializedName(value = "dbName")
    protected String dbName;
    @SerializedName(value = "ta")
    private final TableAttributes tableAttributes = new TableAttributes();

    // this field will be refreshed after reloading schema
    protected volatile long schemaUpdateTime;

    // The fields below are not serialized; they are (re)populated at runtime.
    protected long dbId;
    protected boolean objectCreated;
    protected ExternalCatalog catalog;
    protected ExternalDatabase db;

    /**
     * No args constructor for persist.
     */
    public ExternalTable() {
        this.objectCreated = false;
    }

    /**
     * Create external table.
     *
     * @param id Table id.
     * @param name Table name.
     * @param remoteName Remote table name.
     * @param catalog ExternalCatalog this table belongs to.
     * @param db ExternalDatabase this table belongs to; must not be null (its full name is read here).
     * @param type Table type.
     */
    public ExternalTable(long id, String name, String remoteName, ExternalCatalog catalog, ExternalDatabase db,
            TableType type) {
        this.id = id;
        this.name = name;
        this.remoteName = remoteName;
        this.catalog = catalog;
        this.db = db;
        this.dbName = db.getFullName();
        this.type = type;
        this.objectCreated = false;
    }

    public void setCatalog(ExternalCatalog catalog) {
        this.catalog = catalog;
    }

    public void setDb(ExternalDatabase db) {
        this.db = db;
    }

    public void setRemoteName(String remoteName) {
        this.remoteName = remoteName;
    }

    /**
     * Whether this table is a view. External tables are not views by default.
     */
    public boolean isView() {
        return false;
    }

    /**
     * Resolves the owning database through the catalog by {@code dbName}, records its id in
     * {@code dbId} and initializes the database.
     * An {@link AnalysisException} from the lookup is passed to {@code Util.logAndThrowRuntimeException}.
     */
    protected void makeSureInitialized() {
        try {
            // getDbOrAnalysisException will call makeSureInitialized in ExternalCatalog.
            // Use a distinct local name so the `db` field is not shadowed.
            ExternalDatabase database = catalog.getDbOrAnalysisException(dbName);
            dbId = database.getId();
            database.makeSureInitialized();
        } catch (AnalysisException e) {
            Util.logAndThrowRuntimeException(LOG, String.format("Exception to get db %s", dbName), e);
        }
    }

    @Override
    public long getId() {
        return id;
    }

    @Override
    public String getName() {
        return name;
    }

    public String getRemoteName() {
        return remoteName;
    }

    @Override
    public TableType getType() {
        return type;
    }

    /**
     * Returns the schema cached for ({@code dbName}, {@code name}) in the catalog's schema cache.
     *
     * @return the cached schema, or null if the cache holds no value for this table
     */
    @Override
    public List<Column> getFullSchema() {
        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(dbName, name);
        return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
    }

    @Override
    public List<Column> getBaseSchema() {
        return getFullSchema();
    }

    @Override
    public List<Column> getBaseSchema(boolean full) {
        return getFullSchema();
    }

    /**
     * No-op: the schema of an external table comes from the schema cache and cannot be set directly.
     */
    @Override
    public void setNewFullSchema(List<Column> newSchema) {
    }

    /**
     * Looks up a column of the full schema by name, ignoring case.
     *
     * @param columnName name of the column to find
     * @return the matching column, or null if no column matches or the schema is unavailable
     */
    @Override
    public Column getColumn(String columnName) {
        List<Column> schema = getFullSchema();
        // getFullSchema() returns null when the schema cache has no value for this table.
        if (schema == null) {
            return null;
        }
        for (Column column : schema) {
            if (columnName.equalsIgnoreCase(column.getName())) {
                return column;
            }
        }
        return null;
    }

    @Override
    public Map<String, Constraint> getConstraintsMapUnsafe() {
        return tableAttributes.getConstraintsMap();
    }

    @Override
    public String getEngine() {
        return getType().toEngineName();
    }

    @Override
    public String getMysqlType() {
        return getType().toMysqlType();
    }

    /**
     * Returns the row count from the external row count cache after initializing the table.
     *
     * @return the cached row count, or {@link TableIf#UNKNOWN_ROW_COUNT} if initialization fails
     */
    @Override
    public long getRowCount() {
        // Return -1 if makeSureInitialized throw exception.
        // For example, init hive table may throw NotSupportedException.
        try {
            makeSureInitialized();
        } catch (Exception e) {
            LOG.warn("Failed to initialize table {}.{}.{}", catalog.getName(), dbName, name, e);
            return TableIf.UNKNOWN_ROW_COUNT;
        }
        // All external table should get external row count from cache.
        return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
    }

    /**
     * Returns the row count from the external row count cache without initializing the table.
     *
     * @return the cached row count, or {@link TableIf#UNKNOWN_ROW_COUNT} if the table object is not created yet
     */
    @Override
    public long getCachedRowCount() {
        // Return -1 if uninitialized.
        // Before this, for uninitialized tables, we would call makeSureInitialized(), just like the implementation of
        // ExternalTable.getRowCount(), but this is not very meaningful and time-consuming.
        // The getCachedRowCount() function is only used when `show table` and querying `information_schema.tables`.
        if (!isObjectCreated()) {
            return TableIf.UNKNOWN_ROW_COUNT;
        }
        // getExtMetaCacheMgr().getRowCountCache().getCachedRowCount() is an asynchronous non-blocking operation.
        // For tables that are not in the cache, it will load asynchronously and return -1.
        return Env.getCurrentEnv().getExtMetaCacheMgr().getRowCountCache().getCachedRowCount(catalog.getId(), dbId, id);
    }

    /**
     * Default return -1. Subclass need to implement this interface.
     * This is called by ExternalRowCountCache to load row count cache.
     */
    @Override
    public long fetchRowCount() {
        return UNKNOWN_ROW_COUNT;
    }

    @Override
    public long getAvgRowLength() {
        return 0;
    }

    @Override
    public long getDataLength() {
        return 0;
    }

    @Override
    public long getIndexLength() {
        return 0;
    }

    @Override
    public long getCreateTime() {
        return 0;
    }

    // return schema update time as default
    // override this method if there is some other kinds of update time
    // use getSchemaUpdateTime if just need the schema update time
    @Override
    public long getUpdateTime() {
        return this.schemaUpdateTime;
    }

    public void setUpdateTime(long schemaUpdateTime) {
        this.schemaUpdateTime = schemaUpdateTime;
    }

    @Override
    public long getLastCheckTime() {
        return 0;
    }

    @Override
    public String getComment() {
        return "";
    }

    @Override
    public String getComment(boolean escapeQuota) {
        return "";
    }

    /**
     * Builds the thrift descriptor of this table; the base implementation returns null.
     */
    public TTableDescriptor toThrift() {
        return null;
    }

    @Override
    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
        throw new NotImplementedException("createAnalysisTask not implemented");
    }

    @Override
    public DatabaseIf getDatabase() {
        return this.db;
    }

    @Override
    public List<Column> getColumns() {
        return getFullSchema();
    }

    /**
     * Decides whether auto analyze is enabled for this table.
     * A per-table policy in the catalog takes precedence; otherwise the catalog-level setting is used.
     */
    @Override
    public boolean autoAnalyzeEnabled() {
        makeSureInitialized();
        String policy = catalog.getTableAutoAnalyzePolicy().get(Pair.of(dbName, name));
        if (policy == null) {
            return catalog.enableAutoAnalyze();
        }
        return policy.equalsIgnoreCase(PropertyAnalyzer.ENABLE_AUTO_ANALYZE_POLICY);
    }

    @Override
    public Optional<ColumnStatistic> getColumnStatistic(String colName) {
        return Optional.empty();
    }

    /**
     * Should only be called in ExternalCatalog's getSchema(),
     * which is called from schema cache.
     * If you want to get schema of this table, use getFullSchema()
     *
     * <p>Records the current time as the schema update time, then delegates to {@link #initSchema(SchemaCacheKey)}.
     *
     * @param key the schema cache key being loaded
     * @return the loaded schema value, as returned by {@link #initSchema(SchemaCacheKey)}
     */
    public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
        schemaUpdateTime = System.currentTimeMillis();
        return initSchema(key);
    }

    /**
     * Loads the schema for the given cache key; the base implementation ignores the key
     * and delegates to {@link #initSchema()}.
     */
    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
        return initSchema();
    }

    /**
     * Loads the schema of this table. Must be implemented by subclasses.
     *
     * @throws NotImplementedException always, in the base class
     */
    public Optional<SchemaCacheValue> initSchema() {
        throw new NotImplementedException("implement in sub class");
    }

    public void unsetObjectCreated() {
        this.objectCreated = false;
    }

    /**
     * Serializes this table as a Gson JSON string.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    /**
     * Deserializes a table previously written by {@link #write(DataOutput)}.
     */
    public static ExternalTable read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, ExternalTable.class);
    }

    /**
     * Resets the created flag after deserialization.
     */
    @Override
    public void gsonPostProcess() throws IOException {
        objectCreated = false;
    }

    /**
     * Maps each requested column that exists and has a statistics-supported type to a
     * (table name, column name) pair.
     */
    @Override
    public Set<Pair<String, String>> getColumnIndexPairs(Set<String> columns) {
        Set<Pair<String, String>> ret = Sets.newHashSet();
        for (String column : columns) {
            Column col = getColumn(column);
            if (col == null || StatisticsUtil.isUnsupportedType(col.getType())) {
                continue;
            }
            // External table put table name as index name.
            ret.add(Pair.of(String.valueOf(name), column));
        }
        return ret;
    }

    @Override
    public List<Long> getChunkSizes() {
        throw new NotImplementedException("getChunkSizes not implemented");
    }

    /**
     * Returns the schema cache value for ({@code dbName}, {@code name}), if present.
     */
    public Optional<SchemaCacheValue> getSchemaCacheValue() {
        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
        return cache.getSchemaValue(dbName, name);
    }

    @Override
    public TableIndexes getTableIndexes() {
        return new TableIndexes();
    }

    /**
     * Retrieve all partitions and initialize SelectedPartitions
     *
     * @param snapshot if not support mvcc, ignore this
     * @return {@link SelectedPartitions#NOT_PRUNED} if internal pruning is unsupported or the table has
     *         no partition columns; otherwise a SelectedPartitions holding every partition of the table
     */
    public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot) {
        if (!supportInternalPartitionPruned()) {
            return SelectedPartitions.NOT_PRUNED;
        }
        if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) {
            return SelectedPartitions.NOT_PRUNED;
        }
        Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshot);
        return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false);
    }

    /**
     * get partition map
     * If partition related operations are supported, this method needs to be implemented in the subclass
     *
     * @param snapshot if not support mvcc, ignore this
     * @return partitionName ==> PartitionItem; empty in the base class
     */
    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
        return Collections.emptyMap();
    }

    /**
     * get partition column list
     * If partition related operations are supported, this method needs to be implemented in the subclass
     *
     * @param snapshot if not support mvcc, ignore this
     * @return the partition columns; empty in the base class
     */
    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
        return Collections.emptyList();
    }

    /**
     * Does it support Internal partition pruned, If so, this method needs to be overridden in subclasses
     * Internal partition pruned : Implement partition pruning logic without relying on external APIs.
     *
     * @return true if internal partition pruning is supported; false in the base class
     */
    public boolean supportInternalPartitionPruned() {
        return false;
    }

    /**
     * Two external tables are equal when they have the same name and the same owning database.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ExternalTable)) {
            return false;
        }
        ExternalTable that = (ExternalTable) o;
        return Objects.equal(name, that.name) && Objects.equal(db, that.db);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(name, db);
    }
}