AnalysisManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeDBStmt;
import org.apache.doris.analysis.AnalyzeProperties;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.AnalyzeTblStmt;
import org.apache.doris.analysis.DropAnalyzeJobStmt;
import org.apache.doris.analysis.DropCachedStatsStmt;
import org.apache.doris.analysis.DropStatsStmt;
import org.apache.doris.analysis.KillAnalysisJobStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.commands.AnalyzeCommand;
import org.apache.doris.nereids.trees.plans.commands.AnalyzeDatabaseCommand;
import org.apache.doris.nereids.trees.plans.commands.AnalyzeTableCommand;
import org.apache.doris.nereids.trees.plans.commands.DropAnalyzeJobCommand;
import org.apache.doris.nereids.trees.plans.commands.DropStatsCommand;
import org.apache.doris.nereids.trees.plans.commands.KillAnalyzeJobCommand;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AnalyzeDeletionLog;
import org.apache.doris.persist.TableStatsDeletionLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TQueryColumn;
import org.apache.doris.thrift.TUpdateFollowerPartitionStatsCacheRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.CronExpression;
import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class AnalysisManager implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
public static final int COLUMN_QUEUE_SIZE = 1000;
public final Queue<QueryColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Queue<QueryColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
// Map<TableName, Set<Pair<IndexName, ColumnName>>>
public final Map<TableName, Set<Pair<String, String>>> highPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> midPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> lowPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<Pair<String, String>>> veryLowPriorityJobs = new LinkedHashMap<>();
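    // Illustrative entry for the maps above (names are hypothetical): internal.db1.tbl1 ->
    // {("tbl1", "col_a"), ("mv_idx1", "col_b")}; the base index is typically named after the
    // table itself, while materialized views contribute their own index names.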
    // Tracks manually submitted async tasks that are currently running; kept in memory only.
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>> analysisJobIdToTaskMap = new ConcurrentHashMap<>();
private StatisticsCache statisticsCache;
private AnalysisTaskExecutor taskExecutor;
private ThreadPoolExecutor dropStatsExecutors;
// Store task information in metadata.
protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());
// Store job information in metadata.
protected final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());
    // Tracks and controls sync analyze tasks; kept in memory only.
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
private final Map<Long, TableStatsMeta> idToTblStats = new ConcurrentHashMap<>();
private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();
private final String progressDisplayTemplate = "%d Finished | %d Failed | %d In Progress | %d Total";
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
Integer.MAX_VALUE, "Manual Analysis Job Executor");
this.statisticsCache = new StatisticsCache();
this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
1, 3, 10,
TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
new ThreadPoolExecutor.DiscardPolicy(),
"Drop stats executor", true);
}
}
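    // Note on the drop-stats pool above: ThreadPoolExecutor.DiscardPolicy silently drops
    // submissions once the 20-slot queue is full, so a drop-stats request may be lost under
    // heavy load; presumably acceptable since the stats can simply be dropped again later.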
public StatisticsCache getStatisticsCache() {
return statisticsCache;
}
public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlException, AnalysisException {
if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) {
throw new DdlException("Stats table not available, please make sure your cluster status is normal");
}
if (ConnectContext.get().getSessionVariable().forceSampleAnalyze) {
analyzeStmt.checkAndSetSample();
}
if (analyzeStmt instanceof AnalyzeDBStmt) {
createAnalysisJobs((AnalyzeDBStmt) analyzeStmt, proxy);
} else if (analyzeStmt instanceof AnalyzeTblStmt) {
createAnalysisJob((AnalyzeTblStmt) analyzeStmt, proxy);
}
}
    // For Nereids ANALYZE DATABASE/TABLE.
public void createAnalyze(AnalyzeCommand command, boolean proxy) throws DdlException, AnalysisException {
if (!StatisticsUtil.statsTblAvailable() && !FeConstants.runningUnitTest) {
throw new DdlException("Stats table not available, please make sure your cluster status is normal");
}
if (ConnectContext.get().getSessionVariable().forceSampleAnalyze) {
command.checkAndSetSample();
}
if (command instanceof AnalyzeDatabaseCommand) {
createAnalysisJobs((AnalyzeDatabaseCommand) command, proxy);
} else if (command instanceof AnalyzeTableCommand) {
createAnalysisJob((AnalyzeTableCommand) command, proxy);
}
}
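    // Illustrative statements handled by the createAnalyze() entry points above (assuming
    // standard Doris ANALYZE syntax; the exact grammar may differ between versions):
    //   ANALYZE DATABASE db1;                  -- routed to AnalyzeDatabaseCommand
    //   ANALYZE TABLE db1.tbl1 (col_a, col_b); -- routed to AnalyzeTableCommand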
    // For Nereids ANALYZE DATABASE.
public void createAnalysisJobs(AnalyzeDatabaseCommand command, boolean proxy) throws AnalysisException {
DatabaseIf<TableIf> db = command.getDb();
List<AnalysisInfo> analysisInfos = buildAnalysisInfosForNereidsDB(db, command.getAnalyzeProperties());
if (!command.isSync()) {
sendJobId(analysisInfos, proxy);
}
}
    // For Nereids ANALYZE TABLE.
public void createAnalysisJob(AnalyzeTableCommand command, boolean proxy) throws DdlException {
        // Use the auto analyzer if the user specifies it.
if ("true".equalsIgnoreCase(command.getAnalyzeProperties()
.getProperties().get("use.auto.analyzer"))) {
Env.getCurrentEnv().getStatisticsAutoCollector().processOneJob(command.getTable(),
command.getTable()
.getColumnIndexPairs(command.getColumnNames()), JobPriority.HIGH);
return;
}
AnalysisInfo jobInfo = buildAndAssignJob(command);
if (jobInfo == null) {
return;
}
sendJobId(ImmutableList.of(jobInfo), proxy);
}
    // For Nereids ANALYZE TABLE.
@Nullable
@VisibleForTesting
protected AnalysisInfo buildAndAssignJob(AnalyzeTableCommand command) throws DdlException {
AnalysisInfo jobInfo = buildAnalysisJobInfo(command);
if (jobInfo.jobColumns == null || jobInfo.jobColumns.isEmpty()) {
// No statistics need to be collected or updated
LOG.info("Job columns are empty, skip analyze table {}", command.getTblName().toString());
return null;
}
// Only OlapTable and Hive HMSExternalTable support sample analyze.
if ((command.getSamplePercent() > 0 || command.getSampleRows() > 0) && !canSample(command.getTable())) {
String message = String.format("Table %s doesn't support sample analyze.", command.getTable().getName());
LOG.info(message);
throw new DdlException(message);
}
boolean isSync = command.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
constructJob(jobInfo, analysisTaskInfos.values());
if (isSync) {
syncExecute(analysisTaskInfos.values());
jobInfo.state = AnalysisState.FINISHED;
updateTableStats(jobInfo);
return null;
}
recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
if (!jobInfo.scheduleType.equals(ScheduleType.PERIOD)) {
analysisTaskInfos.values().forEach(taskExecutor::submitTask);
}
return jobInfo;
}
    // For Nereids ANALYZE DATABASE.
public List<AnalysisInfo> buildAnalysisInfosForNereidsDB(DatabaseIf<TableIf> db,
AnalyzeProperties analyzeProperties) throws AnalysisException {
List<TableIf> tbls = db.getTables();
List<AnalysisInfo> analysisInfos = new ArrayList<>();
List<AnalyzeTableCommand> commands = new ArrayList<>();
for (TableIf table : tbls) {
if (table instanceof View) {
continue;
}
TableNameInfo tableNameInfo = new TableNameInfo(db.getCatalog().getName(),
db.getFullName(), table.getName());
            // A null columnNames means all visible columns are included.
            // All the visible columns are resolved in command.check().
AnalyzeTableCommand command = new AnalyzeTableCommand(analyzeProperties, tableNameInfo,
null, db.getId(), table);
try {
command.check();
} catch (AnalysisException analysisException) {
LOG.warn("Failed to build analyze job: {}",
analysisException.getMessage(), analysisException);
}
commands.add(command);
}
for (AnalyzeTableCommand command : commands) {
try {
analysisInfos.add(buildAndAssignJob(command));
} catch (DdlException e) {
LOG.warn("Failed to build analyze job: {}",
e.getMessage(), e);
}
}
return analysisInfos;
}
public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws AnalysisException {
DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
List<AnalysisInfo> analysisInfos = buildAnalysisInfosForDB(db, analyzeDBStmt.getAnalyzeProperties());
if (!analyzeDBStmt.isSync()) {
sendJobId(analysisInfos, proxy);
}
}
public List<AnalysisInfo> buildAnalysisInfosForDB(DatabaseIf<TableIf> db, AnalyzeProperties analyzeProperties)
throws AnalysisException {
List<TableIf> tbls = db.getTables();
List<AnalysisInfo> analysisInfos = new ArrayList<>();
List<AnalyzeTblStmt> analyzeStmts = new ArrayList<>();
for (TableIf table : tbls) {
if (table instanceof View) {
continue;
}
TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(), table.getName());
            // A null columnNames means all visible columns are included.
            // All the visible columns are resolved in analyzeTblStmt.check().
AnalyzeTblStmt analyzeTblStmt = new AnalyzeTblStmt(analyzeProperties, tableName,
null, db.getId(), table);
try {
analyzeTblStmt.check();
} catch (AnalysisException analysisException) {
LOG.warn("Failed to build analyze job: {}",
analysisException.getMessage(), analysisException);
}
analyzeStmts.add(analyzeTblStmt);
}
for (AnalyzeTblStmt analyzeTblStmt : analyzeStmts) {
try {
analysisInfos.add(buildAndAssignJob(analyzeTblStmt));
} catch (DdlException e) {
LOG.warn("Failed to build analyze job: {}",
e.getMessage(), e);
}
}
return analysisInfos;
}
    // Each analyze stmt corresponds to one analysis job.
public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException {
        // Use the auto analyzer if the user specifies it.
if ("true".equalsIgnoreCase(stmt.getAnalyzeProperties().getProperties().get("use.auto.analyzer"))) {
Env.getCurrentEnv().getStatisticsAutoCollector()
.processOneJob(stmt.getTable(),
stmt.getTable().getColumnIndexPairs(stmt.getColumnNames()), JobPriority.HIGH);
return;
}
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
if (jobInfo == null) {
return;
}
sendJobId(ImmutableList.of(jobInfo), proxy);
}
@Nullable
@VisibleForTesting
protected AnalysisInfo buildAndAssignJob(AnalyzeTblStmt stmt) throws DdlException {
AnalysisInfo jobInfo = buildAnalysisJobInfo(stmt);
if (jobInfo.jobColumns == null || jobInfo.jobColumns.isEmpty()) {
// No statistics need to be collected or updated
LOG.info("Job columns are empty, skip analyze table {}", stmt.getTblName().toString());
return null;
}
// Only OlapTable and Hive HMSExternalTable support sample analyze.
if ((stmt.getSamplePercent() > 0 || stmt.getSampleRows() > 0) && !canSample(stmt.getTable())) {
String message = String.format("Table %s doesn't support sample analyze.", stmt.getTable().getName());
LOG.info(message);
throw new DdlException(message);
}
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
constructJob(jobInfo, analysisTaskInfos.values());
if (isSync) {
syncExecute(analysisTaskInfos.values());
jobInfo.state = AnalysisState.FINISHED;
updateTableStats(jobInfo);
return null;
}
recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos);
if (!jobInfo.scheduleType.equals(ScheduleType.PERIOD)) {
analysisTaskInfos.values().forEach(taskExecutor::submitTask);
}
return jobInfo;
}
private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
List<Column> columns = new ArrayList<>();
columns.add(new Column("Job_Id", ScalarType.createVarchar(19)));
columns.add(new Column("Catalog_Name", ScalarType.createVarchar(1024)));
columns.add(new Column("DB_Name", ScalarType.createVarchar(1024)));
columns.add(new Column("Table_Name", ScalarType.createVarchar(1024)));
columns.add(new Column("Columns", ScalarType.createVarchar(1024)));
ShowResultSetMetaData commonResultSetMetaData = new ShowResultSetMetaData(columns);
List<List<String>> resultRows = new ArrayList<>();
for (AnalysisInfo analysisInfo : analysisInfos) {
if (analysisInfo == null) {
continue;
}
List<String> row = new ArrayList<>();
row.add(String.valueOf(analysisInfo.jobId));
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(analysisInfo.catalogId);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
            row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may have been dropped");
            if (databaseIf.isPresent()) {
                Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
                row.add(table.isPresent() ? Util.getTempTableDisplayName(table.get().getName())
                        : "Table may have been dropped");
            } else {
                row.add("DB no longer exists");
}
String colNames = analysisInfo.colName;
            StringBuilder sb = new StringBuilder();
if (colNames != null) {
for (String columnName : colNames.split(",")) {
String[] kv = columnName.split(":");
sb.append(Util.getTempTableDisplayName(kv[0]))
.append(":").append(kv[1]).append(",");
}
}
String newColNames = sb.toString();
newColNames = StringUtils.isEmpty(newColNames) ? "" : newColNames.substring(0, newColNames.length() - 1);
row.add(newColNames);
resultRows.add(row);
}
ShowResultSet commonResultSet = new ShowResultSet(commonResultSetMetaData, resultRows);
try {
if (!proxy) {
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
} else {
ConnectContext.get().getExecutor().setProxyShowResultSet(commonResultSet);
}
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);
}
}
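    // Illustrative result row produced by sendJobId() above (all values hypothetical):
    //   Job_Id | Catalog_Name | DB_Name | Table_Name | Columns
    //   17023  | internal     | db1     | tbl1       | tbl1:col_a,mv_idx1:col_b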
    // Make sure the job's colName contains every column this AnalyzeStmt specified, whether or not it
    // will actually be analyzed.
@VisibleForTesting
public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
AnalysisInfoBuilder infoBuilder = new AnalysisInfoBuilder();
long jobId = Env.getCurrentEnv().getNextId();
TableIf table = stmt.getTable();
Set<String> columnNames = stmt.getColumnNames();
boolean partitionOnly = stmt.isPartitionOnly();
boolean isSamplingPartition = stmt.isSamplingPartition();
boolean isAllPartition = stmt.isStarPartition();
long partitionCount = stmt.getPartitionCount();
int samplePercent = stmt.getSamplePercent();
int sampleRows = stmt.getSampleRows();
AnalysisType analysisType = stmt.getAnalysisType();
AnalysisMethod analysisMethod = stmt.getAnalysisMethod();
ScheduleType scheduleType = stmt.getScheduleType();
CronExpression cronExpression = stmt.getCron();
infoBuilder.setJobId(jobId);
infoBuilder.setTaskId(-1);
infoBuilder.setCatalogId(stmt.getCatalogId());
infoBuilder.setDBId(stmt.getDbId());
infoBuilder.setTblId(stmt.getTable().getId());
infoBuilder.setPartitionNames(stmt.getPartitionNames());
infoBuilder.setPartitionOnly(partitionOnly);
infoBuilder.setSamplingPartition(isSamplingPartition);
infoBuilder.setAllPartition(isAllPartition);
infoBuilder.setPartitionCount(partitionCount);
infoBuilder.setJobType(JobType.MANUAL);
infoBuilder.setState(AnalysisState.PENDING);
infoBuilder.setLastExecTimeInMs(System.currentTimeMillis());
infoBuilder.setAnalysisType(analysisType);
infoBuilder.setAnalysisMethod(analysisMethod);
infoBuilder.setScheduleType(scheduleType);
infoBuilder.setCronExpression(cronExpression);
infoBuilder.setForceFull(stmt.forceFull());
infoBuilder.setUsingSqlForExternalTable(stmt.usingSqlForExternalTable());
if (analysisMethod == AnalysisMethod.SAMPLE) {
infoBuilder.setSamplePercent(samplePercent);
infoBuilder.setSampleRows(sampleRows);
}
if (analysisType == AnalysisType.HISTOGRAM) {
int numBuckets = stmt.getNumBuckets();
int maxBucketNum = numBuckets > 0 ? numBuckets : StatisticConstants.HISTOGRAM_MAX_BUCKET_NUM;
infoBuilder.setMaxBucketNum(maxBucketNum);
}
long periodTimeInMs = stmt.getPeriodTimeInMs();
infoBuilder.setPeriodTimeInMs(periodTimeInMs);
OlapTable olapTable = table instanceof OlapTable ? (OlapTable) table : null;
boolean isSampleAnalyze = analysisMethod.equals(AnalysisMethod.SAMPLE);
Set<Pair<String, String>> jobColumns = table.getColumnIndexPairs(columnNames).stream()
.filter(c -> olapTable == null || StatisticsUtil.canCollectColumn(
olapTable.getIndexMetaByIndexId(olapTable.getIndexIdByName(c.first)).getColumnByName(c.second),
table, isSampleAnalyze, olapTable.getIndexIdByName(c.first)))
.collect(Collectors.toSet());
infoBuilder.setJobColumns(jobColumns);
StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
for (Pair<String, String> pair : jobColumns) {
stringJoiner.add(pair.toString());
}
infoBuilder.setColName(stringJoiner.toString());
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
        // An empty table's row count is 0. Call fetchRowCount() when getRowCount() returns <= 0,
        // because getRowCount() may return <= 0 when the cache is not loaded yet. This mainly
        // matters for external tables.
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 :
(table.getRowCount() <= 0 ? table.fetchRowCount() : table.getRowCount());
infoBuilder.setRowCount(rowCount);
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get());
infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0);
infoBuilder.setPriority(JobPriority.MANUAL);
infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows);
infoBuilder.setEnablePartition(StatisticsUtil.enablePartitionAnalyze());
return infoBuilder.build();
}
    // For Nereids ANALYZE TABLE.
@VisibleForTesting
public AnalysisInfo buildAnalysisJobInfo(AnalyzeTableCommand command) {
AnalysisInfoBuilder infoBuilder = new AnalysisInfoBuilder();
long jobId = Env.getCurrentEnv().getNextId();
TableIf table = command.getTable();
Set<String> columnNames = command.getColumnNames();
boolean partitionOnly = command.isPartitionOnly();
boolean isSamplingPartition = command.isSamplingPartition();
boolean isAllPartition = command.isStarPartition();
long partitionCount = command.getPartitionCount();
int samplePercent = command.getSamplePercent();
int sampleRows = command.getSampleRows();
AnalysisType analysisType = command.getAnalysisType();
AnalysisMethod analysisMethod = command.getAnalysisMethod();
ScheduleType scheduleType = command.getScheduleType();
CronExpression cronExpression = command.getCron();
infoBuilder.setJobId(jobId);
infoBuilder.setTaskId(-1);
infoBuilder.setCatalogId(command.getCatalogId());
infoBuilder.setDBId(command.getDbId());
infoBuilder.setTblId(command.getTable().getId());
infoBuilder.setPartitionNames(command.getPartitionNames());
infoBuilder.setPartitionOnly(partitionOnly);
infoBuilder.setSamplingPartition(isSamplingPartition);
infoBuilder.setAllPartition(isAllPartition);
infoBuilder.setPartitionCount(partitionCount);
infoBuilder.setJobType(JobType.MANUAL);
infoBuilder.setState(AnalysisState.PENDING);
infoBuilder.setLastExecTimeInMs(System.currentTimeMillis());
infoBuilder.setAnalysisType(analysisType);
infoBuilder.setAnalysisMethod(analysisMethod);
infoBuilder.setScheduleType(scheduleType);
infoBuilder.setCronExpression(cronExpression);
infoBuilder.setForceFull(command.forceFull());
infoBuilder.setUsingSqlForExternalTable(command.usingSqlForExternalTable());
if (analysisMethod == AnalysisMethod.SAMPLE) {
infoBuilder.setSamplePercent(samplePercent);
infoBuilder.setSampleRows(sampleRows);
}
if (analysisType == AnalysisType.HISTOGRAM) {
int numBuckets = command.getNumBuckets();
int maxBucketNum = numBuckets > 0 ? numBuckets : StatisticConstants.HISTOGRAM_MAX_BUCKET_NUM;
infoBuilder.setMaxBucketNum(maxBucketNum);
}
long periodTimeInMs = command.getPeriodTimeInMs();
infoBuilder.setPeriodTimeInMs(periodTimeInMs);
OlapTable olapTable = table instanceof OlapTable ? (OlapTable) table : null;
boolean isSampleAnalyze = analysisMethod.equals(AnalysisMethod.SAMPLE);
Set<Pair<String, String>> jobColumns = table.getColumnIndexPairs(columnNames).stream()
.filter(c -> olapTable == null || StatisticsUtil.canCollectColumn(
olapTable.getIndexMetaByIndexId(olapTable.getIndexIdByName(c.first)).getColumnByName(c.second),
table, isSampleAnalyze, olapTable.getIndexIdByName(c.first)))
.collect(Collectors.toSet());
infoBuilder.setJobColumns(jobColumns);
StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
for (Pair<String, String> pair : jobColumns) {
stringJoiner.add(pair.toString());
}
infoBuilder.setColName(stringJoiner.toString());
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
        // An empty table's row count is 0. Call fetchRowCount() when getRowCount() returns <= 0,
        // because getRowCount() may return <= 0 when the cache is not loaded yet. This mainly
        // matters for external tables.
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 :
(table.getRowCount() <= 0 ? table.fetchRowCount() : table.getRowCount());
infoBuilder.setRowCount(rowCount);
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get());
infoBuilder.setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0);
infoBuilder.setPriority(JobPriority.MANUAL);
infoBuilder.setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows);
infoBuilder.setEnablePartition(StatisticsUtil.enablePartitionAnalyze());
return infoBuilder.build();
}
@VisibleForTesting
public void recordAnalysisJob(AnalysisInfo jobInfo) {
if (jobInfo.scheduleType == ScheduleType.PERIOD && jobInfo.lastExecTimeInMs > 0) {
return;
}
replayCreateAnalysisJob(jobInfo);
}
public void createTaskForEachColumns(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> analysisTasks,
boolean isSync) throws DdlException {
Set<Pair<String, String>> jobColumns = jobInfo.jobColumns;
TableIf table = jobInfo.getTable();
for (Pair<String, String> pair : jobColumns) {
AnalysisInfoBuilder colTaskInfoBuilder = new AnalysisInfoBuilder(jobInfo);
colTaskInfoBuilder.setAnalysisType(AnalysisType.FUNDAMENTALS);
long taskId = Env.getCurrentEnv().getNextId();
long indexId = -1;
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
indexId = olapTable.getIndexIdByName(pair.first);
if (indexId == olapTable.getBaseIndexId()) {
indexId = -1;
}
}
AnalysisInfo analysisInfo = colTaskInfoBuilder.setColName(pair.second).setIndexId(indexId)
.setTaskId(taskId).setLastExecTimeInMs(System.currentTimeMillis()).build();
analysisTasks.put(taskId, createTask(analysisInfo));
jobInfo.addTaskId(taskId);
if (isSync) {
continue;
}
replayCreateAnalysisTask(analysisInfo);
}
}
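    // Summary of createTaskForEachColumns() above: every (indexName, columnName) pair of the
    // job becomes one FUNDAMENTALS task with its own taskId; for an OlapTable the base index
    // id is normalized to -1, matching the convention used for non-OLAP tables.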
    // Public for unit tests.
public void logCreateAnalysisTask(AnalysisInfo analysisInfo) {
replayCreateAnalysisTask(analysisInfo);
Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo);
}
    // Public for unit tests.
public void logCreateAnalysisJob(AnalysisInfo analysisJob) {
replayCreateAnalysisJob(analysisJob);
Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob);
}
public void updateTaskStatus(AnalysisInfo info, AnalysisState taskState, String message, long time) {
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
return;
}
info.state = taskState;
info.message = message;
        // Update the task cost time when the task finishes or fails, and only log the final state.
if (taskState.equals(AnalysisState.FINISHED) || taskState.equals(AnalysisState.FAILED)) {
info.timeCostInMs = time - info.lastExecTimeInMs;
info.lastExecTimeInMs = time;
// Persist task info for manual job.
if (info.jobType.equals(JobType.MANUAL)) {
logCreateAnalysisTask(info);
} else {
replayCreateAnalysisTask(info);
}
}
info.lastExecTimeInMs = time;
AnalysisInfo job = analysisJobInfoMap.get(info.jobId);
// Job may get deleted during execution.
if (job == null) {
return;
}
        // Synchronize job state changes at the job level.
synchronized (job) {
job.lastExecTimeInMs = time;
if (taskState.equals(AnalysisState.FAILED)) {
String errMessage = String.format("%s:[%s] ", info.colName, message);
job.message = job.message == null ? errMessage : job.message + errMessage;
}
// Set the job state to RUNNING when its first task becomes RUNNING.
if (info.state.equals(AnalysisState.RUNNING) && job.state.equals(AnalysisState.PENDING)) {
job.state = AnalysisState.RUNNING;
job.markStartTime(System.currentTimeMillis());
replayCreateAnalysisJob(job);
}
boolean allFinished = true;
boolean hasFailure = false;
for (BaseAnalysisTask task : analysisJobIdToTaskMap.get(info.jobId).values()) {
AnalysisInfo taskInfo = task.info;
if (taskInfo.state.equals(AnalysisState.RUNNING) || taskInfo.state.equals(AnalysisState.PENDING)) {
allFinished = false;
break;
}
if (taskInfo.state.equals(AnalysisState.FAILED)) {
hasFailure = true;
}
}
if (allFinished) {
if (hasFailure) {
job.markFailed();
} else {
job.markFinished();
try {
updateTableStats(job);
} catch (Throwable e) {
LOG.warn("Failed to update Table statistics in job: {}", info.toString(), e);
}
}
logCreateAnalysisJob(job);
analysisJobIdToTaskMap.remove(job.jobId);
}
}
}
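    // State machine implemented by updateTaskStatus() above: a task goes
    // PENDING -> RUNNING -> FINISHED | FAILED. The job turns RUNNING with its first running
    // task; once no task is PENDING or RUNNING anymore, the job is marked FINISHED when all
    // tasks succeeded, or FAILED when any task failed.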
@VisibleForTesting
public void updateTableStats(AnalysisInfo jobInfo) {
TableIf tbl = StatisticsUtil.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
updateTableStatsStatus(new TableStatsMeta(jobInfo.rowCount, jobInfo, tbl));
} else {
tableStats.update(jobInfo, tbl);
logCreateTableStats(tableStats);
}
if (jobInfo.jobColumns != null) {
jobInfo.jobColumns.clear();
}
if (jobInfo.partitionNames != null) {
jobInfo.partitionNames.clear();
}
if (jobInfo.partitionUpdateRows != null) {
jobInfo.partitionUpdateRows.clear();
}
}
@VisibleForTesting
public void updateTableStatsForAlterStats(AnalysisInfo jobInfo, TableIf tbl) {
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
updateTableStatsStatus(new TableStatsMeta(0, jobInfo, tbl));
} else {
tableStats.update(jobInfo, tbl);
logCreateTableStats(tableStats);
}
}
public List<AutoAnalysisPendingJob> showAutoPendingJobs(TableName tblName, String priority) {
List<AutoAnalysisPendingJob> result = Lists.newArrayList();
if (priority == null || priority.isEmpty()) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
result.addAll(getPendingJobs(veryLowPriorityJobs, JobPriority.VERY_LOW, tblName));
} else if (priority.equals(JobPriority.HIGH.name())) {
result.addAll(getPendingJobs(highPriorityJobs, JobPriority.HIGH, tblName));
} else if (priority.equals(JobPriority.MID.name())) {
result.addAll(getPendingJobs(midPriorityJobs, JobPriority.MID, tblName));
} else if (priority.equals(JobPriority.LOW.name())) {
result.addAll(getPendingJobs(lowPriorityJobs, JobPriority.LOW, tblName));
} else if (priority.equals(JobPriority.VERY_LOW.name())) {
result.addAll(getPendingJobs(veryLowPriorityJobs, JobPriority.VERY_LOW, tblName));
}
return result;
}
protected List<AutoAnalysisPendingJob> getPendingJobs(Map<TableName, Set<Pair<String, String>>> jobMap,
JobPriority priority, TableName tblName) {
List<AutoAnalysisPendingJob> result = Lists.newArrayList();
synchronized (jobMap) {
for (Entry<TableName, Set<Pair<String, String>>> entry : jobMap.entrySet()) {
TableName table = entry.getKey();
if (tblName == null
|| tblName.getCtl() == null && tblName.getDb() == null && tblName.getTbl() == null
|| tblName.equals(table)) {
result.add(new AutoAnalysisPendingJob(table.getCtl(),
table.getDb(), table.getTbl(), entry.getValue(), priority));
}
}
}
return result;
}
public List<AnalysisInfo> findAnalysisJobs(ShowAnalyzeStmt stmt) {
String ctl = null;
String db = null;
String table = null;
TableName dbTableName = stmt.getDbTableName();
if (dbTableName != null) {
ctl = dbTableName.getCtl();
db = dbTableName.getDb();
table = dbTableName.getTbl();
}
return findAnalysisJobs(stmt.getStateValue(), ctl, db, table, stmt.getJobId(), stmt.isAuto());
}
public List<AnalysisInfo> findAnalysisJobs(String state, String ctl, String db,
String table, long jobId, boolean isAuto) {
TableIf tbl = null;
boolean tableSpecified = ctl != null && db != null && table != null;
if (tableSpecified) {
tbl = StatisticsUtil.findTable(ctl, db, table);
}
long tblId = tbl == null ? -1 : tbl.getId();
synchronized (analysisJobInfoMap) {
return analysisJobInfoMap.values().stream()
.filter(a -> jobId == 0 || a.jobId == jobId)
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state.toUpperCase())))
.filter(a -> !tableSpecified || a.tblId == tblId)
.filter(a -> isAuto && a.jobType.equals(JobType.SYSTEM)
|| !isAuto && a.jobType.equals(JobType.MANUAL))
.sorted(Comparator.comparingLong(a -> a.jobId))
.collect(Collectors.toList());
}
}
public String getJobProgress(long jobId) {
List<AnalysisInfo> tasks = findTasksByTaskIds(jobId);
if (tasks == null || tasks.isEmpty()) {
return "N/A";
}
int finished = 0;
int failed = 0;
int inProgress = 0;
int total = tasks.size();
for (AnalysisInfo info : tasks) {
switch (info.state) {
case FINISHED:
finished++;
break;
case FAILED:
failed++;
break;
default:
inProgress++;
break;
}
}
return String.format(progressDisplayTemplate, finished, failed, inProgress, total);
}
@VisibleForTesting
public void syncExecute(Collection<BaseAnalysisTask> tasks) {
SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks);
ConnectContext ctx = ConnectContext.get();
ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
try {
ctxToSyncTask.put(ctx, syncTaskCollection);
syncTaskCollection.execute(syncExecPool);
} finally {
syncExecPool.shutdown();
ctxToSyncTask.remove(ctx);
}
}
private ThreadPoolExecutor createThreadPoolForSyncAnalyze() {
String poolName = "SYNC ANALYZE THREAD POOL";
return new ThreadPoolExecutor(0,
ConnectContext.get().getSessionVariable().parallelSyncAnalyzeTaskNum,
ThreadPoolManager.KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d")
.build(), new BlockedPolicy(poolName,
StatisticsUtil.getAnalyzeTimeout()));
}
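    // Note: the pool above is created per sync-analyze invocation and shut down in
    // syncExecute(); BlockedPolicy is assumed to block the submitting thread (up to the
    // analyze timeout) rather than throwing when the executor cannot accept a task.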
public void dropStats(DropStatsCommand dropStatsCommand) throws DdlException {
TableStatsMeta tableStats = findTableStatsStatus(dropStatsCommand.getTblId());
Set<String> cols = dropStatsCommand.getColumnNames();
PartitionNamesInfo partitionNamesInfo = dropStatsCommand.getOpPartitionNamesInfo();
PartitionNames partitionNames = null;
if (partitionNamesInfo != null) {
partitionNames = new PartitionNames(partitionNamesInfo.isTemp(), partitionNamesInfo.getPartitionNames());
}
long catalogId = dropStatsCommand.getCatalogId();
long dbId = dropStatsCommand.getDbId();
long tblId = dropStatsCommand.getTblId();
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tblId);
        // Remove tableMetaStats when dropping the whole table's stats.
if ((cols == null || cols.isEmpty()) && (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null)) {
removeTableStats(tblId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tblId));
}
invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats, partitionNames);
        // Drop stats DDL is a master-only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
invalidateRemoteStats(catalogId, dbId, tblId, cols, partitions, false);
StatisticsRepository.dropStatistics(catalogId, dbId, tblId, cols, partitions);
}
public void dropExpiredStats() {
Env.getCurrentEnv().getStatisticsCleaner().clear();
}
public void dropCachedStats(DropCachedStatsStmt stmt) {
long catalogId = stmt.getCatalogIdId();
long dbId = stmt.getDbId();
long tblId = stmt.getTblId();
dropCachedStats(catalogId, dbId, tblId);
}
public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
if (dropStatsStmt.dropExpired) {
Env.getCurrentEnv().getStatisticsCleaner().clear();
return;
}
TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
Set<String> cols = dropStatsStmt.getColumnNames();
PartitionNames partitionNames = dropStatsStmt.getPartitionNames();
long catalogId = dropStatsStmt.getCatalogIdId();
long dbId = dropStatsStmt.getDbId();
long tblId = dropStatsStmt.getTblId();
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tblId);
        // Remove tableMetaStats when dropping the whole table's stats.
if (cols == null && (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null)) {
removeTableStats(tblId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tblId));
}
invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats, partitionNames);
        // Drop stats DDL is a master-only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
invalidateRemoteStats(catalogId, dbId, tblId, cols, partitions, false);
StatisticsRepository.dropStatistics(catalogId, dbId, tblId, cols, partitions);
}
public void dropStats(TableIf table, PartitionNames partitionNames) {
try {
TableStatsMeta tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return;
}
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
}
class DropStatsTask implements Runnable {
private final long catalogId;
private final long dbId;
private final long tableId;
private final Set<String> columns;
private final TableStatsMeta tableStats;
private final PartitionNames partitionNames;
private final TableIf table;
private final boolean isMaster;
public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
this.columns = columns;
this.tableStats = tableStats;
this.partitionNames = partitionNames;
this.table = table;
this.isMaster = isMaster;
}
@Override
public void run() {
try {
if (isMaster) {
if (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
}
                    // Drop stats DDL is a master-only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar()
&& partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
StatisticsRepository.dropStatistics(catalogId, dbId, tableId, null, partitions);
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
}
invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
} catch (Throwable t) {
LOG.info("Failed to async drop stats for table {}.{}.{}, reason: {}",
catalogId, dbId, tableId, t.getMessage());
}
}
}
public void submitAsyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId,
TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
try {
dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, null,
tableStats, partitionNames, isMaster));
} catch (Throwable t) {
LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage());
}
}
public void dropCachedStats(long catalogId, long dbId, long tableId) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache();
Set<String> columns = table.getSchemaAllIndexes(false)
.stream().map(Column::getName).collect(Collectors.toSet());
for (String column : columns) {
List<Long> indexIds = Lists.newArrayList();
if (table instanceof OlapTable) {
indexIds = ((OlapTable) table).getMvColumnIndexIds(column);
} else {
indexIds.add(-1L);
}
for (long indexId : indexIds) {
statsCache.invalidateColumnStatsCache(catalogId, dbId, tableId, indexId, column);
for (String part : table.getPartitionNames()) {
statsCache.invalidatePartitionColumnStatsCache(catalogId, dbId, tableId, indexId, part, column);
}
}
}
}
public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
StatisticsCache statsCache = Env.getCurrentEnv().getStatisticsCache();
if (columns == null || columns.isEmpty()) {
columns = table.getSchemaAllIndexes(false)
.stream().map(Column::getName).collect(Collectors.toSet());
}
Set<String> partNames = new HashSet<>();
boolean allPartition = false;
if (table.isPartitionedTable()) {
if (partitionNames == null || partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
partNames = table.getPartitionNames();
allPartition = true;
} else {
partNames = new HashSet<>(partitionNames.getPartitionNames());
}
} else {
allPartition = true;
}
for (String column : columns) {
List<Long> indexIds = Lists.newArrayList();
if (table instanceof OlapTable) {
indexIds = ((OlapTable) table).getMvColumnIndexIds(column);
} else {
indexIds.add(-1L);
}
for (long indexId : indexIds) {
String indexName = table.getName();
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
if (indexId == -1) {
indexName = olapTable.getIndexNameById(olapTable.getBaseIndexId());
} else {
indexName = olapTable.getIndexNameById(indexId);
}
}
if (allPartition) {
statsCache.invalidateColumnStatsCache(catalogId, dbId, tableId, indexId, column);
if (tableStats != null) {
tableStats.removeColumn(indexName, column);
}
}
ColStatsMeta columnStatsMeta = null;
if (tableStats != null) {
columnStatsMeta = tableStats.findColumnStatsMeta(indexName, column);
}
for (String part : partNames) {
statsCache.invalidatePartitionColumnStatsCache(catalogId, dbId, tableId, indexId, part, column);
if (columnStatsMeta != null && columnStatsMeta.partitionUpdateRows != null) {
Partition partition = table.getPartition(part);
if (partition != null) {
columnStatsMeta.partitionUpdateRows.remove(partition.getId());
}
}
}
}
}
if (tableStats != null) {
tableStats.userInjected = false;
tableStats.rowCount = table.getRowCount();
}
}
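    // Summary of invalidateLocalStats() above: for each column (and each materialized-view
    // index containing it) the cached column stats are invalidated; partition-level entries
    // are cleared for the given partitions, or for all partitions when none are specified or
    // the table is unpartitioned, and the row count is then refreshed from table metadata.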
public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
Set<String> columns, Set<String> partitions, boolean isTruncate) {
InvalidateStatsTarget target = new InvalidateStatsTarget(
catalogId, dbId, tableId, columns, partitions, isTruncate);
TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest();
request.key = GsonUtils.GSON.toJson(target);
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
// Skip master
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
statisticsCache.invalidateStats(frontend, request);
}
TableStatsMeta tableStats = findTableStatsStatus(tableId);
if (tableStats != null) {
logCreateTableStats(tableStats);
}
}
public void updatePartitionStatsCache(long catalogId, long dbId, long tableId, long indexId,
Set<String> partNames, String colName) {
updateLocalPartitionStatsCache(catalogId, dbId, tableId, indexId, partNames, colName);
updateRemotePartitionStats(catalogId, dbId, tableId, indexId, partNames, colName);
}
public void updateLocalPartitionStatsCache(long catalogId, long dbId, long tableId, long indexId,
Set<String> partNames, String colName) {
if (partNames == null || partNames.isEmpty()) {
return;
}
Iterator<String> iterator = partNames.iterator();
StringBuilder partNamePredicate = new StringBuilder();
while (iterator.hasNext()) {
partNamePredicate.append("'");
partNamePredicate.append(StatisticsUtil.escapeSQL(iterator.next()));
partNamePredicate.append("'");
partNamePredicate.append(",");
}
if (partNamePredicate.length() > 0) {
partNamePredicate.delete(partNamePredicate.length() - 1, partNamePredicate.length());
}
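        // e.g. partNames {"p1", "p2"} yields the predicate string "'p1','p2'" (illustrative).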
List<ResultRow> resultRows = StatisticsRepository.loadPartitionColumnStats(
catalogId, dbId, tableId, indexId, partNamePredicate.toString(), colName);
// row : [catalog_id, db_id, tbl_id, idx_id, part_name, col_id,
// count, ndv, null_count, min, max, data_size, update_time]
StatisticsCache cache = Env.getCurrentEnv().getStatisticsCache();
for (ResultRow row : resultRows) {
try {
cache.updatePartitionColStatsCache(catalogId, dbId, tableId, indexId, row.get(4), colName,
PartitionColumnStatistic.fromResultRow(row));
} catch (Exception e) {
cache.invalidatePartitionColumnStatsCache(catalogId, dbId, tableId, indexId, row.get(4), colName);
}
}
}
public void updateRemotePartitionStats(long catalogId, long dbId, long tableId, long indexId,
Set<String> partNames, String colName) {
UpdatePartitionStatsTarget target = new UpdatePartitionStatsTarget(
catalogId, dbId, tableId, indexId, colName, partNames);
TUpdateFollowerPartitionStatsCacheRequest request = new TUpdateFollowerPartitionStatsCacheRequest();
request.key = GsonUtils.GSON.toJson(target);
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
// Skip master
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
statisticsCache.updatePartitionStats(frontend, request);
}
}
public void handleKillAnalyzeJob(KillAnalyzeJobCommand killAnalyzeJobCommand) throws DdlException {
Map<Long, BaseAnalysisTask> analysisTaskMap = analysisJobIdToTaskMap.remove(killAnalyzeJobCommand.getJobId());
if (analysisTaskMap == null) {
throw new DdlException("Job not exists or already finished");
}
BaseAnalysisTask anyTask = analysisTaskMap.values().stream().findFirst().orElse(null);
if (anyTask == null) {
return;
}
checkPriv(anyTask);
logKilled(analysisJobInfoMap.get(anyTask.getJobId()));
for (BaseAnalysisTask taskInfo : analysisTaskMap.values()) {
taskInfo.cancel();
logKilled(taskInfo.info);
}
}
public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException {
Map<Long, BaseAnalysisTask> analysisTaskMap = analysisJobIdToTaskMap.remove(killAnalysisJobStmt.jobId);
if (analysisTaskMap == null) {
throw new DdlException("Job not exists or already finished");
}
BaseAnalysisTask anyTask = analysisTaskMap.values().stream().findFirst().orElse(null);
if (anyTask == null) {
return;
}
checkPriv(anyTask);
logKilled(analysisJobInfoMap.get(anyTask.getJobId()));
for (BaseAnalysisTask taskInfo : analysisTaskMap.values()) {
taskInfo.cancel();
logKilled(taskInfo.info);
}
}
private void logKilled(AnalysisInfo info) {
info.state = AnalysisState.FAILED;
info.message = "Killed by user: " + ConnectContext.get().getQualifiedUser();
info.lastExecTimeInMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(info);
}
private void checkPriv(BaseAnalysisTask analysisTask) {
checkPriv(analysisTask.info);
}
private void checkPriv(AnalysisInfo analysisInfo) {
DBObjects dbObjects = StatisticsUtil.convertIdToObjects(analysisInfo.catalogId,
analysisInfo.dbId, analysisInfo.tblId);
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), dbObjects.catalog.getName(), dbObjects.db.getFullName(),
dbObjects.table.getName(), PrivPredicate.SELECT)) {
throw new RuntimeException("You need at least SELECT PRIV to corresponding table to kill this analyze"
+ " job");
}
}
public void cancelSyncTask(ConnectContext connectContext) {
SyncTaskCollection syncTaskCollection = ctxToSyncTask.get(connectContext);
if (syncTaskCollection != null) {
syncTaskCollection.cancel();
}
}
private BaseAnalysisTask createTask(AnalysisInfo analysisInfo) throws DdlException {
try {
TableIf table = StatisticsUtil.findTable(analysisInfo.catalogId,
analysisInfo.dbId, analysisInfo.tblId);
return table.createAnalysisTask(analysisInfo);
} catch (Throwable t) {
LOG.warn("Failed to create task.", t);
throw new DdlException("Failed to create task", t);
}
}
public void replayCreateAnalysisJob(AnalysisInfo jobInfo) {
synchronized (analysisJobInfoMap) {
while (analysisJobInfoMap.size() >= Config.analyze_record_limit) {
analysisJobInfoMap.remove(analysisJobInfoMap.pollFirstEntry().getKey());
}
if (jobInfo.message != null && jobInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) {
jobInfo.message = jobInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND);
}
this.analysisJobInfoMap.put(jobInfo.jobId, jobInfo);
}
}
public void replayCreateAnalysisTask(AnalysisInfo taskInfo) {
synchronized (analysisTaskInfoMap) {
while (analysisTaskInfoMap.size() >= Config.analyze_record_limit) {
analysisTaskInfoMap.remove(analysisTaskInfoMap.pollFirstEntry().getKey());
}
if (taskInfo.message != null && taskInfo.message.length() >= StatisticConstants.MSG_LEN_UPPER_BOUND) {
taskInfo.message = taskInfo.message.substring(0, StatisticConstants.MSG_LEN_UPPER_BOUND);
}
this.analysisTaskInfoMap.put(taskInfo.taskId, taskInfo);
}
}
public void replayDeleteAnalysisJob(AnalyzeDeletionLog log) {
synchronized (analysisJobInfoMap) {
this.analysisJobInfoMap.remove(log.id);
}
}
public void replayDeleteAnalysisTask(AnalyzeDeletionLog log) {
synchronized (analysisTaskInfoMap) {
this.analysisTaskInfoMap.remove(log.id);
}
}
private static class SyncTaskCollection {
public volatile boolean cancelled;
public final Collection<BaseAnalysisTask> tasks;
public SyncTaskCollection(Collection<BaseAnalysisTask> tasks) {
this.tasks = tasks;
}
public void cancel() {
cancelled = true;
tasks.forEach(BaseAnalysisTask::cancel);
}
public void execute(ThreadPoolExecutor executor) {
List<String> colNames = Collections.synchronizedList(new ArrayList<>());
List<String> errorMessages = Collections.synchronizedList(new ArrayList<>());
CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
for (BaseAnalysisTask task : tasks) {
executor.submit(() -> {
try {
if (cancelled) {
errorMessages.add("Query Timeout or user Cancelled."
+ "Could set analyze_timeout to a bigger value.");
return;
}
try {
task.execute();
} catch (Throwable t) {
colNames.add(task.info.colName);
errorMessages.add(Util.getRootCauseMessage(t));
LOG.warn("Failed to analyze, info: {}", task, t);
}
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException t) {
LOG.warn("Thread got interrupted when waiting sync analyze task execution finished", t);
}
if (!colNames.isEmpty()) {
if (cancelled) {
throw new RuntimeException("User Cancelled or Timeout.");
}
throw new RuntimeException("Failed to analyze following columns:[" + String.join(",", colNames)
+ "] Reasons: " + String.join(",", errorMessages));
}
}
}
public List<AnalysisInfo> findTasks(long jobId) {
synchronized (analysisTaskInfoMap) {
return analysisTaskInfoMap.values().stream().filter(i -> i.jobId == jobId).collect(Collectors.toList());
}
}
public List<AnalysisInfo> findTasksByTaskIds(long jobId) {
AnalysisInfo jobInfo = analysisJobInfoMap.get(jobId);
if (jobInfo != null && jobInfo.taskIds != null) {
return jobInfo.taskIds.stream().map(analysisTaskInfoMap::get).filter(Objects::nonNull)
.collect(Collectors.toList());
}
return null;
}
public void removeAll(List<AnalysisInfo> analysisInfos) {
synchronized (analysisTaskInfoMap) {
for (AnalysisInfo analysisInfo : analysisInfos) {
analysisTaskInfoMap.remove(analysisInfo.taskId);
}
}
}
public void dropAnalyzeJob(DropAnalyzeJobCommand analyzeJobCommand) throws DdlException {
AnalysisInfo jobInfo = analysisJobInfoMap.get(analyzeJobCommand.getJobId());
if (jobInfo == null) {
throw new DdlException(String.format("Analyze job [%d] not exists", analyzeJobCommand.getJobId()));
}
checkPriv(jobInfo);
long jobId = analyzeJobCommand.getJobId();
AnalyzeDeletionLog analyzeDeletionLog = new AnalyzeDeletionLog(jobId);
Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(analyzeDeletionLog);
replayDeleteAnalysisJob(analyzeDeletionLog);
removeAll(findTasks(jobId));
}
public void dropAnalyzeJob(DropAnalyzeJobStmt analyzeJobStmt) throws DdlException {
AnalysisInfo jobInfo = analysisJobInfoMap.get(analyzeJobStmt.getJobId());
if (jobInfo == null) {
throw new DdlException(String.format("Analyze job [%d] not exists", analyzeJobStmt.getJobId()));
}
checkPriv(jobInfo);
long jobId = analyzeJobStmt.getJobId();
AnalyzeDeletionLog analyzeDeletionLog = new AnalyzeDeletionLog(jobId);
Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(analyzeDeletionLog);
replayDeleteAnalysisJob(analyzeDeletionLog);
removeAll(findTasks(jobId));
}
public static AnalysisManager readFields(DataInput in) throws IOException {
AnalysisManager analysisManager = new AnalysisManager();
readAnalysisInfo(in, analysisManager.analysisJobInfoMap, true);
readAnalysisInfo(in, analysisManager.analysisTaskInfoMap, false);
readIdToTblStats(in, analysisManager.idToTblStats);
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_128) {
readAutoJobs(in, analysisManager);
}
return analysisManager;
}
private static void readAnalysisInfo(DataInput in, Map<Long, AnalysisInfo> map, boolean job) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
// AnalysisInfo is compatible with AnalysisJobInfo and AnalysisTaskInfo.
AnalysisInfo analysisInfo = AnalysisInfo.read(in);
            // Unfinished manual ONCE jobs/tasks don't need to be kept in memory anymore.
if (needAbandon(analysisInfo)) {
continue;
}
map.put(job ? analysisInfo.jobId : analysisInfo.taskId, analysisInfo);
}
}
    // Abandon unfinished manual ONCE jobs/tasks while loading the image and replaying the journal.
    // The journal only stores finished tasks and jobs.
public static boolean needAbandon(AnalysisInfo analysisInfo) {
if (analysisInfo == null) {
return true;
}
if (analysisInfo.scheduleType == null || analysisInfo.jobType == null) {
return true;
}
return (AnalysisState.PENDING.equals(analysisInfo.state) || AnalysisState.RUNNING.equals(analysisInfo.state))
&& ScheduleType.ONCE.equals(analysisInfo.scheduleType)
&& JobType.MANUAL.equals(analysisInfo.jobType);
}
private static void readIdToTblStats(DataInput in, Map<Long, TableStatsMeta> map) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
TableStatsMeta tableStats = TableStatsMeta.read(in);
map.put(tableStats.tblId, tableStats);
}
}
    // To be deprecated. Kept for meta compatibility for now; will be removed later.
private static void readAutoJobs(DataInput in, AnalysisManager analysisManager) throws IOException {
Type type = new TypeToken<LinkedList<AnalysisInfo>>() {}.getType();
GsonUtils.GSON.fromJson(Text.readString(in), type);
}
@Override
public void write(DataOutput out) throws IOException {
synchronized (analysisJobInfoMap) {
writeJobInfo(out, analysisJobInfoMap);
}
synchronized (analysisTaskInfoMap) {
writeJobInfo(out, analysisTaskInfoMap);
}
writeTableStats(out);
}
private void writeJobInfo(DataOutput out, Map<Long, AnalysisInfo> infoMap) throws IOException {
out.writeInt(infoMap.size());
for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
entry.getValue().write(out);
}
}
private void writeTableStats(DataOutput out) throws IOException {
synchronized (idToTblStats) {
out.writeInt(idToTblStats.size());
for (Entry<Long, TableStatsMeta> entry : idToTblStats.entrySet()) {
entry.getValue().write(out);
}
}
}
// For unit test use only.
public void addToJobIdTasksMap(long jobId, Map<Long, BaseAnalysisTask> tasks) {
analysisJobIdToTaskMap.put(jobId, tasks);
}
public TableStatsMeta findTableStatsStatus(long tblId) {
return idToTblStats.get(tblId);
}
    // Invoke this when a load transaction finishes.
public void updateUpdatedRows(Map<Long, Map<Long, Long>> tabletRecords, long dbId, long txnId) {
try {
if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
return;
}
UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(tabletRecords, dbId);
LOG.info("Update rows transactionId is {}", txnId);
replayUpdateRowsRecord(updateRowsEvent);
logUpdateRowsRecord(updateRowsEvent);
} catch (Throwable t) {
LOG.warn("Failed to record update rows.", t);
}
}
// Invoke this when a truncate table operation has finished.
public void updateUpdatedRows(Map<Long, Long> partitionToUpdateRows, long dbId, long tableId, long txnId) {
try {
if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
return;
}
UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(partitionToUpdateRows, dbId, tableId);
replayUpdateRowsRecord(updateRowsEvent);
logUpdateRowsRecord(updateRowsEvent);
} catch (Throwable t) {
LOG.warn("Failed to record update rows.", t);
}
}
// Invoke this for loads in the cloud version.
public void updateUpdatedRows(Map<Long, Long> updatedRows) {
try {
if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread()) {
return;
}
UpdateRowsEvent updateRowsEvent = new UpdateRowsEvent(updatedRows);
replayUpdateRowsRecord(updateRowsEvent);
logUpdateRowsRecord(updateRowsEvent);
} catch (Throwable t) {
LOG.warn("Failed to record update rows.", t);
}
}
// Setting partitionChanged to true means a new partition has loaded data.
public void setNewPartitionLoaded(List<Long> tableIds) {
if (!Env.getCurrentEnv().isMaster() || Env.isCheckpointThread() || tableIds == null || tableIds.isEmpty()) {
return;
}
for (long tableId : tableIds) {
TableStatsMeta statsStatus = idToTblStats.get(tableId);
if (statsStatus != null) {
statsStatus.partitionChanged.set(true);
}
}
logNewPartitionLoadedEvent(new NewPartitionLoadedEvent(tableIds));
}
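// Applies the new stats status in memory first, then persists it to the edit log
// so that followers replay the same update.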
public void updateTableStatsStatus(TableStatsMeta tableStats) {
replayUpdateTableStatsStatus(tableStats);
logCreateTableStats(tableStats);
}
public void replayUpdateTableStatsStatus(TableStatsMeta tableStats) {
synchronized (idToTblStats) {
idToTblStats.put(tableStats.tblId, tableStats);
}
}
public void logCreateTableStats(TableStatsMeta tableStats) {
Env.getCurrentEnv().getEditLog().logCreateTableStats(tableStats);
}
public void logUpdateRowsRecord(UpdateRowsEvent record) {
Env.getCurrentEnv().getEditLog().logUpdateRowsRecord(record);
}
public void logNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
Env.getCurrentEnv().getEditLog().logNewPartitionLoadedEvent(event);
}
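/**
 * Replays an UpdateRowsEvent. The event carries one of three payloads:
 * getRecords() (TableId -> rows, written by older versions), getTabletRecords()
 * (TableId -> (TabletId -> rows), from ordinary loads), or
 * getPartitionToUpdateRows() plus a table id (from truncate table handling).
 */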
public void replayUpdateRowsRecord(UpdateRowsEvent event) {
// For compatibility with older versions.
InternalCatalog catalog = Env.getCurrentInternalCatalog();
if (event.getRecords() != null) {
for (Entry<Long, Long> record : event.getRecords().entrySet()) {
TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
if (statsStatus != null) {
statsStatus.updatedRows.addAndGet(record.getValue());
}
}
return;
}
// Record: TableId -> (TabletId -> update rows)
if (event.getTabletRecords() != null) {
for (Entry<Long, Map<Long, Long>> record : event.getTabletRecords().entrySet()) {
TableStatsMeta statsStatus = idToTblStats.get(record.getKey());
if (statsStatus != null) {
Optional<Database> dbOption = catalog.getDb(event.getDbId());
if (!dbOption.isPresent()) {
LOG.warn("Database {} does not exist.", event.getDbId());
continue;
}
Database database = dbOption.get();
Optional<Table> tableOption = database.getTable(record.getKey());
if (!tableOption.isPresent()) {
LOG.warn("Table {} does not exist in DB {}.", record.getKey(), event.getDbId());
continue;
}
Table table = tableOption.get();
if (!(table instanceof OlapTable)) {
continue;
}
OlapTable olapTable = (OlapTable) table;
short replicaNum = olapTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
Map<Long, Long> tabletRows = record.getValue();
if (tabletRows == null || tabletRows.isEmpty()) {
LOG.info("Tablet row count map is empty");
continue;
}
long rowsForAllReplica = 0;
for (Entry<Long, Long> entry : tabletRows.entrySet()) {
rowsForAllReplica += entry.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug("Table id {}, tablet id {}, row count {}",
record.getKey(), entry.getKey(), entry.getValue());
}
}
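// Tablet rows are reported per replica, so the sum above counts every replica's
// copy. Hypothetical numbers: with replicaNum = 3 and per-tablet totals
// {t1=90, t2=60, t3=150}, rowsForAllReplica = 300 and the logical update is
// 300 / 3 = 100 rows.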
long tableUpdateRows = rowsForAllReplica / replicaNum;
LOG.info("Update rows for table {} is {}, replicaNum is {}, "
+ "rows for all replica {}, tablets count {}",
olapTable.getName(), tableUpdateRows, replicaNum, rowsForAllReplica, tabletRows.size());
statsStatus.updatedRows.addAndGet(tableUpdateRows);
if (StatisticsUtil.enablePartitionAnalyze()) {
updatePartitionRows(olapTable, tabletRows, statsStatus, replicaNum);
}
}
}
return;
}
// Handle truncate table
if (event.getPartitionToUpdateRows() != null && event.getTableId() > 0) {
Map<Long, Long> partRows = event.getPartitionToUpdateRows();
long totalRows = partRows.values().stream().mapToLong(rows -> rows).sum();
TableStatsMeta statsStatus = idToTblStats.get(event.getTableId());
if (statsStatus != null) {
statsStatus.updatedRows.addAndGet(totalRows);
if (StatisticsUtil.enablePartitionAnalyze()) {
for (Entry<Long, Long> entry : partRows.entrySet()) {
statsStatus.partitionUpdateRows.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}
}
}
}
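/**
 * Attributes tablet-level update rows to partitions. No-op for non-partitioned
 * tables. Partitions are visited in descending visible-version-time order
 * (recently updated partitions first); each remaining tablet is matched against
 * the partition's base index, and matched tablets are removed from the working map
 * so the scan can stop once every tablet is assigned. Row counts are divided by
 * replicaNum to get logical rows.
 */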
protected void updatePartitionRows(OlapTable table, Map<Long, Long> originTabletToRows,
TableStatsMeta tableStats, short replicaNum) {
if (!table.isPartitionedTable()) {
return;
}
List<Partition> partitions = table.getPartitions().stream().sorted(
Comparator.comparing(Partition::getVisibleVersionTime).reversed()).collect(Collectors.toList());
Map<Long, Long> tabletToRows = new HashMap<>(originTabletToRows);
int tabletCount = tabletToRows.size();
if (tableStats.partitionUpdateRows == null) {
tableStats.partitionUpdateRows = new ConcurrentHashMap<>();
}
for (Partition p : partitions) {
MaterializedIndex baseIndex = p.getBaseIndex();
Iterator<Entry<Long, Long>> iterator = tabletToRows.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Long, Long> entry = iterator.next();
long tabletId = entry.getKey();
Tablet tablet = baseIndex.getTablet(tabletId);
if (tablet == null) {
continue;
}
long tabletRows = entry.getValue();
tableStats.partitionUpdateRows.merge(p.getId(), tabletRows / replicaNum, Long::sum);
iterator.remove();
tabletCount--;
}
if (tabletCount <= 0) {
break;
}
}
}
public void replayNewPartitionLoadedEvent(NewPartitionLoadedEvent event) {
if (event == null || event.getTableIds() == null) {
return;
}
for (long tableId : event.getTableIds()) {
TableStatsMeta statsStatus = idToTblStats.get(tableId);
if (statsStatus != null) {
statsStatus.partitionChanged.set(true);
}
}
}
public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> taskInfos) {
recordAnalysisJob(jobInfo);
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}
public void removeTableStats(long tableId) {
synchronized (idToTblStats) {
idToTblStats.remove(tableId);
}
}
public Set<Long> getIdToTblStatsKeys() {
return new HashSet<>(idToTblStats.keySet());
}
public ColStatsMeta findColStatsMeta(long tblId, String indexName, String colName) {
TableStatsMeta tableStats = findTableStatsStatus(tblId);
if (tableStats == null) {
return null;
}
return tableStats.findColumnStatsMeta(indexName, colName);
}
public AnalysisInfo findJobInfo(long id) {
return analysisJobInfoMap.get(id);
}
public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> tasks) {
AnalysisJob job = new AnalysisJob(jobInfo, tasks);
idToAnalysisJob.put(jobInfo.jobId, job);
}
public void removeJob(long id) {
idToAnalysisJob.remove(id);
}
/**
 * Only OlapTable and Hive HMSExternalTable support sample analyze for now.
 * @param table Table to check
 * @return True if the given table supports sample analyze, false otherwise.
 */
public boolean canSample(TableIf table) {
if (table instanceof OlapTable) {
return true;
}
return table instanceof HMSExternalTable
&& ((HMSExternalTable) table).getDlaType() == HMSExternalTable.DLAType.HIVE;
}
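// Query-derived columns are offered to the high/mid priority queues for auto
// analyze. offer() may fail when a queue is full; updateColumn ignores such
// failures, while mergeFollowerQueryColumns stops early on the first one.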
public void updateHighPriorityColumn(Set<Slot> slotReferences) {
updateColumn(slotReferences, highPriorityColumns);
}
public void updateMidPriorityColumn(Collection<Slot> slotReferences) {
updateColumn(slotReferences, midPriorityColumns);
}
protected void updateColumn(Collection<Slot> slotReferences, Queue<QueryColumn> queue) {
for (Slot s : slotReferences) {
if (!(s instanceof SlotReference)) {
// Skip non-SlotReference slots instead of abandoning the remaining ones.
continue;
}
Optional<Column> optionalColumn = ((SlotReference) s).getOriginalColumn();
Optional<TableIf> optionalTable = ((SlotReference) s).getOriginalTable();
if (optionalColumn.isPresent() && optionalTable.isPresent()
&& !StatisticsUtil.isUnsupportedType(optionalColumn.get().getType())) {
TableIf table = optionalTable.get();
DatabaseIf database = table.getDatabase();
if (database != null) {
CatalogIf catalog = database.getCatalog();
if (catalog != null) {
queue.offer(new QueryColumn(catalog.getId(), database.getId(),
table.getId(), optionalColumn.get().getName()));
if (LOG.isDebugEnabled()) {
LOG.debug("Offer column " + table.getName() + "(" + table.getId() + ")."
+ optionalColumn.get().getName());
}
}
}
}
}
}
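/**
 * Merges query columns reported by follower FEs into this (master) FE's priority
 * queues. Stops offering as soon as a queue rejects a column, i.e. when it is full.
 */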
public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
Collection<TQueryColumn> midColumns) {
LOG.info("Received {} high columns and {} mid columns", highColumns.size(), midColumns.size());
for (TQueryColumn c : highColumns) {
if (!highPriorityColumns.offer(new QueryColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
Long.parseLong(c.tblId), c.colName))) {
break;
}
}
for (TQueryColumn c : midColumns) {
if (!midPriorityColumns.offer(new QueryColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
Long.parseLong(c.tblId), c.colName))) {
break;
}
}
}
}