Table.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.catalog.constraint.Constraint;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Internal representation of table-related metadata. A table contains several partitions.
*/
public abstract class Table extends MetaObject implements Writable, TableIf, GsonPostProcessable {
// Class-wide logger.
private static final Logger LOG = LogManager.getLogger(Table.class);
// Empirical value.
// Assumes that the time a lock is held by a thread is less than 100ms.
public static final long TRY_LOCK_TIMEOUT_MS = 100L;
// Flipped by markDropped()/unmarkDropped(). The *IfExist and *OrException lock helpers
// check it right after acquiring a lock and release the lock again when it is set.
public volatile boolean isDropped = false;
// Set to true by gsonPostProcess()/readFields() when the schema has more than one key column.
private boolean hasCompoundKey = false;
@SerializedName(value = "id")
protected long id;
@SerializedName(value = "name")
protected volatile String name;
@SerializedName(value = "qualifiedDbName")
protected volatile String qualifiedDbName;
@SerializedName(value = "type")
protected TableType type;
// Epoch seconds; the (id, name, type, schema) constructor sets it from Instant.now().
@SerializedName(value = "createTime")
protected long createTime;
// Read/write lock behind readLock()/writeLock() and their try* variants.
protected MonitoredReentrantReadWriteLock rwLock;
// Used for queuing commit transaction tasks to avoid fdb transaction conflicts,
// especially to reduce conflicts when obtaining delete bitmap update locks for
// MoW table
protected MonitoredReentrantLock commitLock;
/*
* fullSchema and nameToColumn should contain all columns, both visible and shadow.
* eg. for OlapTable, when doing schema change, there will be some shadow columns which are not visible
* to query but visible to load process.
* If you want to get all visible columns, you should call getBaseSchema() method, which is overridden in
* subclasses.
*
* NOTICE: the order of this fullSchema is meaningless to OlapTable
*/
/**
* The fullSchema of OlapTable includes the base columns and the SHADOW_NAME_PREFIX columns.
* The properties of base columns in fullSchema are same as properties in baseIndex.
* For example:
* Table (c1 int, c2 int, c3 int)
* Schema change (c3 to bigint)
* When OlapTable is changing schema, the fullSchema is (c1 int, c2 int, c3 int, SHADOW_NAME_PREFIX_c3 bigint)
* The fullSchema of OlapTable is mainly used by Scanner of Load job.
* <p>
* If you want to get the mv columns, you should call getIndexToSchema in Subclass OlapTable.
*/
@SerializedName(value = "fullSchema")
protected List<Column> fullSchema;
// Tree map (String.CASE_INSENSITIVE_ORDER) for case-insensitive lookup.
/**
* The nameToColumn of OlapTable includes the base columns and the SHADOW_NAME_PREFIX columns.
*/
protected Map<String, Column> nameToColumn;
// DO NOT persist this variable.
// True once the table type has been consumed from the legacy input stream (see readFields()).
protected boolean isTypeRead = false;
// table(view)'s comment
@SerializedName(value = "comment")
protected String comment = "";
@SerializedName(value = "ta")
protected TableAttributes tableAttributes = new TableAttributes();
// Read-lock leak check: thread id -> "(thread, time <millis>)" recorded on a thread's first
// read hold. Only allocated when Config.check_table_lock_leaky is set; otherwise stays null.
private Map<Long, String> readLockThreads = null;
@SerializedName(value = "isTemporary")
private boolean isTemporary = false;
/**
 * Creates a table of the given type with an empty schema and fresh locks.
 * Gson deserialization reaches this constructor first, through the no-argument
 * constructors of the derived classes.
 *
 * @param type the table type to assign
 */
public Table(TableType type) {
    this.type = type;
    fullSchema = Lists.newArrayList();
    nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
    rwLock = new MonitoredReentrantReadWriteLock(true);
    // The reader-tracking map only exists while lock-leak checking is switched on.
    if (Config.check_table_lock_leaky) {
        readLockThreads = Maps.newConcurrentMap();
    }
    commitLock = new MonitoredReentrantLock(true);
}
/**
 * Same as {@link #Table(long, String, TableType, List)}, additionally recording whether
 * the table is temporary.
 *
 * @param id table id
 * @param tableName table name
 * @param type table type
 * @param isTemporary value stored in the {@code isTemporary} field
 * @param fullSchema columns of the table; copied by the delegated constructor
 */
public Table(long id, String tableName, TableType type, boolean isTemporary, List<Column> fullSchema) {
    this(id, tableName, type, fullSchema);
    // Assigned after delegation because the four-argument constructor does not touch it.
    this.isTemporary = isTemporary;
}
/**
 * Creates a table with the given identity and schema, stamping the creation time with
 * the current epoch second.
 *
 * @param id table id
 * @param tableName table name
 * @param type table type
 * @param fullSchema columns of the table; may be {@code null} only when {@code type} is
 *                   {@code TableType.VIEW}
 * @throws IllegalArgumentException (via {@code Preconditions.checkArgument}) if
 *         {@code fullSchema} is {@code null} and the type is not {@code VIEW}
 */
public Table(long id, String tableName, TableType type, List<Column> fullSchema) {
    this.id = id;
    this.name = tableName;
    this.type = type;
    this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
    if (fullSchema == null) {
        // Only view in with-clause have null base
        Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns");
    } else {
        // Must copy the list: it should not be the same object as in indexIdToSchema.
        this.fullSchema = Lists.newArrayList(fullSchema);
        for (Column column : this.fullSchema) {
            this.nameToColumn.put(column.getDefineName(), column);
        }
    }
    this.rwLock = new MonitoredReentrantReadWriteLock(true);
    this.createTime = Instant.now().getEpochSecond();
    // The reader-tracking map only exists while lock-leak checking is switched on.
    if (Config.check_table_lock_leaky) {
        this.readLockThreads = Maps.newConcurrentMap();
    }
    this.commitLock = new MonitoredReentrantLock(true);
}
/** Flags this table as dropped by setting {@code isDropped} to {@code true}. */
public void markDropped() {
    this.isDropped = true;
}
/** Clears the dropped flag by setting {@code isDropped} back to {@code false}. */
public void unmarkDropped() {
    this.isDropped = false;
}
/**
 * Blocks until the read lock is acquired.
 * <p>
 * When read-lock leak checking is active ({@code readLockThreads} is non-null) and this is
 * the calling thread's first read hold, the thread and the current time are recorded.
 */
public void readLock() {
    rwLock.readLock().lock();
    if (readLockThreads == null || rwLock.getReadHoldCount() != 1) {
        return;
    }
    Thread current = Thread.currentThread();
    readLockThreads.put(current.getId(),
            "(" + current.toString() + ", time " + System.currentTimeMillis() + ")");
}
/**
 * Takes the read lock and then verifies that the table has not been dropped.
 *
 * @return {@code true} with the read lock held when the table is not dropped;
 *         {@code false} with the lock already released when it is
 */
public boolean readLockIfExist() {
    readLock();
    if (!isDropped) {
        return true;
    }
    readUnlock();
    return false;
}
/**
 * Tries to take the read lock, waiting at most the given time.
 * <p>
 * On success, when read-lock leak checking is active ({@code readLockThreads} is non-null)
 * and this is the calling thread's first read hold, the thread and the current time are
 * recorded. On failure a warning is logged, but only for timeouts of at least one second.
 * If the wait is interrupted, a warning is logged, the thread's interrupt status is
 * restored and {@code false} is returned.
 *
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @return {@code true} if the read lock was acquired, {@code false} otherwise
 */
public boolean tryReadLock(long timeout, TimeUnit unit) {
    try {
        boolean res = this.rwLock.readLock().tryLock(timeout, unit);
        if (res) {
            if (this.readLockThreads != null && this.rwLock.getReadHoldCount() == 1) {
                Thread thread = Thread.currentThread();
                this.readLockThreads.put(thread.getId(),
                        "(" + thread.toString() + ", time " + System.currentTimeMillis() + ")");
            }
        } else {
            // Sub-second timeouts are expected to fail often; only log the longer ones.
            if (unit.toSeconds(timeout) >= 1) {
                LOG.warn("Failed to try table {}'s read lock. timeout {} {}. Current owner: {}",
                        name, timeout, unit.name(), rwLock.getOwner());
            }
        }
        return res;
    } catch (InterruptedException e) {
        LOG.warn("failed to try read lock at table[" + name + "]", e);
        // tryLock cleared the interrupt status when it threw; restore it so code further
        // up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * Releases one read hold of the calling thread.
 * <p>
 * When read-lock leak checking is active and the thread has no read holds left, its entry
 * is removed from {@code readLockThreads}.
 */
public void readUnlock() {
    rwLock.readLock().unlock();
    if (readLockThreads == null) {
        return;
    }
    if (rwLock.getReadHoldCount() == 0) {
        readLockThreads.remove(Thread.currentThread().getId());
    }
}
/** Blocks until the write lock of this table is acquired. */
public void writeLock() {
    rwLock.writeLock().lock();
}
/**
 * Takes the write lock and then verifies that the table has not been dropped.
 *
 * @return {@code true} with the write lock held when the table is not dropped;
 *         {@code false} with the lock already released when it is
 */
public boolean writeLockIfExist() {
    rwLock.writeLock().lock();
    if (!isDropped) {
        return true;
    }
    rwLock.writeLock().unlock();
    return false;
}
// TabletStatMgr will invoke all olap tables' tryWriteLock every one minute,
// so we can set Config.check_table_lock_leaky = true
// and check the log to find out whether the table has a lock leak.
/**
 * Tries to take the write lock, waiting at most the given time.
 * <p>
 * On failure a warning naming the current owner is logged, but only for timeouts of at
 * least one second; when read-lock leak checking is active the recorded readers are logged
 * as well. If the wait is interrupted, a warning is logged, the thread's interrupt status
 * is restored and {@code false} is returned.
 *
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @return {@code true} if the write lock was acquired, {@code false} otherwise
 */
public boolean tryWriteLock(long timeout, TimeUnit unit) {
    try {
        boolean res = this.rwLock.writeLock().tryLock(timeout, unit);
        if (!res && unit.toSeconds(timeout) >= 1) {
            if (readLockThreads == null) {
                LOG.warn("Failed to try table {}'s write lock. timeout {} {}. Current owner: {}",
                        name, timeout, unit.name(), rwLock.getOwner());
            } else {
                LOG.warn("Failed to try table {}'s write lock. timeout {} {}. Current owner: {}, "
                        + "current reader: {}",
                        name, timeout, unit.name(), rwLock.getOwner(), readLockThreads);
            }
        }
        return res;
    } catch (InterruptedException e) {
        LOG.warn("failed to try write lock at table[" + name + "]", e);
        // tryLock cleared the interrupt status when it threw; restore it so code further
        // up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        return false;
    }
}
/** Releases the write lock of this table. */
public void writeUnlock() {
    rwLock.writeLock().unlock();
}
/**
 * @return whether the write lock reports being held by the calling thread
 */
public boolean isWriteLockHeldByCurrentThread() {
    return rwLock.writeLock().isHeldByCurrentThread();
}
/**
 * Takes the write lock; if the table turns out to be dropped, releases the lock again
 * and throws the supplied exception.
 *
 * @param e exception to throw when the table is dropped
 * @param <E> type of that exception
 * @throws E if the table is dropped (the write lock is not held in that case)
 */
public <E extends Exception> void writeLockOrException(E e) throws E {
    writeLock();
    if (!isDropped) {
        return;
    }
    writeUnlock();
    throw e;
}
/**
 * Takes the write lock, or fails with a {@code DdlException} when the table is dropped.
 *
 * @throws DdlException carrying {@code ErrorCode.ERR_BAD_TABLE_ERROR} if the table is dropped
 */
public void writeLockOrDdlException() throws DdlException {
    DdlException onDropped = new DdlException("unknown table, tableName=" + name,
            ErrorCode.ERR_BAD_TABLE_ERROR);
    writeLockOrException(onDropped);
}
/**
 * Takes the write lock, or fails with a {@code MetaNotFoundException} when the table is dropped.
 *
 * @throws MetaNotFoundException carrying {@code ErrorCode.ERR_BAD_TABLE_ERROR} if the table
 *         is dropped
 */
public void writeLockOrMetaException() throws MetaNotFoundException {
    MetaNotFoundException onDropped = new MetaNotFoundException("unknown table, tableName=" + name,
            ErrorCode.ERR_BAD_TABLE_ERROR);
    writeLockOrException(onDropped);
}
/**
 * Takes the write lock, or fails with an {@code AlterCancelException} when the table is dropped.
 *
 * @throws AlterCancelException carrying {@code ErrorCode.ERR_BAD_TABLE_ERROR} if the table
 *         is dropped
 */
public void writeLockOrAlterCancelException() throws AlterCancelException {
    AlterCancelException onDropped = new AlterCancelException("unknown table, tableName=" + name,
            ErrorCode.ERR_BAD_TABLE_ERROR);
    writeLockOrException(onDropped);
}
/**
 * Timed variant of {@link #writeLockOrMetaException()}.
 *
 * @param timeout maximum time to wait for the write lock
 * @param unit unit of {@code timeout}
 * @return {@code true} if the write lock was acquired, {@code false} if it was not
 * @throws MetaNotFoundException if the lock was acquired but the table is dropped
 */
public boolean tryWriteLockOrMetaException(long timeout, TimeUnit unit) throws MetaNotFoundException {
    MetaNotFoundException onDropped = new MetaNotFoundException("unknown table, tableName=" + name,
            ErrorCode.ERR_BAD_TABLE_ERROR);
    return tryWriteLockOrException(timeout, unit, onDropped);
}
/**
 * Tries to take the write lock within the given time; if it is obtained but the table is
 * dropped, the lock is released and the supplied exception is thrown.
 *
 * @param timeout maximum time to wait for the write lock
 * @param unit unit of {@code timeout}
 * @param e exception to throw when the table is dropped
 * @param <E> type of that exception
 * @return {@code true} with the write lock held, or {@code false} if it could not be acquired
 * @throws E if the lock was acquired but the table is dropped
 */
public <E extends Exception> boolean tryWriteLockOrException(long timeout, TimeUnit unit, E e) throws E {
    if (!tryWriteLock(timeout, unit)) {
        return false;
    }
    if (isDropped) {
        writeUnlock();
        throw e;
    }
    return true;
}
/**
 * Tries to take the write lock within the given time and verifies the table is not dropped.
 *
 * @param timeout maximum time to wait for the write lock
 * @param unit unit of {@code timeout}
 * @return {@code true} with the write lock held; {@code false} if the lock could not be
 *         acquired or the table is dropped (the lock is released in the latter case)
 */
public boolean tryWriteLockIfExist(long timeout, TimeUnit unit) {
    if (!tryWriteLock(timeout, unit)) {
        return false;
    }
    if (isDropped) {
        writeUnlock();
        return false;
    }
    return true;
}
/** Blocks until the commit lock of this table is acquired. */
public void commitLock() {
    commitLock.lock();
}
/**
 * Tries to take the commit lock, waiting at most the given time.
 * <p>
 * On failure a warning naming the current owner is logged, but only for timeouts of at
 * least one second. If the wait is interrupted, a warning is logged, the thread's interrupt
 * status is restored and {@code false} is returned.
 *
 * @param timeout maximum time to wait
 * @param unit unit of {@code timeout}
 * @return {@code true} if the commit lock was acquired, {@code false} otherwise
 */
public boolean tryCommitLock(long timeout, TimeUnit unit) {
    try {
        boolean res = this.commitLock.tryLock(timeout, unit);
        if (!res && unit.toSeconds(timeout) >= 1) {
            LOG.warn("Failed to try table {}'s cloud commit lock. timeout {} {}. Current owner: {}",
                    name, timeout, unit.name(), this.commitLock.getOwner());
        }
        return res;
    } catch (InterruptedException e) {
        LOG.warn("failed to try cloud commit lock at table[" + name + "]", e);
        // tryLock cleared the interrupt status when it threw; restore it so code further
        // up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        return false;
    }
}
/**
 * @return whatever {@code commitLock.getOwner()} reports as the owning thread
 */
public Thread getCommitLockOwner() {
    return commitLock.getOwner();
}
/** Releases the commit lock of this table. */
public void commitUnlock() {
    commitLock.unlock();
}
/**
 * @return the current value of the non-persisted {@code isTypeRead} flag
 */
public boolean isTypeRead() {
    return this.isTypeRead;
}
/**
 * Sets the non-persisted {@code isTypeRead} flag; {@code readFields} skips reading the
 * table type from the stream when it is {@code true}.
 *
 * @param isTypeRead new flag value
 */
public void setTypeRead(boolean isTypeRead) {
    this.isTypeRead = isTypeRead;
}
/**
 * @return the id of this table
 */
public long getId() {
    return this.id;
}
/**
 * @return the current name of this table
 */
@Override
public String getName() {
    return this.name;
}
/**
 * Replaces the name of this table.
 *
 * @param newName the name to store
 */
public void setName(String newName) {
    this.name = newName;
}
/**
 * Stores the qualified name of the database this table belongs to.
 *
 * @param qualifiedDbName the qualified database name
 */
public void setQualifiedDbName(String qualifiedDbName) {
    this.qualifiedDbName = qualifiedDbName;
}
/**
 * @return the stored qualified database name, which is {@code null} until it has been set
 */
public String getQualifiedDbName() {
    return this.qualifiedDbName;
}
/**
 * Builds the table name prefixed with its qualified database name.
 *
 * @return {@code qualifiedDbName + "." + name}, or just {@code name} when the qualified
 *         database name is null or empty
 */
public String getQualifiedName() {
    boolean noDbName = StringUtils.isEmpty(qualifiedDbName);
    return noDbName ? name : qualifiedDbName + "." + name;
}
/**
 * Extracts the database name from the qualified database name.
 * <p>
 * The qualified name is split on {@code ':'}; when that yields exactly two parts the second
 * one is returned, otherwise the first one.
 *
 * @return the database name portion of {@code qualifiedDbName}
 * @throws NullPointerException if {@code qualifiedDbName} has not been set
 */
public String getDBName() {
    String[] parts = qualifiedDbName.split(":");
    if (parts.length == 2) {
        return parts[1];
    }
    return parts[0];
}
/**
 * @return {@code Util.getTempTableDisplayName(name)} for a temporary table, otherwise the
 *         plain table name
 */
public String getDisplayName() {
    if (isTemporary) {
        return Util.getTempTableDisplayName(name);
    }
    return name;
}
/**
 * Looks up a constraint by name in the map returned by {@code getConstraintsMap()}.
 *
 * @param name constraint name
 * @return the map's entry for {@code name}
 */
public Constraint getConstraint(String name) {
    Map<String, Constraint> constraints = getConstraintsMap();
    return constraints.get(name);
}
/**
 * @return the constraints map exactly as held by {@code tableAttributes} (no copy is made here)
 */
@Override
public Map<String, Constraint> getConstraintsMapUnsafe() {
    return this.tableAttributes.getConstraintsMap();
}
/**
 * @return the type of this table
 */
public TableType getType() {
    return this.type;
}
/**
 * @return an immutable snapshot copy of {@code fullSchema}
 */
public List<Column> getFullSchema() {
    List<Column> snapshot = ImmutableList.copyOf(fullSchema);
    return snapshot;
}
// Subclasses should override this if necessary.
/**
 * @return the base schema, including hidden columns only when
 *         {@code Util.showHiddenColumns()} says so
 */
public List<Column> getBaseSchema() {
    boolean includeHidden = Util.showHiddenColumns();
    return getBaseSchema(includeHidden);
}
/**
 * Returns the columns of {@code fullSchema}, optionally restricted to visible ones.
 *
 * @param full {@code true} to return every column, {@code false} to keep only columns for
 *             which {@code Column.isVisible()} holds
 * @return an immutable copy of the whole schema when {@code full}; otherwise a newly
 *         collected list of the visible columns
 */
public List<Column> getBaseSchema(boolean full) {
    if (full) {
        return ImmutableList.copyOf(fullSchema);
    }
    return fullSchema.stream()
            .filter(column -> column.isVisible())
            .collect(Collectors.toList());
}
/**
 * Replaces the full schema and rebuilds the name-to-column index from it.
 * <p>
 * The given list is stored as-is (not copied) and the index is keyed by
 * {@code Column.getName()}.
 *
 * @param newSchema the new full schema
 */
public void setNewFullSchema(List<Column> newSchema) {
    this.fullSchema = newSchema;
    this.nameToColumn.clear();
    this.fullSchema.forEach(column -> this.nameToColumn.put(column.getName(), column));
}
/**
 * Looks up a column by name in {@code nameToColumn}.
 *
 * @param name column name
 * @return the matching column, or {@code null} when there is none
 */
public Column getColumn(String name) {
    // A plain get() already yields null for a missing key, same as getOrDefault(name, null).
    return nameToColumn.get(name);
}
/**
 * @return a new list holding the values of {@code nameToColumn}
 */
public List<Column> getColumns() {
    List<Column> columns = Lists.newArrayList(nameToColumn.values());
    return columns;
}
/**
 * @return the stored creation time
 */
public long getCreateTime() {
    return this.createTime;
}
/**
 * @return always {@code -1} in this base implementation
 */
public long getUpdateTime() {
    final long noUpdateTime = -1L;
    return noUpdateTime;
}
/**
 * @return the row count as reported by {@link #fetchRowCount()}
 */
public long getRowCount() {
    long rows = fetchRowCount();
    return rows;
}
/**
 * @return always {@code 0} in this base implementation
 */
public long getAvgRowLength() {
    return 0L;
}
/**
 * @return always {@code 0} in this base implementation
 */
public long getDataLength() {
    return 0L;
}
/**
 * @return always {@code 0} in this base implementation
 */
public long getIndexLength() {
    return 0L;
}
/**
 * @return whether this table is flagged as temporary
 */
public boolean isTemporary() {
    return this.isTemporary;
}
/**
 * @return always {@code null} in this base implementation
 */
public TTableDescriptor toThrift() {
    final TTableDescriptor none = null;
    return none;
}
/**
 * Deserializes a table from the given input.
 * <p>
 * For journal versions at or above {@code FeMetaVersion.VERSION_136} the table is parsed
 * from a JSON string with {@code GsonUtils.GSON}. For older versions the table type name is
 * read first, the matching subclass is instantiated, and the rest is loaded through
 * {@code readFields}.
 *
 * @param in source to read from
 * @return the deserialized table
 * @throws IOException if reading fails or the legacy table type has no known subclass
 */
public static Table read(DataInput in) throws IOException {
    if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) {
        return GsonUtils.GSON.fromJson(Text.readString(in), Table.class);
    }

    // Legacy format: the type name comes first and selects the concrete class.
    TableType type = TableType.valueOf(Text.readString(in));
    final Table table;
    switch (type) {
        case OLAP:
            table = new OlapTable();
            break;
        case MATERIALIZED_VIEW:
            table = new MTMV();
            break;
        case ODBC:
            table = new OdbcTable();
            break;
        case MYSQL:
            table = new MysqlTable();
            break;
        case VIEW:
            table = new View();
            break;
        case BROKER:
            table = new BrokerTable();
            break;
        case ELASTICSEARCH:
            table = new EsTable();
            break;
        case HIVE:
            table = new HiveTable();
            break;
        case JDBC:
            table = new JdbcTable();
            break;
        default:
            throw new IOException("Unknown table type: " + type.name());
    }
    // The type has already been consumed from the stream, so readFields must not read it again.
    table.setTypeRead(true);
    table.readFields(in);
    return table;
}
/**
 * Rebuilds derived state after Gson deserialization: indexes every column of
 * {@code fullSchema} by {@code Column.getName()} and, when more than one key column exists,
 * marks each key column as part of a compound key.
 *
 * @throws IOException declared by the overridden method; not thrown by this implementation
 */
@Override
public void gsonPostProcess() throws IOException {
    List<Column> keyColumns = Lists.newArrayList();
    for (Column col : fullSchema) {
        if (col.isKey()) {
            keyColumns.add(col);
        }
        nameToColumn.put(col.getName(), col);
    }
    if (keyColumns.size() <= 1) {
        return;
    }
    for (Column keyColumn : keyColumns) {
        keyColumn.setCompoundKey(true);
    }
    hasCompoundKey = true;
}
/**
 * Serializes this table as a JSON string produced by {@code GsonUtils.GSON}.
 *
 * @param out destination to write to
 * @throws IOException if writing fails
 */
@Override
public void write(DataOutput out) throws IOException {
    String json = GsonUtils.GSON.toJson(this);
    Text.writeString(out, json);
}
/**
 * Legacy binary deserialization; {@code read(DataInput)} only calls it for journal versions
 * older than {@code FeMetaVersion.VERSION_136}.
 * <p>
 * Fields are consumed in this fixed order: table type (unless already read), the superclass
 * fields, id, name, column count followed by that many columns, comment, table attributes
 * as JSON (journal version 127 and later only), and finally the create time.
 *
 * @param in source to read from
 * @throws IOException if reading fails
 */
@Deprecated
public void readFields(DataInput in) throws IOException {
// The static read() consumes the type itself and sets isTypeRead beforehand.
if (!isTypeRead) {
type = TableType.valueOf(Text.readString(in));
isTypeRead = true;
}
super.readFields(in);
this.id = in.readLong();
this.name = Text.readString(in);
// Key columns are collected so they can be flagged once the total count is known.
List<Column> keys = Lists.newArrayList();
// base schema
int columnCount = in.readInt();
for (int i = 0; i < columnCount; i++) {
Column column = Column.read(in);
if (column.isKey()) {
keys.add(column);
}
this.fullSchema.add(column);
this.nameToColumn.put(column.getName(), column);
}
// More than one key column means the key is compound.
if (keys.size() > 1) {
keys.forEach(key -> key.setCompoundKey(true));
hasCompoundKey = true;
}
comment = Text.readString(in);
// table attributes are only supported from version 127 on
if (FeMetaVersion.VERSION_127 <= Env.getCurrentEnvJournalVersion()) {
String json = Text.readString(in);
this.tableAttributes = GsonUtils.GSON.fromJson(json, TableAttributes.class);
}
// read create time
this.createTime = in.readLong();
}
// Tells the planner whether this table is partitioned.
// For OlapTable this is true when it is partitioned, or distributed by hash when it has no partition.
/**
 * @return always {@code false} in this base implementation
 */
public boolean isPartitionDistributed() {
    final boolean partitioned = false;
    return partitioned;
}
/**
 * @param partitionName name of the partition to look up (ignored here)
 * @return always {@code null} in this base implementation
 */
public Partition getPartition(String partitionName) {
    final Partition none = null;
    return none;
}
/**
 * @return the engine name obtained from {@code type.toEngineName()}
 */
@Override
public String getEngine() {
    return this.type.toEngineName();
}
/**
 * @return the MySQL type string obtained from {@code type.toMysqlType()}
 */
@Override
public String getMysqlType() {
    return this.type.toMysqlType();
}
/**
 * @return the table comment without quote escaping; never {@code null}
 */
@Override
public String getComment() {
    final boolean escapeQuota = false;
    return getComment(escapeQuota);
}
/**
 * Returns the table comment, optionally passed through {@code SqlUtils.escapeQuota}.
 *
 * @param escapeQuota whether to escape the comment before returning it
 * @return the (possibly escaped) comment, or the empty string when the comment is null or empty
 */
@Override
public String getComment(boolean escapeQuota) {
    if (Strings.isNullOrEmpty(comment)) {
        return "";
    }
    return escapeQuota ? SqlUtils.escapeQuota(comment) : comment;
}
/**
 * Stores the table comment, turning {@code null} into the empty string.
 *
 * @param comment the new comment; may be {@code null}
 */
public void setComment(String comment) {
    String normalized = Strings.nullToEmpty(comment);
    this.comment = normalized;
}
/**
 * Replaces the id of this table.
 *
 * @param id the id to store
 */
public void setId(long id) {
    this.id = id;
}
/**
 * @return a short description in the form {@code Table [id=..., name=..., type=...]}
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("Table [id=");
    text.append(id);
    text.append(", name=").append(name);
    text.append(", type=").append(type);
    text.append("]");
    return text.toString();
}
/**
 * Builds a small JSON description of this table.
 *
 * @return a {@code JSONObject} populated with the keys {@code "Type"} (engine name),
 *         {@code "Id"} (the id as a decimal string) and {@code "Name"}
 */
public JSONObject toSimpleJson() {
    String engineName = type.toEngineName();
    String idText = Long.toString(id);

    JSONObject json = new JSONObject();
    json.put("Type", engineName);
    json.put("Id", idText);
    json.put("Name", name);
    return json;
}
/**
 * Not supported by this base class.
 *
 * @param info analysis request (unused)
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
    final String message = "createAnalysisTask not implemented";
    throw new NotImplementedException(message);
}
/**
 * Resolves the owning database by passing {@code qualifiedDbName} to the current internal
 * catalog's {@code getDbNullable}.
 *
 * @return the lookup result (presumably {@code null} when no such database exists; confirm
 *         against {@code getDbNullable})
 */
@Override
public DatabaseIf getDatabase() {
    return Env.getCurrentInternalCatalog()
            .getDbNullable(this.qualifiedDbName);
}
/**
 * @param colName column whose statistics are requested (ignored here)
 * @return always an empty {@code Optional} in this base implementation
 */
@Override
public Optional<ColumnStatistic> getColumnStatistic(String colName) {
    Optional<ColumnStatistic> none = Optional.empty();
    return none;
}
/**
 * Does nothing in this base implementation.
 *
 * @param dbName database name (unused here)
 */
public void analyze(String dbName) {
    // Intentionally empty.
}
/**
 * Not supported by this base class.
 *
 * @return never returns normally
 * @throws NotImplementedException always
 */
@Override
public List<Long> getChunkSizes() {
    // The message now names this method; it previously read "getChunkSized".
    throw new NotImplementedException("getChunkSizes not implemented");
}
/**
 * @return always {@code UNKNOWN_ROW_COUNT} in this base implementation
 */
@Override
public long fetchRowCount() {
    long rows = UNKNOWN_ROW_COUNT;
    return rows;
}
/**
 * @param columns column names of interest (ignored here)
 * @return always a new, empty set in this base implementation
 */
@Override
public Set<Pair<String, String>> getColumnIndexPairs(Set<String> columns) {
    Set<Pair<String, String>> pairs = Sets.newHashSet();
    return pairs;
}
/**
 * @return the value of {@link #getRowCount()}; this base implementation has no separate cache
 */
@Override
public long getCachedRowCount() {
    long rows = getRowCount();
    return rows;
}
/**
 * @return always {@code true} in this base implementation
 */
@Override
public boolean autoAnalyzeEnabled() {
    final boolean enabled = true;
    return enabled;
}
/**
 * @return a newly constructed {@code TableIndexes} on every call
 */
@Override
public TableIndexes getTableIndexes() {
    TableIndexes indexes = new TableIndexes();
    return indexes;
}
}