BaseAnalysisTask.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
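/**
* Base class for a single column statistics collection task. It holds the SQL templates used to
* collect column stats (full scan, linear sample, DUJ1 sample, per-partition collection and the
* partition merge query), resolves the catalog/db/table/column from the AnalysisInfo, and
* provides the shared helpers used by concrete tasks. Subclasses implement doExecute(),
* doSample() and deleteNotExistPartitionStats() for their specific table types.
*/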
public abstract class BaseAnalysisTask {
public static final Logger LOG = LogManager.getLogger(BaseAnalysisTask.class);
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;
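// Full scan template: computes one row of column stats (row count, NDV, null count, min/max and
// data size) over the whole table or index. Placeholders such as ${colName} are filled in with
// StringSubstitutor before execution. For a hypothetical column `c1` of `internal`.`db1`.`tbl1`,
// the substituted statement would look roughly like:
//   SELECT CONCAT(10, '-', -1, '-', 'c1') AS `id`, ..., NDV(`c1`) AS `ndv`, ...
//   FROM `internal`.`db1`.`tbl1`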
protected static final String FULL_ANALYZE_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ " ${catalogId} AS `catalog_id`, "
+ " ${dbId} AS `db_id`, "
+ " ${tblId} AS `tbl_id`, "
+ " ${idxId} AS `idx_id`, "
+ " '${colId}' AS `col_id`, "
+ " NULL AS `part_id`, "
+ " COUNT(1) AS `row_count`, "
+ " NDV(`${colName}`) AS `ndv`, "
+ " COUNT(1) - COUNT(`${colName}`) AS `null_count`, "
+ " SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) AS `min`, "
+ " SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, "
+ " ${dataSizeFunction} AS `data_size`, "
+ " NOW() AS `update_time` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index}";
protected static final String LINEAR_ANALYZE_TEMPLATE = "SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${rowCount} AS `row_count`, "
+ "${ndvFunction} as `ndv`, "
+ "ROUND(SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) * ${scaleFactor}) AS `null_count`, "
+ "SUBSTRING(CAST(${min} AS STRING), 1, 1024) AS `min`, "
+ "SUBSTRING(CAST(${max} AS STRING), 1, 1024) AS `max`, "
+ "${dataSizeFunction} * ${scaleFactor} AS `data_size`, "
+ "NOW() FROM ( "
+ "SELECT * FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${sampleHints} ${limit}) "
+ "as t ${preAggHint}";
protected static final String DUJ1_ANALYZE_TEMPLATE = "SELECT "
+ "CONCAT('${tblId}', '-', '${idxId}', '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${rowCount} AS `row_count`, "
+ "${ndvFunction} as `ndv`, "
+ "IFNULL(SUM(IF(`t1`.`column_key` IS NULL, `t1`.`count`, 0)), 0) * ${scaleFactor} as `null_count`, "
+ "SUBSTRING(CAST(${min} AS STRING), 1, 1024) AS `min`, "
+ "SUBSTRING(CAST(${max} AS STRING), 1, 1024) AS `max`, "
+ "${dataSizeFunction} * ${scaleFactor} AS `data_size`, "
+ "NOW() "
+ "FROM ( "
+ " SELECT t0.`colValue` as `column_key`, COUNT(1) as `count`, SUM(`len`) as `column_length` "
+ " FROM "
+ " (SELECT ${subStringColName} AS `colValue`, LENGTH(`${colName}`) as `len` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${sampleHints} ${limit}) as `t0`"
+ " ${preAggHint}"
+ " GROUP BY `t0`.`colValue` "
+ ") as `t1` ";
protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "${row_count} AS `row_count`, "
+ "${ndv} AS `ndv`, "
+ "${null_count} AS `null_count`, "
+ "SUBSTRING(CAST(${min} AS STRING), 1, 1024) AS `min`, "
+ "SUBSTRING(CAST(${max} AS STRING), 1, 1024) AS `max`, "
+ "${data_size} AS `data_size`, "
+ "NOW() ";
protected static final String PARTITION_ANALYZE_TEMPLATE = " SELECT "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "${partName} AS `part_name`, "
+ "${partId} AS `part_id`, "
+ "'${colId}' AS `col_id`, "
+ "COUNT(1) AS `row_count`, "
+ "HLL_UNION(HLL_HASH(`${colName}`)) as ndv, "
+ "COUNT(1) - COUNT(`${colName}`) AS `null_count`, "
+ "SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) AS `min`, "
+ "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) AS `max`, "
+ "${dataSizeFunction} AS `data_size`, "
+ "NOW() AS `update_time` "
+ " FROM `${catalogName}`.`${dbName}`.`${tblName}` ${index} ${partitionInfo}";
protected static final String MERGE_PARTITION_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ "${catalogId} AS `catalog_id`, "
+ "${dbId} AS `db_id`, "
+ "${tblId} AS `tbl_id`, "
+ "${idxId} AS `idx_id`, "
+ "'${colId}' AS `col_id`, "
+ "NULL AS `part_id`, "
+ "SUM(count) AS `row_count`, "
+ "HLL_CARDINALITY(HLL_UNION(ndv)) AS `ndv`, "
+ "SUM(null_count) AS `null_count`, "
+ "MIN(${min}) AS `min`, "
+ "MAX(${max}) AS `max`, "
+ "SUM(data_size_in_bytes) AS `data_size`, "
+ "NOW() AS `update_time` FROM "
+ StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ " WHERE `catalog_id` = ${catalogId} "
+ " AND `db_id` = ${dbId} "
+ " AND `tbl_id` = ${tblId} "
+ " AND `idx_id` = ${idxId} "
+ " AND `col_id` = '${colId}'";
protected AnalysisInfo info;
protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
protected DatabaseIf<? extends TableIf> db;
protected TableIf tbl;
protected Column col;
protected StmtExecutor stmtExecutor;
protected volatile boolean killed;
protected TableSample tableSample = null;
protected AnalysisJob job;
@VisibleForTesting
public BaseAnalysisTask() {
}
public BaseAnalysisTask(AnalysisInfo info) {
this.info = info;
init(info);
}
protected void init(AnalysisInfo info) {
DBObjects dbObjects = StatisticsUtil.convertIdToObjects(info.catalogId, info.dbId, info.tblId);
catalog = dbObjects.catalog;
db = dbObjects.db;
tbl = dbObjects.table;
tableSample = getTableSample();
if (info.analysisType != null && (info.analysisType.equals(AnalysisType.FUNDAMENTALS)
|| info.analysisType.equals(AnalysisType.HISTOGRAM))) {
col = tbl.getColumn(info.colName);
if (col == null) {
throw new RuntimeException(String.format("Column with name %s not exists in table %s",
info.colName, tbl.getName()));
}
Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
String.format("Column with type %s is not supported", col.getType().toString()));
}
}
public void execute() throws Exception {
prepareExecution();
doExecute();
afterExecution();
}
protected void prepareExecution() {
setTaskStateToRunning();
}
public abstract void doExecute() throws Exception;
protected abstract void doSample() throws Exception;
protected void afterExecution() {}
protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
}
public void cancel() {
killed = true;
if (stmtExecutor != null) {
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "analysis task cancelled"));
}
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
String.format("Job has been cancelled: %s", info.message), System.currentTimeMillis());
}
public long getJobId() {
return info.jobId;
}
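// Builds the SQL expression that estimates the column's total data size. String types sum the
// actual value lengths; fixed-length types multiply the (sampled) row count by the slot size,
// e.g. an 8-byte type yields "COUNT(1) * 8" in the non-DUJ1 case. The DUJ1 variant works on the
// grouped t1 rows of DUJ1_ANALYZE_TEMPLATE instead.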
protected String getDataSizeFunction(Column column, boolean useDuj1) {
if (useDuj1) {
if (column.getType().isStringType()) {
return "SUM(`column_length`)";
} else {
return "SUM(t1.count) * " + column.getType().getSlotSize();
}
} else {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
} else {
return "COUNT(1) * " + column.getType().getSlotSize();
}
}
}
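// For string columns, group by a 64-bit hash of the first 1024 characters instead of the raw
// value, so long strings don't blow up the GROUP BY key used in DUJ1_ANALYZE_TEMPLATE.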
protected String getStringTypeColName(Column column) {
if (column.getType().isStringType()) {
return "xxhash_64(SUBSTRING(CAST(`${colName}` AS STRING), 1, 1024))";
} else {
return "`${colName}`";
}
}
protected String getMinFunction() {
if (tableSample == null) {
return "CAST(MIN(`${colName}`) as ${type}) ";
} else {
// Min value is not accurate when sampling, so set it to NULL to avoid the optimizer generating a bad plan.
return "NULL";
}
}
protected String getNdvFunction(String totalRows) {
String sampleRows = "SUM(`t1`.`count`)";
String onceCount = "SUM(IF(`t1`.`count` = 1, 1, 0))";
String countDistinct = "COUNT(1)";
// DUJ1 estimator: n*d / (n - f1 + f1*n/N)
// f1 is the count of elements that appear only once in the sample.
// (https://github.com/postgres/postgres/blob/master/src/backend/commands/analyze.c)
// (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.93.8637&rep=rep1&type=pdf)
// sample_row * count_distinct / ( sample_row - once_count + once_count * sample_row / total_row)
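// Illustrative numbers: a 1,000-row sample from a 10,000-row table with 200 distinct values,
// 150 of which appear exactly once, gives 1000 * 200 / (1000 - 150 + 150 * 1000 / 10000)
// = 200000 / 865 ≈ 231 estimated distinct values.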
return MessageFormat.format("{0} * {1} / ({0} - {2} + {2} * {0} / {3})", sampleRows,
countDistinct, onceCount, totalRows);
}
// Max value is not accurate when sampling, so set it to NULL to avoid the optimizer generating a bad plan.
protected String getMaxFunction() {
if (tableSample == null) {
return "CAST(MAX(`${colName}`) as ${type}) ";
} else {
return "NULL";
}
}
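// Returns the sampling spec for this task: null means full analyze (either forced, or because
// the user specified neither a sample percent nor a sample row count); otherwise a percent- or
// row-count-based TableSample is built from the user's settings.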
protected TableSample getTableSample() {
if (info.forceFull) {
return null;
}
// If the user specified a sample percent or a sample row count, use it.
if (info.samplePercent > 0) {
return new TableSample(true, (long) info.samplePercent);
} else if (info.sampleRows > 0) {
return new TableSample(false, info.sampleRows);
} else {
return null;
}
}
@Override
public String toString() {
return String.format("Job id [%d], Task id [%d], catalog [%s], db [%s], table [%s], column [%s]",
info.jobId, info.taskId, catalog.getName(), db.getFullName(), tbl.getName(),
col == null ? "TableRowCount" : col.getName());
}
public void setJob(AnalysisJob job) {
this.job = job;
}
/**
* 1. Remove stats of partitions that no longer exist
* 2. Collect stats for each partition
* 3. Insert the partition stats in batches
* 4. Skip huge partitions and fall back to sample analyze if any were skipped
* 5. Calculate column stats based on the partition stats
*/
protected void doPartitionTable() throws Exception {
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
TableStatsMeta tableStatsStatus = analysisManager.findTableStatsStatus(tbl.getId());
// Find jobInfo for this task.
AnalysisInfo jobInfo = analysisManager.findJobInfo(job.getJobInfo().jobId);
// For sync job, get jobInfo from job.jobInfo.
boolean isSync = jobInfo == null;
jobInfo = isSync ? job.jobInfo : jobInfo;
// 1. Remove not exist partition stats
deleteNotExistPartitionStats(jobInfo);
// 2. Get stats of each partition
Map<String, String> params = buildSqlParams();
params.put("dataSizeFunction", getDataSizeFunction(col, false));
boolean isAllPartitions = info.partitionNames.isEmpty();
Set<String> partitionNames = isAllPartitions ? tbl.getPartitionNames() : info.partitionNames;
List<String> sqls = Lists.newArrayList();
Set<String> partNames = Sets.newHashSet();
int count = 0;
long batchRowCount = 0;
String idxName = info.indexId == -1 ? tbl.getName() : ((OlapTable) tbl).getIndexNameById(info.indexId);
ColStatsMeta columnStatsMeta = tableStatsStatus == null
? null : tableStatsStatus.findColumnStatsMeta(idxName, col.getName());
boolean hasHugePartition = false;
long hugePartitionThreshold = StatisticsUtil.getHugePartitionLowerBoundRows();
int partitionBatchSize = StatisticsUtil.getPartitionAnalyzeBatchSize();
for (String part : partitionNames) {
// For external tables, getPartition returns null.
Partition partition = tbl.getPartition(part);
if (partition != null) {
// Skip analyzing huge partitions.
long partitionRowCount = partition.getBaseIndex().getRowCount();
if (partitionRowCount > hugePartitionThreshold && AnalysisInfo.JobType.SYSTEM.equals(info.jobType)) {
hasHugePartition = true;
// -1 means it's skipped because this partition is too large.
jobInfo.partitionUpdateRows.putIfAbsent(partition.getId(), -1L);
LOG.info("Partition {} in table {} is too large, skip it.", part, tbl.getName());
continue;
}
batchRowCount += partitionRowCount;
// For cluster upgrade compatibility (older-version metadata doesn't have the partition update
// rows map) and for inserts done before the first analyze, set partition update rows to 0.
jobInfo.partitionUpdateRows.putIfAbsent(partition.getId(), 0L);
}
params.put("partId", partition == null ? "-1" : String.valueOf(partition.getId()));
// Skip partitions that haven't changed since the last analyze.
// External table getPartition always returns null, so external tables never skip partitions.
if (partition != null && tableStatsStatus != null && tableStatsStatus.partitionUpdateRows != null) {
ConcurrentMap<Long, Long> tableUpdateRows = tableStatsStatus.partitionUpdateRows;
if (columnStatsMeta != null && columnStatsMeta.partitionUpdateRows != null) {
ConcurrentMap<Long, Long> columnUpdateRows = columnStatsMeta.partitionUpdateRows;
long id = partition.getId();
if (Objects.equals(tableUpdateRows.getOrDefault(id, 0L), columnUpdateRows.get(id))) {
LOG.debug("Partition {} doesn't change after last analyze for column {}, skip it.",
part, col.getName());
continue;
}
}
}
params.put("partName", "'" + StatisticsUtil.escapeColumnName(part) + "'");
params.put("partitionInfo", getPartitionInfo(part));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
sqls.add(stringSubstitutor.replace(PARTITION_ANALYZE_TEMPLATE));
count++;
partNames.add(part);
// 3. Insert the collected partition stats once the batch is full.
if (count == partitionBatchSize || batchRowCount > hugePartitionThreshold) {
String sql = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ Joiner.on(" UNION ALL ").join(sqls);
runInsert(sql);
analysisManager.updatePartitionStatsCache(info.catalogId, info.dbId, info.tblId, info.indexId,
partNames, info.colName);
partNames.clear();
sqls.clear();
count = 0;
batchRowCount = 0;
}
}
// 3. Insert the remaining partition stats after the loop.
if (count > 0) {
String sql = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_PARTITION_STATS_TBL_NAME
+ Joiner.on(" UNION ALL ").join(sqls);
runInsert(sql);
analysisManager.updatePartitionStatsCache(info.catalogId, info.dbId, info.tblId, info.indexId,
partNames, info.colName);
}
// 4. Fall back to sample analyze if any huge partition was skipped.
if (hasHugePartition) {
tableSample = new TableSample(false, StatisticsUtil.getHugeTableSampleRows());
if (!isSync) {
long startTime = jobInfo.startTime;
jobInfo = new AnalysisInfoBuilder(jobInfo).setAnalysisMethod(AnalysisMethod.SAMPLE).build();
jobInfo.markStartTime(startTime);
analysisManager.replayCreateAnalysisJob(jobInfo);
}
if (tableStatsStatus == null || columnStatsMeta == null
|| tableStatsStatus.updatedRows.get() > columnStatsMeta.updatedRows) {
doSample();
} else {
job.taskDoneWithoutData(this);
}
} else {
// 5. Calculate column stats based on the partition stats.
if (isAllPartitions) {
params = buildSqlParams();
params.put("min", castToNumeric("min"));
params.put("max", castToNumeric("max"));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
runQuery(stringSubstitutor.replace(MERGE_PARTITION_TEMPLATE));
} else {
job.taskDoneWithoutData(this);
}
}
}
protected abstract void deleteNotExistPartitionStats(AnalysisInfo jobInfo) throws DdlException;
protected String getPartitionInfo(String partitionName) {
return "";
}
protected Map<String, String> buildSqlParams() {
return Maps.newHashMap();
}
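// Wraps the stored min/max value in a CAST for numeric columns so MIN/MAX in
// MERGE_PARTITION_TEMPLATE compare values numerically rather than lexicographically,
// e.g. "CAST(min AS BIGINT)" for a BIGINT column.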
protected String castToNumeric(String colName) {
Type type = col.getType();
if (type.isNumericType()) {
return "CAST(" + colName + " AS " + type.toSql() + ")";
} else {
return colName;
}
}
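// Runs a stats-collection query on an internal connection, validates the single ColStatsData
// row it returns, pushes the result into the statistics cache and appends it to the job buffer.
// Throws if the query fails or the returned stats are invalid.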
protected void runQuery(String sql) {
long startTime = System.currentTimeMillis();
String queryId = "";
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(a.connectContext, sql);
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
if (!colStatsData.isValid()) {
String message = String.format("ColStatsData is invalid, skip analyzing. %s", colStatsData.toSQL(true));
LOG.warn(message);
throw new RuntimeException(message);
}
// Update index row count after analyze.
if (this instanceof OlapAnalysisTask) {
AnalysisInfo jobInfo = Env.getCurrentEnv().getAnalysisManager().findJobInfo(job.getJobInfo().jobId);
// For sync job, get jobInfo from job.jobInfo.
jobInfo = jobInfo == null ? job.jobInfo : jobInfo;
long indexId = info.indexId == -1 ? ((OlapTable) tbl).getBaseIndexId() : info.indexId;
jobInfo.addIndexRowCount(indexId, colStatsData.count);
}
Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData);
queryId = DebugUtil.printId(stmtExecutor.getContext().queryId());
job.appendBuf(this, Collections.singletonList(colStatsData));
} catch (Exception e) {
LOG.warn("Failed to execute sql {}", sql);
throw e;
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("End cost time in millisec: " + (System.currentTimeMillis() - startTime)
+ " Analyze SQL: " + sql + " QueryId: " + queryId);
}
// Release the reference to stmtExecutor to reduce memory usage.
stmtExecutor = null;
}
}
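// Runs an INSERT into the partition stats table on an internal connection and writes an audit
// log entry for it; throws if the query state reports an error.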
protected void runInsert(String sql) throws Exception {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, sql);
try {
stmtExecutor.execute();
QueryState queryState = stmtExecutor.getContext().getState();
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
throw new RuntimeException(
"Failed to insert : " + stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+ queryState.getErrorMessage());
}
} finally {
AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(), true);
}
}
}
}