// HMSAnalysisTask.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.collect.Sets;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
public class HMSAnalysisTask extends ExternalAnalysisTask {
private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
private HMSExternalTable hmsExternalTable;
    // For test
public HMSAnalysisTask() {
}
public HMSAnalysisTask(AnalysisInfo info) {
super(info);
hmsExternalTable = (HMSExternalTable) tbl;
}
private boolean isPartitionColumn() {
return hmsExternalTable.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName()));
}
// For test
protected void setTable(HMSExternalTable table) {
setTable((ExternalTable) table);
this.hmsExternalTable = table;
}
@Override
public void doExecute() throws Exception {
if (killed) {
return;
}
if (info.usingSqlForExternalTable) {
            // If the user specified WITH SQL in the ANALYZE statement, collect stats by running SQL.
super.doExecute();
} else {
            // By default, collect stats from HMS metadata and partition info.
try {
if (StatisticsUtil.enablePartitionAnalyze() && tbl.isPartitionedTable()) {
throw new RuntimeException("HMS doesn't support fetch partition level stats.");
}
if (isPartitionColumn()) {
getPartitionColumnStats();
} else {
getHmsColumnStats();
}
} catch (Exception e) {
LOG.info("Failed to collect stats for {}col {} using metadata, "
+ "fallback to normal collection. Because {}",
isPartitionColumn() ? "partition " : "", col.getName(), e.getMessage());
/* retry using sql way! */
super.doExecute();
}
}
}
// Collect the partition column stats through HMS metadata.
// Get all the partition values and calculate the stats based on the values.
private void getPartitionColumnStats() {
Set<String> partitionNames = hmsExternalTable.getPartitionNames();
Set<String> ndvPartValues = Sets.newHashSet();
long numNulls = 0;
long dataSize = 0;
String min = null;
String max = null;
        for (String partitionName : partitionNames) {
            // A partition name is like "date=20230101" for a one-level partition
            // and like "date=20230101/hour=12" for a two-level partition.
            List<String[]> parts = HiveUtil.toPartitionColNameAndValues(partitionName);
for (String[] part : parts) {
String colName = part[0];
String value = part[1];
if (colName != null && colName.equals(col.getName())) {
                    // HIVE_DEFAULT_PARTITION is the value Hive assigns when the
                    // partition value is missing, so count it as null.
if (value == null || value.isEmpty() || value.equals(HiveMetaStoreCache.HIVE_DEFAULT_PARTITION)) {
numNulls += 1;
continue;
}
ndvPartValues.add(value);
dataSize += col.getType().isStringType() ? value.length() : col.getType().getSlotSize();
min = updateMinValue(min, value);
max = updateMaxValue(max, value);
}
}
}
        // getRowCount may return 0 if the cache is empty; in that case, call fetchRowCount.
long count = hmsExternalTable.getRowCount();
if (count == 0) {
count = hmsExternalTable.fetchRowCount();
}
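        // Extrapolate the per-partition tallies to the whole table. Worked
        // example (hypothetical numbers): 10 partitions, 2 of them null
        // (HIVE_DEFAULT_PARTITION), and count = 1000 rows gives
        // numNulls = 2 * 1000 / 10 = 200.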
        // Guard against an empty partition list to avoid division by zero.
        if (!partitionNames.isEmpty()) {
            dataSize = dataSize * count / partitionNames.size();
            numNulls = numNulls * count / partitionNames.size();
        }
int ndv = ndvPartValues.size();
Map<String, String> params = buildSqlParams();
params.put("row_count", String.valueOf(count));
params.put("ndv", String.valueOf(ndv));
params.put("null_count", String.valueOf(numNulls));
params.put("min", StatisticsUtil.quote(min));
params.put("max", StatisticsUtil.quote(max));
params.put("data_size", String.valueOf(dataSize));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
runQuery(sql);
}
    // Collect the Spark-analyzed column stats through HMS metadata.
private void getHmsColumnStats() throws Exception {
        // getRowCount may return 0 if the cache is empty; in that case, call fetchRowCount.
long count = hmsExternalTable.getRowCount();
if (count == 0) {
count = hmsExternalTable.fetchRowCount();
}
Map<String, String> params = buildSqlParams();
Map<StatsType, String> statsParams = new HashMap<>();
statsParams.put(StatsType.NDV, "ndv");
statsParams.put(StatsType.NUM_NULLS, "null_count");
statsParams.put(StatsType.MIN_VALUE, "min");
statsParams.put(StatsType.MAX_VALUE, "max");
statsParams.put(StatsType.AVG_SIZE, "avg_len");
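        // fillColumnStatistics copies the stored HMS values for these StatsType
        // keys into params under the mapped placeholder names; it returns false
        // when any requested stat is missing, which triggers the SQL fallback.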
if (!hmsExternalTable.fillColumnStatistics(info.colName, statsParams, params)) {
throw new AnalysisException("some column stats not available");
}
long dataSize = Long.parseLong(params.get("avg_len")) * count;
params.put("row_count", String.valueOf(count));
params.put("data_size", String.valueOf(dataSize));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
runQuery(sql);
}
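    // Return the smaller of currentMin and value: numeric comparison for fixed
    // point, floating point and decimal types, lexicographic otherwise. A null
    // currentMin means no value has been seen yet, so value wins.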
private String updateMinValue(String currentMin, String value) {
if (currentMin == null) {
return value;
}
if (col.getType().isFixedPointType()) {
if (Long.parseLong(value) < Long.parseLong(currentMin)) {
return value;
} else {
return currentMin;
}
}
if (col.getType().isFloatingPointType() || col.getType().isDecimalV2() || col.getType().isDecimalV3()) {
if (Double.parseDouble(value) < Double.parseDouble(currentMin)) {
return value;
} else {
return currentMin;
}
}
return value.compareTo(currentMin) < 0 ? value : currentMin;
}
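    // Mirror of updateMinValue: return the larger of currentMax and value,
    // using the same per-type comparison rules.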
private String updateMaxValue(String currentMax, String value) {
if (currentMax == null) {
return value;
}
if (col.getType().isFixedPointType()) {
if (Long.parseLong(value) > Long.parseLong(currentMax)) {
return value;
} else {
return currentMax;
}
}
if (col.getType().isFloatingPointType() || col.getType().isDecimalV2() || col.getType().isDecimalV3()) {
if (Double.parseDouble(value) > Double.parseDouble(currentMax)) {
return value;
} else {
return currentMax;
}
}
return value.compareTo(currentMax) > 0 ? value : currentMax;
}
@Override
protected void doSample() {
StringBuilder sb = new StringBuilder();
Map<String, String> params = buildSqlParams();
params.put("min", getMinFunction());
params.put("max", getMaxFunction());
params.put("dataSizeFunction", getDataSizeFunction(col, false));
Pair<Double, Long> sampleInfo = getSampleInfo();
params.put("scaleFactor", String.valueOf(sampleInfo.first));
if (LOG.isDebugEnabled()) {
LOG.debug("Will do sample collection for column {}", col.getName());
}
boolean limitFlag = false;
boolean bucketFlag = false;
// If sample size is too large, use limit to control the sample size.
if (needLimit(sampleInfo.second, sampleInfo.first)) {
limitFlag = true;
long columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
double targetRows = (double) sampleInfo.second / columnSize;
// Estimate the new scaleFactor based on the schema.
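            // Worked example (hypothetical numbers): with targetRows = 5M and
            // getHugeTableSampleRows() = 4M, the query is limited to 4M rows and
            // scaleFactor is multiplied by 5M / 4M = 1.25 to compensate.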
if (targetRows > StatisticsUtil.getHugeTableSampleRows()) {
params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows());
params.put("scaleFactor",
String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows()));
}
}
        // A single distribution column is not a good fit for the DUJ1 estimator; use the linear estimator instead.
Set<String> distributionColumns = tbl.getDistributionColumnNames();
if (distributionColumns.size() == 1 && distributionColumns.contains(col.getName().toLowerCase())) {
bucketFlag = true;
sb.append(LINEAR_ANALYZE_TEMPLATE);
params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
params.put("rowCount", "ROUND(count(1) * ${scaleFactor})");
} else {
sb.append(DUJ1_ANALYZE_TEMPLATE);
params.put("subStringColName", getStringTypeColName(col));
params.put("dataSizeFunction", getDataSizeFunction(col, true));
params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})"));
params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})");
}
LOG.info("Sample for column [{}]. Scale factor [{}], "
+ "limited [{}], is distribute column [{}]",
col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag);
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
runQuery(sql);
}
@Override
protected void doFull() throws Exception {
if (StatisticsUtil.enablePartitionAnalyze() && tbl.isPartitionedTable()) {
doPartitionTable();
} else {
super.doFull();
}
}
@Override
protected void deleteNotExistPartitionStats(AnalysisInfo jobInfo) throws DdlException {
TableStatsMeta tableStats = Env.getServingEnv().getAnalysisManager().findTableStatsStatus(tbl.getId());
if (tableStats == null) {
return;
}
String indexName = table.getName();
ColStatsMeta columnStats = tableStats.findColumnStatsMeta(indexName, info.colName);
if (columnStats == null) {
return;
}
        // For external tables, simply remove all partition stats for the given column and re-analyze.
String columnCondition = "AND col_id = " + StatisticsUtil.quote(col.getName());
StatisticsRepository.dropPartitionsColumnStatistics(info.catalogId, info.dbId, info.tblId,
columnCondition, "");
}
@Override
protected String getPartitionInfo(String partitionName) {
        // partitionName is like "date=20230101" for a one-level partition
        // and like "date=20230101/hour=12" for a two-level partition.
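        // For example, "date=20230101/hour=12" produces
        // " WHERE `date` = '20230101' AND `hour` = '12'".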
String[] parts = partitionName.split("/");
if (parts.length == 0) {
throw new RuntimeException("Invalid partition name " + partitionName);
}
StringBuilder sb = new StringBuilder();
sb.append(" WHERE ");
for (int i = 0; i < parts.length; i++) {
String[] split = parts[i].split("=");
if (split.length != 2 || split[0].isEmpty() || split[1].isEmpty()) {
throw new RuntimeException("Invalid partition name " + partitionName);
}
sb.append("`");
sb.append(split[0]);
sb.append("`");
sb.append(" = ");
sb.append("'");
sb.append(split[1]);
sb.append("'");
if (i < parts.length - 1) {
sb.append(" AND ");
}
}
return sb.toString();
}
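    // Build the TABLESAMPLE hint for the sample query, for example
    // "TABLESAMPLE(10 PERCENT)" or "TABLESAMPLE(100000 ROWS)"; returns an
    // empty string when no sampling is requested.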
protected String getSampleHint() {
if (tableSample == null) {
return "";
}
if (tableSample.isPercent()) {
return String.format("TABLESAMPLE(%d PERCENT)", tableSample.getSampleValue());
} else {
return String.format("TABLESAMPLE(%d ROWS)", tableSample.getSampleValue());
}
}
/**
* Get the pair of sample scale factor and the file size going to sample.
     * While analyzing, the count, null count and data size results need to be
     * multiplied by this scale factor to get more accurate results.
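     * For example (hypothetical numbers), a 10 GB table sampled at 10 percent
     * reads roughly 1 GB of files and yields a scale factor of roughly 10.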
* @return Pair of sample scale factor and the file size going to sample.
*/
protected Pair<Double, Long> getSampleInfo() {
if (tableSample == null) {
return Pair.of(1.0, 0L);
}
long target;
        // Get the sizes of all files in this HMS table.
List<Long> chunkSizes = table.getChunkSizes();
Collections.shuffle(chunkSizes, new Random(tableSample.getSeek()));
long total = 0;
// Calculate the total size of this HMS table.
for (long size : chunkSizes) {
total += size;
}
if (total == 0) {
return Pair.of(1.0, 0L);
}
// Calculate the sample target size for percent and rows sample.
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
// Calculate the actual sample size (cumulate).
long cumulate = 0;
for (long size : chunkSizes) {
cumulate += size;
if (cumulate >= target) {
break;
}
}
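        // The scale factor is total size over sampled size, floored at 1 so
        // results are never scaled down when the sample covers the whole table.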
return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate);
}
/**
     * If the size to sample is larger than LIMIT_SIZE (1 GB)
     * and much larger (1.2x) than the size the user wants to sample,
* use limit to control the total sample size.
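     * For example (hypothetical numbers): sampling 10 percent of a 100 GB table
     * targets 10 GB; reading 15 GB exceeds both 1 GB and 1.2 * 10 GB, so a
     * limit is applied.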
* @param sizeToRead The file size to sample.
* @param factor sizeToRead * factor = Table total size.
* @return True if need to limit.
*/
protected boolean needLimit(long sizeToRead, double factor) {
long total = (long) (sizeToRead * factor);
long target;
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
return sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR;
}
}