// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics.util;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LargeIntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.analysis.VariableExpr;
import org.apache.doris.catalog.AggStateType;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.VariantType;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.IPv4Literal;
import org.apache.doris.nereids.trees.expressions.literal.IPv6Literal;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.PartitionColumnStatistic;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Frontend;
import com.google.common.base.Preconditions;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
public class StatisticsUtil {
private static final Logger LOG = LogManager.getLogger(StatisticsUtil.class);
private static final String ID_DELIMITER = "-";
private static final String TOTAL_SIZE = "totalSize";
private static final String NUM_ROWS = "numRows";
private static final String SPARK_NUM_ROWS = "spark.sql.statistics.numRows";
private static final String SPARK_TOTAL_SIZE = "spark.sql.statistics.totalSize";
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final int UPDATED_PARTITION_THRESHOLD = 3;
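/**
 * Execute a statistics query built from a ${param}-style template. Placeholders in the
 * template are substituted with the given params via StringSubstitutor, and the resulting
 * SQL runs as an internal statistics query that may use the file cache (subject to config).
 *
 * Illustrative usage (a sketch; the table and parameter names below are placeholders only):
 * <pre>{@code
 * Map<String, String> params = new HashMap<>();
 * params.put("tblName", "example_stats_table");
 * List<ResultRow> rows = StatisticsUtil.executeQuery("SELECT * FROM ${tblName}", params);
 * }</pre>
 */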
public static List<ResultRow> executeQuery(String template, Map<String, String> params) {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(template);
return execStatisticQuery(sql, true);
}
public static void execUpdate(String template, Map<String, String> params) throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(template);
execUpdate(sql);
}
public static List<ResultRow> execStatisticQuery(String sql) {
return execStatisticQuery(sql, false);
}
public static List<ResultRow> execStatisticQuery(String sql, boolean enableFileCache) {
if (!FeConstants.enableInternalSchemaDb) {
return Collections.emptyList();
}
boolean useFileCacheForStat = (enableFileCache && Config.allow_analyze_statistics_info_polluting_file_cache);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(useFileCacheForStat)) {
if (Config.isCloudMode()) {
try {
r.connectContext.getCloudCluster();
} catch (ComputeGroupException e) {
LOG.warn("failed to connect to cloud cluster", e);
return Collections.emptyList();
}
}
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
return stmtExecutor.executeInternalQuery();
}
}
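/**
 * Execute an internal update/DDL statement with a temporary connect context. Throws if the
 * statement finishes in an error state, and writes an audit log entry for the statement in
 * the finally block.
 */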
public static QueryState execUpdate(String sql) throws Exception {
StmtExecutor stmtExecutor = null;
AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false);
try {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
QueryState state = r.connectContext.getState();
if (state.getStateType().equals(QueryState.MysqlStateType.ERR)) {
throw new Exception(state.getErrorMessage());
}
return state;
} finally {
r.close();
if (stmtExecutor != null) {
AuditLogHelper.logAuditLog(r.connectContext, stmtExecutor.getOriginStmt().originStmt,
stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(), true);
}
}
}
public static ColumnStatistic deserializeToColumnStatistics(List<ResultRow> resultBatches) {
if (CollectionUtils.isEmpty(resultBatches)) {
return null;
}
return ColumnStatistic.fromResultRow(resultBatches);
}
public static List<Histogram> deserializeToHistogramStatistics(List<ResultRow> resultBatches) {
return resultBatches.stream().map(Histogram::fromResultRow).collect(Collectors.toList());
}
public static PartitionColumnStatistic deserializeToPartitionStatistics(List<ResultRow> resultBatches)
throws IOException {
if (CollectionUtils.isEmpty(resultBatches)) {
return null;
}
return PartitionColumnStatistic.fromResultRow(resultBatches);
}
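/**
 * Build a dedicated ConnectContext for internal statistics statements: internal session,
 * admin identity, the internal statistics database, the analyze timeout, and the
 * statistics-specific memory/CPU/parallelism limits from Config, with most caches disabled.
 * In cloud mode the context is additionally bound to a compute group, and the file cache is
 * disabled unless useFileCacheForStat is true. The returned AutoCloseConnectContext is meant
 * to be used in a try-with-resources block.
 */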
public static AutoCloseConnectContext buildConnectContext(boolean useFileCacheForStat) {
ConnectContext connectContext = new ConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
sessionVariable.setMaxExecMemByte(Config.statistics_sql_mem_limit_in_bytes);
sessionVariable.cpuResourceLimit = Config.cpu_resource_limit_per_analyze_task;
sessionVariable.setEnableInsertStrict(true);
sessionVariable.setInsertMaxFilterRatio(1.0);
sessionVariable.enablePageCache = false;
sessionVariable.enableProfile = Config.enable_profile_when_analyze;
sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.setQueryTimeoutS(StatisticsUtil.getAnalyzeTimeout());
sessionVariable.insertTimeoutS = StatisticsUtil.getAnalyzeTimeout();
sessionVariable.enableFileCache = false;
sessionVariable.forbidUnknownColStats = false;
sessionVariable.enablePushDownMinMaxOnUnique = true;
sessionVariable.enablePushDownStringMinMax = true;
sessionVariable.enableUniqueKeyPartialUpdate = false;
sessionVariable.enableMaterializedViewRewrite = false;
sessionVariable.enableQueryCache = false;
sessionVariable.enableSqlCache = false;
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
connectContext.setQualifiedUser(UserIdentity.ADMIN.getQualifiedUser());
connectContext.setCurrentUserIdentity(UserIdentity.ADMIN);
connectContext.setStartTime();
if (Config.isCloudMode()) {
AutoCloseConnectContext ctx = new AutoCloseConnectContext(connectContext);
try {
ctx.connectContext.getCloudCluster();
} catch (ComputeGroupException e) {
LOG.warn("failed to connect to cloud cluster", e);
return ctx;
}
sessionVariable.disableFileCache = !useFileCacheForStat;
return ctx;
} else {
return new AutoCloseConnectContext(connectContext);
}
}
public static void analyze(StatementBase statementBase) throws UserException {
try (AutoCloseConnectContext r = buildConnectContext(false)) {
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), r.connectContext);
statementBase.analyze(analyzer);
}
}
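/**
 * Convert a column value string (e.g. a min/max value) into a LiteralExpr of the given
 * scalar type. Throws AnalysisException if the value cannot be parsed for that type or if
 * the type (e.g. HLL, BITMAP, ARRAY, MAP, STRUCT) is not supported.
 */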
public static LiteralExpr readableValue(Type type, String columnValue) throws AnalysisException {
Preconditions.checkArgument(type.isScalarType());
ScalarType scalarType = (ScalarType) type;
// check if default value is valid.
// if not, some literal constructor will throw AnalysisException
PrimitiveType primitiveType = scalarType.getPrimitiveType();
switch (primitiveType) {
case BOOLEAN:
return new BoolLiteral(columnValue);
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
return new IntLiteral(columnValue, type);
case LARGEINT:
return new LargeIntLiteral(columnValue);
case FLOAT:
// the min max value will lose precision when the value type is double.
case DOUBLE:
return new FloatLiteral(columnValue);
case DECIMALV2:
// no need to check precision and scale, since V2 is fixed point
return new DecimalLiteral(columnValue);
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
case DECIMAL256:
DecimalLiteral decimalLiteral = new DecimalLiteral(columnValue);
decimalLiteral.checkPrecisionAndScale(scalarType.getScalarPrecision(), scalarType.getScalarScale());
return decimalLiteral;
case DATE:
case DATETIME:
case DATEV2:
case DATETIMEV2:
return new DateLiteral(columnValue, type);
case CHAR:
case VARCHAR:
case STRING:
return new StringLiteral(columnValue);
case IPV4:
return new org.apache.doris.analysis.IPv4Literal(columnValue);
case IPV6:
return new org.apache.doris.analysis.IPv6Literal(columnValue);
case HLL:
case BITMAP:
case ARRAY:
case MAP:
case STRUCT:
default:
throw new AnalysisException("Unsupported setting this type: " + type + " of min max value");
}
}
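/**
 * Convert a column value string into the double encoding used by column statistics for
 * min/max comparison: booleans map to 0/1, numeric types are parsed directly, and
 * date/time, string and IP types are converted through the corresponding Nereids literal's
 * getDouble(). Throws AnalysisException for unsupported types or unparseable values.
 */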
public static double convertToDouble(Type type, String columnValue) throws AnalysisException {
Preconditions.checkArgument(type.isScalarType());
try {
ScalarType scalarType = (ScalarType) type;
// check if default value is valid.
// if not, some literal constructor will throw AnalysisException
PrimitiveType primitiveType = scalarType.getPrimitiveType();
switch (primitiveType) {
case BOOLEAN:
return Boolean.parseBoolean(columnValue) ? 1.0 : 0.0;
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case LARGEINT:
case FLOAT:
// the min max value will lose precision when the value type is double.
case DOUBLE:
case DECIMALV2:
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
case DECIMAL256:
return Double.parseDouble(columnValue);
case DATE:
case DATEV2:
org.apache.doris.nereids.trees.expressions.literal.DateLiteral literal =
new org.apache.doris.nereids.trees.expressions.literal.DateLiteral(columnValue);
return literal.getDouble();
case DATETIMEV2:
case DATETIME:
DateTimeLiteral dateTimeLiteral = new DateTimeLiteral(columnValue);
return dateTimeLiteral.getDouble();
case CHAR:
case VARCHAR:
case STRING:
VarcharLiteral varchar = new VarcharLiteral(columnValue);
return varchar.getDouble();
case IPV4:
IPv4Literal ipv4 = new IPv4Literal(columnValue);
return ipv4.getDouble();
case IPV6:
IPv6Literal ipv6 = new IPv6Literal(columnValue);
return ipv6.getDouble();
case HLL:
case BITMAP:
case ARRAY:
case MAP:
case STRUCT:
default:
throw new AnalysisException("Unsupported setting this type: " + type + " of min max value");
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
}
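/**
 * Resolve the catalog, database and table objects for the given table name, throwing
 * IllegalStateException if any level does not exist.
 */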
public static DBObjects convertTableNameToObjects(TableNameInfo tableNameInfo) {
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalogIf =
Env.getCurrentEnv().getCatalogMgr().getCatalog(tableNameInfo.getCtl());
if (catalogIf == null) {
throw new IllegalStateException(String.format("Catalog:%s doesn't exist", tableNameInfo.getCtl()));
}
DatabaseIf<? extends TableIf> databaseIf = catalogIf.getDbNullable(tableNameInfo.getDb());
if (databaseIf == null) {
throw new IllegalStateException(String.format("DB:%s doesn't exist", tableNameInfo.getDb()));
}
TableIf tableIf = databaseIf.getTableNullable(tableNameInfo.getTbl());
if (tableIf == null) {
throw new IllegalStateException(String.format("Table:%s doesn't exist", tableNameInfo.getTbl()));
}
return new DBObjects(catalogIf, databaseIf, tableIf);
}
public static DBObjects convertTableNameToObjects(TableName tableName) {
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalogIf =
Env.getCurrentEnv().getCatalogMgr().getCatalog(tableName.getCtl());
if (catalogIf == null) {
throw new IllegalStateException(String.format("Catalog:%s doesn't exist", tableName.getCtl()));
}
DatabaseIf<? extends TableIf> databaseIf = catalogIf.getDbNullable(tableName.getDb());
if (databaseIf == null) {
throw new IllegalStateException(String.format("DB:%s doesn't exist", tableName.getDb()));
}
TableIf tableIf = databaseIf.getTableNullable(tableName.getTbl());
if (tableIf == null) {
throw new IllegalStateException(String.format("Table:%s doesn't exist", tableName.getTbl()));
}
return new DBObjects(catalogIf, databaseIf, tableIf);
}
public static DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) {
return new DBObjects(findCatalog(catalogId), findDatabase(catalogId, dbId), findTable(catalogId, dbId, tblId));
}
public static Column findColumn(long catalogId, long dbId, long tblId, long idxId, String columnName) {
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalogIf =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalogIf == null) {
return null;
}
DatabaseIf<? extends TableIf> db = catalogIf.getDb(dbId).orElse(null);
if (db == null) {
return null;
}
TableIf tblIf = db.getTable(tblId).orElse(null);
if (tblIf == null) {
return null;
}
if (idxId != -1) {
if (tblIf instanceof OlapTable) {
OlapTable olapTable = (OlapTable) tblIf;
return olapTable.getIndexIdToMeta().get(idxId).getColumnByName(columnName);
}
}
return tblIf.getColumn(columnName);
}
@SuppressWarnings({"unchecked"})
public static Column findColumn(String catalogName, String dbName, String tblName, String columnName)
throws Throwable {
TableIf tableIf = findTable(catalogName, dbName, tblName);
return tableIf.getColumn(columnName);
}
/**
 * Throws RuntimeException if the table does not exist.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static TableIf findTable(String catalogName, String dbName, String tblName) {
try {
DatabaseIf db = findDatabase(catalogName, dbName);
return db.getTableOrException(tblName,
t -> new RuntimeException("Table: " + t + " not exists"));
} catch (Throwable t) {
throw new RuntimeException("Table: `" + catalogName + "." + dbName + "." + tblName + "` not exists");
}
}
public static TableIf findTable(long catalogId, long dbId, long tblId) {
try {
DatabaseIf<? extends TableIf> db = findDatabase(catalogId, dbId);
return db.getTableOrException(tblId,
t -> new RuntimeException("Table: " + t + " not exists"));
} catch (Throwable t) {
throw new RuntimeException("Table: `" + catalogId + "." + dbId + "." + tblId + "` not exists");
}
}
/**
 * Throws RuntimeException if the database does not exist.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static DatabaseIf findDatabase(String catalogName, String dbName) throws Throwable {
CatalogIf catalog = findCatalog(catalogName);
return catalog.getDbOrException(dbName,
d -> new RuntimeException("DB: " + d + " not exists"));
}
public static DatabaseIf<? extends TableIf> findDatabase(long catalogId, long dbId) {
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog = findCatalog(catalogId);
return catalog.getDbOrException(dbId,
d -> new RuntimeException("DB: " + d + " not exists"));
}
/**
 * Throws RuntimeException if the catalog does not exist.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static CatalogIf findCatalog(String catalogName) {
return Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrException(catalogName, c -> new RuntimeException("Catalog: " + c + " not exists"));
}
public static CatalogIf<? extends DatabaseIf<? extends TableIf>> findCatalog(long catalogId) {
return Env.getCurrentEnv().getCatalogMgr().getCatalogOrException(catalogId,
c -> new RuntimeException("Catalog: " + c + " not exists"));
}
public static boolean isNullOrEmpty(String str) {
return Optional.ofNullable(str)
.map(String::trim)
.map(String::toLowerCase)
.map(s -> "null".equalsIgnoreCase(s) || s.isEmpty())
.orElse(true);
}
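/**
 * Check whether the internal column statistics table is ready for use: the table must exist
 * in the internal catalog and every tablet of every partition must have at least one normal
 * replica. In cloud mode this additionally requires an available backend and a reachable
 * compute group.
 */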
public static boolean statsTblAvailable() {
String dbName = FeConstants.INTERNAL_DB_NAME;
List<OlapTable> statsTbls = new ArrayList<>();
try {
statsTbls.add(
(OlapTable) StatisticsUtil
.findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
dbName,
StatisticConstants.TABLE_STATISTIC_TBL_NAME));
// uncomment it when hist is available for user.
// statsTbls.add(
// (OlapTable) StatisticsUtil
// .findTable(InternalCatalog.INTERNAL_CATALOG_NAME,
// dbName,
// StatisticConstants.HISTOGRAM_TBL_NAME));
} catch (Throwable t) {
LOG.info("stat table does not exist, db_name: {}, table_name: {}", dbName,
StatisticConstants.TABLE_STATISTIC_TBL_NAME);
return false;
}
if (Config.isCloudMode()) {
if (!((CloudSystemInfoService) Env.getCurrentSystemInfo()).availableBackendsExists()) {
LOG.info("there are no available backends");
return false;
}
try (AutoCloseConnectContext r = buildConnectContext(false)) {
try {
r.connectContext.getCloudCluster();
} catch (ComputeGroupException e) {
LOG.warn("failed to connect to cloud cluster", e);
return false;
}
for (OlapTable table : statsTbls) {
for (Partition partition : table.getPartitions()) {
if (partition.getBaseIndex().getTablets().stream()
.anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty())) {
return false;
}
}
}
}
} else {
for (OlapTable table : statsTbls) {
for (Partition partition : table.getPartitions()) {
if (partition.getBaseIndex().getTablets().stream()
.anyMatch(t -> t.getNormalReplicaBackendIds().isEmpty())) {
return false;
}
}
}
}
return true;
}
public static Map<Long, Partition> getIdToPartition(TableIf table) {
return table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Objects::nonNull)
.collect(Collectors.toMap(
Partition::getId,
Function.identity()
));
}
public static Map<Long, String> getPartitionIdToName(TableIf table) {
return table.getPartitionNames().stream()
.map(table::getPartition)
.collect(Collectors.toMap(
Partition::getId,
Partition::getName
));
}
public static Set<String> getPartitionIds(TableIf table) {
if (table instanceof OlapTable) {
return ((OlapTable) table).getPartitionIds().stream().map(String::valueOf).collect(Collectors.toSet());
} else if (table instanceof ExternalTable) {
return table.getPartitionNames();
}
throw new RuntimeException(String.format("Not supported Table %s", table.getClass().getName()));
}
public static <T> String joinElementsToString(Collection<T> values, String delimiter) {
StringJoiner builder = new StringJoiner(delimiter);
values.forEach(v -> builder.add(String.valueOf(v)));
return builder.toString();
}
public static int convertStrToInt(String str) {
return StringUtils.isNumeric(str) ? Integer.parseInt(str) : 0;
}
public static long convertStrToLong(String str) {
return StringUtils.isNumeric(str) ? Long.parseLong(str) : 0;
}
public static String getReadableTime(long timeInMs) {
if (timeInMs <= 0) {
return "";
}
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
return format.format(new Date(timeInMs));
}
@SafeVarargs
public static <T> String constructId(T... items) {
if (items == null || items.length == 0) {
return "";
}
List<String> idElements = Arrays.stream(items)
.map(String::valueOf)
.collect(Collectors.toList());
return StatisticsUtil.joinElementsToString(idElements, ID_DELIMITER);
}
public static String replaceParams(String template, Map<String, String> params) {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
return stringSubstitutor.replace(template);
}
/**
 * The health of the table indicates the health of the table statistics.
 * When update_rows >= row_count, the health is 0;
 * when update_rows < row_count, the health is 100 * (1 - update_rows / row_count).
 * For example, totalRows = 1000 and updatedRows = 200 gives a health of 80.
 *
 * @param totalRows The current number of rows in the table
 * @param updatedRows The number of rows updated in the table
 * @return Health in the range [0, 100]; the larger the value, the healthier the table statistics.
 */
public static int getTableHealth(long totalRows, long updatedRows) {
// Avoid analyze empty table every time.
if (totalRows == 0 && updatedRows == 0) {
return 100;
}
if (updatedRows >= totalRows) {
return 0;
} else {
double healthCoefficient = (double) (totalRows - updatedRows) / (double) totalRows;
return (int) (healthCoefficient * 100.0);
}
}
/**
 * Estimate hive table row count.
 * First try to get it from the remote table parameters. If not found, estimate it as
 * totalSize / estimatedRowSize; for example, a total size of 100,000,000 bytes with an
 * estimated row size of 50 bytes gives roughly 2,000,000 rows.
 *
 * @param table Hive HMSExternalTable to estimate row count.
 * @return estimated row count
 */
public static long getHiveRowCount(HMSExternalTable table) {
Map<String, String> parameters = table.getRemoteTable().getParameters();
if (parameters == null) {
return TableIf.UNKNOWN_ROW_COUNT;
}
// If the table parameters contain the row count, simply get and return it.
long rows = getRowCountFromParameters(parameters);
if (rows > 0) {
LOG.info("Get row count {} for hive table {} in table parameters.", rows, table.getName());
return rows;
}
if (!parameters.containsKey(TOTAL_SIZE) && !parameters.containsKey(SPARK_TOTAL_SIZE)) {
return TableIf.UNKNOWN_ROW_COUNT;
}
// Table parameters doesn't contain row count but contain total size. Estimate row count : totalSize/rowSize
long totalSize = parameters.containsKey(TOTAL_SIZE) ? Long.parseLong(parameters.get(TOTAL_SIZE))
: Long.parseLong(parameters.get(SPARK_TOTAL_SIZE));
long estimatedRowSize = 0;
for (Column column : table.getFullSchema()) {
estimatedRowSize += column.getDataType().getSlotSize();
}
if (estimatedRowSize == 0) {
LOG.warn("Hive table {} estimated row size is invalid {}", table.getName(), estimatedRowSize);
return TableIf.UNKNOWN_ROW_COUNT;
}
rows = totalSize / estimatedRowSize;
LOG.info("Get row count {} for hive table {} by total size {} and row size {}",
rows, table.getName(), totalSize, estimatedRowSize);
return rows;
}
public static long getRowCountFromParameters(Map<String, String> parameters) {
if (parameters == null) {
return TableIf.UNKNOWN_ROW_COUNT;
}
// If the table parameters contain the row count, simply get and return it.
if (parameters.containsKey(NUM_ROWS)) {
long rows = Long.parseLong(parameters.get(NUM_ROWS));
if (rows <= 0 && parameters.containsKey(SPARK_NUM_ROWS)) {
rows = Long.parseLong(parameters.get(SPARK_NUM_ROWS));
}
// Sometimes, the NUM_ROWS in hms is 0 but actually is not. Need to check TOTAL_SIZE if NUM_ROWS is 0.
if (rows > 0) {
return rows;
}
}
return TableIf.UNKNOWN_ROW_COUNT;
}
/**
* Get total size parameter from HMS.
* @param table Hive HMSExternalTable to get HMS total size parameter.
* @return Long value of table total size, return 0 if not found.
*/
public static long getTotalSizeFromHMS(HMSExternalTable table) {
Map<String, String> parameters = table.getRemoteTable().getParameters();
if (parameters == null) {
return 0;
}
return parameters.containsKey(TOTAL_SIZE) ? Long.parseLong(parameters.get(TOTAL_SIZE)) : 0;
}
/**
 * Get Iceberg column statistics by aggregating per-file metadata (column sizes, record
 * counts and null value counts) from the table scan.
 *
 * @param colName Name of the column to collect statistics for.
 * @param table Iceberg table.
 * @return Optional column statistic for the given column.
 */
public static Optional<ColumnStatistic> getIcebergColumnStats(String colName, org.apache.iceberg.Table table) {
TableScan tableScan = table.newScan().includeColumnStats();
double totalDataSize = 0;
double totalDataCount = 0;
double totalNumNull = 0;
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask task : fileScanTasks) {
int colId = getColId(task.spec(), colName);
totalDataSize += task.file().columnSizes().get(colId);
totalDataCount += task.file().recordCount();
totalNumNull += task.file().nullValueCounts().get(colId);
}
} catch (IOException e) {
LOG.warn("Error to close FileScanTask.", e);
}
ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder(totalDataCount);
columnStatisticBuilder.setMaxValue(Double.POSITIVE_INFINITY);
columnStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
columnStatisticBuilder.setDataSize(totalDataSize);
columnStatisticBuilder.setAvgSizeByte(0);
columnStatisticBuilder.setNumNulls(totalNumNull);
if (columnStatisticBuilder.getCount() > 0) {
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
/ columnStatisticBuilder.getCount());
}
return Optional.of(columnStatisticBuilder.build());
}
private static int getColId(PartitionSpec partitionSpec, String colName) {
int colId = -1;
for (Types.NestedField column : partitionSpec.schema().columns()) {
if (column.name().equals(colName)) {
colId = column.fieldId();
break;
}
}
if (colId == -1) {
throw new RuntimeException(String.format("Column %s not exist.", colName));
}
return colId;
}
public static boolean isUnsupportedType(Type type) {
if (ColumnStatistic.UNSUPPORTED_TYPE.contains(type)) {
return true;
}
return type instanceof ArrayType
|| type instanceof StructType
|| type instanceof MapType
|| type instanceof VariantType
|| type instanceof AggStateType;
}
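/**
 * Decide whether a column should be collected during analyze. Full analyze collects all
 * columns; sample analyze skips value columns of aggregate-key tables and of merge-on-read
 * unique-key tables (presumably because sampling cannot see the merged values accurately).
 */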
public static boolean canCollectColumn(Column c, TableIf table, boolean isSampleAnalyze, long indexId) {
// Full analyze can collect all columns.
if (!isSampleAnalyze) {
return true;
}
// External table can collect all columns.
if (!(table instanceof OlapTable)) {
return true;
}
OlapTable olapTable = (OlapTable) table;
// Skip agg table value columns
KeysType keysType = olapTable.getIndexMetaByIndexId(indexId).getKeysType();
if (KeysType.AGG_KEYS.equals(keysType) && !c.isKey()) {
return false;
}
// Skip value columns of merge-on-read (MOR) unique key tables.
return !KeysType.UNIQUE_KEYS.equals(keysType) || olapTable.isUniqKeyMergeOnWrite() || c.isKey();
}
public static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignore) {
// IGNORE
}
}
public static String quote(String str) {
return "'" + str + "'";
}
public static boolean isMaster(Frontend frontend) {
InetSocketAddress socketAddress = new InetSocketAddress(frontend.getHost(), frontend.getEditLogPort());
return Env.getCurrentEnv().getHaProtocol().getLeader().equals(socketAddress);
}
public static String escapeSQL(String str) {
if (str == null) {
return null;
}
return str.replace("'", "''")
.replace("\\", "\\\\");
}
public static String escapeColumnName(String str) {
if (str == null) {
return null;
}
return str.replace("`", "``");
}
public static boolean isExternalTable(String catalogName, String dbName, String tblName) {
TableIf table;
try {
table = StatisticsUtil.findTable(catalogName, dbName, tblName);
} catch (Throwable e) {
LOG.warn(e.getMessage());
return false;
}
return table instanceof ExternalTable;
}
public static boolean isExternalTable(long catalogId, long dbId, long tblId) {
TableIf table;
try {
table = findTable(catalogId, dbId, tblId);
} catch (Throwable e) {
LOG.warn(e.getMessage());
return false;
}
return table instanceof ExternalTable;
}
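/**
 * Check whether the given time falls inside the configured auto analyze window
 * [auto_analyze_start_time, auto_analyze_end_time]. A window whose start is after its end is
 * treated as wrapping across midnight; for example, with a window of 23:00:00 to 02:00:00,
 * both 23:30 and 01:00 are inside while 12:00 is not. If the window cannot be parsed, the
 * method returns true so that auto analyze is not silently blocked.
 */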
public static boolean inAnalyzeTime(LocalTime now) {
try {
Pair<LocalTime, LocalTime> range = findConfigFromGlobalSessionVar();
if (range == null) {
return false;
}
LocalTime start = range.first;
LocalTime end = range.second;
if (start.isAfter(end) && (now.isAfter(start) || now.isBefore(end))) {
return true;
} else {
return now.isAfter(start) && now.isBefore(end);
}
} catch (DateTimeParseException e) {
LOG.warn("Parse analyze start/end time format fail", e);
return true;
}
}
private static Pair<LocalTime, LocalTime> findConfigFromGlobalSessionVar() {
try {
String startTime =
findConfigFromGlobalSessionVar(SessionVariable.AUTO_ANALYZE_START_TIME)
.autoAnalyzeStartTime;
// For compatibility
if (StringUtils.isEmpty(startTime)) {
startTime = StatisticConstants.FULL_AUTO_ANALYZE_START_TIME;
}
String endTime = findConfigFromGlobalSessionVar(SessionVariable.AUTO_ANALYZE_END_TIME)
.autoAnalyzeEndTime;
if (StringUtils.isEmpty(endTime)) {
endTime = StatisticConstants.FULL_AUTO_ANALYZE_END_TIME;
}
DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
return Pair.of(LocalTime.parse(startTime, timeFormatter), LocalTime.parse(endTime, timeFormatter));
} catch (Exception e) {
return null;
}
}
protected static SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
SessionVariable sessionVariable = VariableMgr.getDefaultSessionVariable();
VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
return sessionVariable;
}
public static boolean enableAutoAnalyze() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.ENABLE_AUTO_ANALYZE).enableAutoAnalyze;
} catch (Exception e) {
LOG.warn("Fail to get value of enable auto analyze, return false by default", e);
}
return false;
}
public static boolean enableAutoAnalyzeInternalCatalog() {
try {
return findConfigFromGlobalSessionVar(
SessionVariable.ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG).enableAutoAnalyzeInternalCatalog;
} catch (Exception e) {
LOG.warn("Fail to get value of enable auto analyze internal catalog, return false by default", e);
}
return true;
}
public static boolean enablePartitionAnalyze() {
try {
return findConfigFromGlobalSessionVar(
SessionVariable.ENABLE_PARTITION_ANALYZE).enablePartitionAnalyze;
} catch (Exception e) {
LOG.warn("Fail to get value of enable partition analyze, return false by default", e);
}
return false;
}
public static boolean isEnableHboInfoCollection() {
try {
return findConfigFromGlobalSessionVar(
SessionVariable.ENABLE_HBO_INFO_COLLECTION).isEnableHboInfoCollection();
} catch (Exception e) {
LOG.warn("Fail to get value of enable hbo optimization, return false by default", e);
}
return false;
}
public static int getInsertMergeCount() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
.statsInsertMergeItemCount;
} catch (Exception e) {
LOG.warn("Failed to get value of insert_merge_item_count, return default", e);
}
return StatisticConstants.INSERT_MERGE_ITEM_COUNT;
}
public static long getHugeTableSampleRows() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_DEFAULT_SAMPLE_ROWS)
.hugeTableDefaultSampleRows;
} catch (Exception e) {
LOG.warn("Failed to get value of huge_table_default_sample_rows, return default", e);
}
return StatisticConstants.HUGE_TABLE_DEFAULT_SAMPLE_ROWS;
}
public static long getHugeTableLowerBoundSizeInBytes() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES)
.hugeTableLowerBoundSizeInBytes;
} catch (Exception e) {
LOG.warn("Failed to get value of huge_table_lower_bound_size_in_bytes, return default", e);
}
return StatisticConstants.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES;
}
public static long getHugePartitionLowerBoundRows() {
return GlobalVariable.hugePartitionLowerBoundRows;
}
public static int getPartitionAnalyzeBatchSize() {
return GlobalVariable.partitionAnalyzeBatchSize;
}
public static long getHugeTableAutoAnalyzeIntervalInMillis() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
.hugeTableAutoAnalyzeIntervalInMillis;
} catch (Exception e) {
LOG.warn("Failed to get value of huge_table_auto_analyze_interval_in_millis, return default", e);
}
return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
}
public static long getExternalTableAutoAnalyzeIntervalInMillis() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
.externalTableAutoAnalyzeIntervalInMillis;
} catch (Exception e) {
LOG.warn("Failed to get value of externalTableAutoAnalyzeIntervalInMillis, return default", e);
}
return StatisticConstants.EXTERNAL_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
}
public static long getTableStatsHealthThreshold() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD)
.tableStatsHealthThreshold;
} catch (Exception e) {
LOG.warn("Failed to get value of table_stats_health_threshold, return default", e);
}
return StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
}
public static int getAnalyzeTimeout() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.ANALYZE_TIMEOUT)
.analyzeTimeoutS;
} catch (Exception e) {
LOG.warn("Failed to get value of table_stats_health_threshold, return default", e);
}
return StatisticConstants.ANALYZE_TIMEOUT_IN_SEC;
}
public static int getAutoAnalyzeTableWidthThreshold() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD)
.autoAnalyzeTableWidthThreshold;
} catch (Exception e) {
LOG.warn("Failed to get value of auto_analyze_table_width_threshold, return default", e);
}
return StatisticConstants.AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD;
}
public static int getPartitionSampleCount() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.PARTITION_SAMPLE_COUNT).partitionSampleCount;
} catch (Exception e) {
LOG.warn("Fail to get value of partition_sample_count, return default", e);
}
return StatisticConstants.PARTITION_SAMPLE_COUNT;
}
public static long getPartitionSampleRowCount() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.PARTITION_SAMPLE_ROW_COUNT).partitionSampleRowCount;
} catch (Exception e) {
LOG.warn("Fail to get value of partition_sample_row_count, return default", e);
}
return StatisticConstants.PARTITION_SAMPLE_ROW_COUNT;
}
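/**
 * Encode a single value from a ResultRow as a Base64 string of its UTF-8 bytes; returns the
 * literal string "NULL" when the row is null or the index is out of range.
 */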
public static String encodeValue(ResultRow row, int index) {
if (row == null || row.getValues().size() <= index) {
return "NULL";
}
return encodeString(row.get(index));
}
public static String encodeString(String value) {
if (value == null) {
return "NULL";
} else {
return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
}
}
/**
 * Check if the given column name is a materialized view column.
 * @param table Table the column belongs to.
 * @param columnName Column name to check.
 * @return True for mv column.
 */
public static boolean isMvColumn(TableIf table, String columnName) {
return table instanceof OlapTable
&& (columnName.startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX)
|| columnName.startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_AGGREGATE_NAME_PREFIX));
}
public static boolean isEmptyTable(TableIf table, AnalysisInfo.AnalysisMethod method) {
int waitRowCountReportedTime = 120;
if (!(table instanceof OlapTable) || method.equals(AnalysisInfo.AnalysisMethod.FULL)) {
return false;
}
OlapTable olapTable = (OlapTable) table;
long rowCount = 0;
for (int i = 0; i < waitRowCountReportedTime; i++) {
rowCount = olapTable.getRowCountForIndex(olapTable.getBaseIndexId(), true);
// rowCount == -1 means new table or first load row count not fully reported, need to wait.
if (rowCount == -1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.info("Sleep interrupted.");
}
continue;
}
break;
}
return rowCount == 0;
}
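/**
 * Decide whether a column needs to be auto analyzed. Roughly: analyze if the table or column
 * has never been analyzed, skip user-injected stats, analyze partitioned tables whose
 * partition stats are missing, and for OLAP tables compare the current row count and updated
 * rows against the last analyze using the table stats health threshold. For Hive external
 * tables, fall back to a time-interval check.
 */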
public static boolean needAnalyzeColumn(TableIf table, Pair<String, String> column) {
if (column == null) {
return false;
}
if (!table.autoAnalyzeEnabled()) {
return false;
}
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId());
// Table never been analyzed, need analyze.
if (tableStatsStatus == null) {
return true;
}
// User injected column stats, don't do auto analyze, to avoid overwriting the user injected stats.
if (tableStatsStatus.userInjected) {
return false;
}
ColStatsMeta columnStatsMeta = tableStatsStatus.findColumnStatsMeta(column.first, column.second);
// Column never been analyzed, need analyze.
if (columnStatsMeta == null) {
return true;
}
// Partitioned table whose partition stats have never been collected.
if (StatisticsUtil.enablePartitionAnalyze() && table.isPartitionedTable()
&& columnStatsMeta.partitionUpdateRows == null) {
return true;
}
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
// 0. Check new partition first time loaded flag.
if (tableStatsStatus.partitionChanged.get()) {
return true;
}
// 1. Check row count.
// TODO: One corner case. The last analyze row count is 0, but it may actually not be 0 because isEmptyTable was still waiting for the row count to be reported.
long currentRowCount = olapTable.getRowCount();
long lastAnalyzeRowCount = columnStatsMeta.rowCount;
// 1.1 Empty table -> non-empty table. Need analyze.
if (currentRowCount != 0 && lastAnalyzeRowCount == 0) {
return true;
}
// 1.2 Non-empty table -> empty table. Need analyze;
if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
return true;
}
// 1.3 Table is still empty. No need to analyze. lastAnalyzeRowCount == 0 is always true here.
if (currentRowCount == 0) {
return false;
}
// 1.4 If row count changed more than the threshold, need analyze.
// lastAnalyzeRowCount == 0 is always false here.
double changeRate =
((double) Math.abs(currentRowCount - lastAnalyzeRowCount) / lastAnalyzeRowCount) * 100.0;
if (changeRate > (100 - StatisticsUtil.getTableStatsHealthThreshold())) {
return true;
}
// 2. Check update rows.
long currentUpdatedRows = tableStatsStatus.updatedRows.get();
long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows;
changeRate = ((double) Math.abs(currentUpdatedRows - lastAnalyzeUpdateRows) / lastAnalyzeRowCount) * 100.0;
if (changeRate > (100 - StatisticsUtil.getTableStatsHealthThreshold())) {
return true;
}
// 3. Check partition
return needAnalyzePartition(olapTable, tableStatsStatus, columnStatsMeta);
} else {
// Now, we only support Hive external table auto analyze.
if (!(table instanceof HMSExternalTable)) {
return false;
}
HMSExternalTable hmsTable = (HMSExternalTable) table;
if (!hmsTable.getDlaType().equals(DLAType.HIVE)) {
return false;
}
// For external tables it is hard to calculate the change rate, so use a time interval to control analyze frequency.
return System.currentTimeMillis()
- tableStatsStatus.lastAnalyzeTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}
}
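/**
 * Decide whether partition-level stats of a partitioned OLAP table need to be re-collected:
 * when partitions were added or dropped, when a new partition has no recorded update rows,
 * when a formerly skipped huge partition is no longer huge, when too many partitions changed
 * (UPDATED_PARTITION_THRESHOLD), or when a single partition's update-row change rate exceeds
 * the health threshold.
 */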
public static boolean needAnalyzePartition(OlapTable table, TableStatsMeta tableStatsStatus,
ColStatsMeta columnStatsMeta) {
if (!StatisticsUtil.enablePartitionAnalyze() || !table.isPartitionedTable()) {
return false;
}
if (tableStatsStatus.partitionChanged != null && tableStatsStatus.partitionChanged.get()) {
return true;
}
ConcurrentMap<Long, Long> partitionUpdateRows = columnStatsMeta.partitionUpdateRows;
if (partitionUpdateRows == null) {
return true;
}
Collection<Partition> partitions = table.getPartitions();
// New partition added or old partition deleted.
if (partitions.size() != partitionUpdateRows.size()) {
return true;
}
int changedPartitions = 0;
long hugePartitionLowerBoundRows = getHugePartitionLowerBoundRows();
for (Partition p : partitions) {
long id = p.getId();
// Skip partition that is too large.
if (p.getBaseIndex().getRowCount() > hugePartitionLowerBoundRows) {
continue;
}
// New partition added.
if (!partitionUpdateRows.containsKey(id)) {
return true;
}
// Former skipped large partition is not large anymore. Need to analyze it.
if (partitionUpdateRows.get(id) == -1) {
return true;
}
long currentUpdateRows = tableStatsStatus.partitionUpdateRows.getOrDefault(id, 0L);
long lastUpdateRows = partitionUpdateRows.get(id);
long changedRows = currentUpdateRows - lastUpdateRows;
if (changedRows > 0) {
changedPartitions++;
// Too many partitions changed, need to reanalyze.
if (changedPartitions >= UPDATED_PARTITION_THRESHOLD) {
return true;
}
double changeRate = (((double) changedRows) / currentUpdateRows) * 100;
// One partition changed too much, need to reanalyze.
if (changeRate > (100 - StatisticsUtil.getTableStatsHealthThreshold())) {
return true;
}
}
}
return false;
}
// This function returns true when the column hasn't been analyzed for longer than the configured interval.
public static boolean isLongTimeColumn(TableIf table, Pair<String, String> column) {
if (column == null) {
return false;
}
if (!table.autoAnalyzeEnabled()) {
return false;
}
if (!(table instanceof OlapTable)) {
return false;
}
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
TableStatsMeta tblStats = manager.findTableStatsStatus(table.getId());
// Table never been analyzed, skip it for higher priority jobs.
if (tblStats == null) {
LOG.warn("Table stats is null.");
return false;
}
ColStatsMeta columnStats = tblStats.findColumnStatsMeta(column.first, column.second);
if (columnStats == null) {
// Column never been analyzed, skip it for higher priority jobs.
return false;
}
// User injected column stats, don't do auto analyze, to avoid overwriting the user injected stats.
if (tblStats.userInjected) {
return false;
}
boolean isLongTime = Config.auto_analyze_interval_seconds > 0
&& System.currentTimeMillis() - columnStats.updatedTime > Config.auto_analyze_interval_seconds * 1000;
if (!isLongTime) {
return false;
}
// For olap table, if the table visible version and row count doesn't change since last analyze,
// we don't need to analyze it because its data is not changed.
OlapTable olapTable = (OlapTable) table;
return olapTable.getVisibleVersion() != columnStats.tableVersion
|| olapTable.getRowCount() != columnStats.rowCount;
}
public static boolean canCollect() {
return enableAutoAnalyze() && inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}
}