OlapAnalysisTask.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.text.StringSubstitutor;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Each task analyze one column.
 */
public class OlapAnalysisTask extends BaseAnalysisTask {

    private static final String BASIC_STATS_TEMPLATE = "SELECT "
            + "SUBSTRING(CAST(MIN(`${colName}`) AS STRING), 1, 1024) as min, "
            + "SUBSTRING(CAST(MAX(`${colName}`) AS STRING), 1, 1024) as max "
            + "FROM `${dbName}`.`${tblName}` ${index}";

    private boolean keyColumnSampleTooManyRows = false;
    private boolean partitionColumnSampleTooManyRows = false;
    private boolean scanFullTable = false;
    private static final long MAXIMUM_SAMPLE_ROWS = 1_000_000_000;
    public static final long NO_SKIP_TABLET_ID = -1;

    @VisibleForTesting
    public OlapAnalysisTask() {
    }

    public OlapAnalysisTask(AnalysisInfo info) {
        super(info);
    }

    public void doExecute() throws Exception {
        if (killed) {
            return;
        }
        // For empty table, write empty result directly, no need to run SQL to collect stats.
        if (info.rowCount == 0 && tableSample != null) {
            StatsId statsId = new StatsId(concatColumnStatsId(), info.catalogId, info.dbId,
                    info.tblId, info.indexId, info.colName, null);
            job.appendBuf(this, Collections.singletonList(new ColStatsData(statsId)));
            return;
        }
        if (tableSample != null) {
            doSample();
        } else {
            doFull();
        }
        LOG.info("AnalysisTask Done {}", this.toString());
    }

    /**
     * 1. Get col stats in sample ways
     * 2. estimate partition stats
     * 3. insert col stats and partition stats
     */
    @Override
    protected void doSample() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Will do sample collection for column {}", col.getName());
        }
        // Get basic stats, including min and max.
        ResultRow minMax = collectMinMax();
        String min = StatisticsUtil.escapeSQL(minMax != null && minMax.getValues().size() > 0
                ? minMax.get(0) : null);
        String max = StatisticsUtil.escapeSQL(minMax != null && minMax.getValues().size() > 1
                ? minMax.get(1) : null);

        Map<String, String> params = buildSqlParams();
        params.put("min", StatisticsUtil.quote(min));
        params.put("max", StatisticsUtil.quote(max));
        long tableRowCount = info.indexId == -1
                ? tbl.getRowCount()
                : ((OlapTable) tbl).getRowCountForIndex(info.indexId, false);
        getSampleParams(params, tableRowCount);
        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
        String sql;
        if (useLinearAnalyzeTemplate()) {
            sql = stringSubstitutor.replace(LINEAR_ANALYZE_TEMPLATE);
        } else {
            sql = stringSubstitutor.replace(DUJ1_ANALYZE_TEMPLATE);
        }
        LOG.info("Analyze param: scanFullTable {}, partitionColumnTooMany {}, keyColumnTooMany {}",
                scanFullTable, partitionColumnSampleTooManyRows, keyColumnSampleTooManyRows);
        LOG.debug(sql);
        runQuery(sql);
    }

    protected ResultRow collectMinMax() {
        long startTime = System.currentTimeMillis();
        Map<String, String> params = buildSqlParams();
        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
        String sql = stringSubstitutor.replace(BASIC_STATS_TEMPLATE);
        ResultRow resultRow;
        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
            stmtExecutor = new StmtExecutor(r.connectContext, sql);
            resultRow = stmtExecutor.executeInternalQuery().get(0);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cost time in millisec: " + (System.currentTimeMillis() - startTime) + " Min max SQL: "
                        + sql + " QueryId: " + DebugUtil.printId(stmtExecutor.getContext().queryId()));
            }
            // Release the reference to stmtExecutor, reduce memory usage.
            stmtExecutor = null;
        } catch (Exception e) {
            LOG.info("Failed to collect basic stat {}. Reason {}", sql, e.getMessage());
            throw e;
        }
        return resultRow;
    }

    /**
     * Select the tablets to read.
     * @return Pair of tablet id list and how many rows are going to read.
     */
    protected Pair<List<Long>, Long> getSampleTablets() {
        long targetSampleRows = getSampleRows();
        OlapTable olapTable = (OlapTable) tbl;
        boolean forPartitionColumn = tbl.isPartitionColumn(col.getName());
        long avgTargetRowsPerPartition = targetSampleRows / Math.max(olapTable.getPartitions().size(), 1);
        List<Long> sampleTabletIds = new ArrayList<>();
        long selectedRows = 0;
        boolean enough = false;
        // Sort the partitions to get stable result.
        List<Partition> sortedPartitions = olapTable.getPartitions().stream().sorted(
                Comparator.comparing(Partition::getName)).collect(Collectors.toList());
        long largeTabletId = 0;
        long largeTabletRows = Long.MAX_VALUE;
        for (Partition p : sortedPartitions) {
            MaterializedIndex materializedIndex = info.indexId == -1 ? p.getBaseIndex() : p.getIndex(info.indexId);
            if (materializedIndex == null) {
                continue;
            }
            List<Long> ids = materializedIndex.getTabletIdsInOrder();
            if (ids.isEmpty()) {
                continue;
            }
            long avgRowsPerTablet = Math.max(materializedIndex.getRowCount() / ids.size(), 1);
            long tabletCounts = Math.max(avgTargetRowsPerPartition / avgRowsPerTablet
                    + (avgTargetRowsPerPartition % avgRowsPerTablet != 0 ? 1 : 0), 1);
            tabletCounts = Math.min(tabletCounts, ids.size());
            long seek = tableSample.getSeek() != -1 ? tableSample.getSeek()
                    : (long) (new SecureRandom().nextDouble() * ids.size());
            for (int i = 0; i < tabletCounts; i++) {
                int seekTid = (int) ((i + seek) % ids.size());
                long tabletId = ids.get(seekTid);
                long tabletRows = materializedIndex.getTablet(tabletId).getMinReplicaRowCount(p.getVisibleVersion());
                if (tabletRows > MAXIMUM_SAMPLE_ROWS) {
                    LOG.debug("Found one large tablet id {} in table {}, rows {}",
                            largeTabletId, tbl.getName(), largeTabletRows);
                    // Skip very large tablet and record the smallest large tablet id and row count.
                    if (tabletRows < largeTabletRows) {
                        LOG.debug("Current smallest large tablet id {} in table {}, rows {}",
                                largeTabletId, tbl.getName(), largeTabletRows);
                        largeTabletId = tabletId;
                        largeTabletRows = tabletRows;
                    }
                    continue;
                }
                sampleTabletIds.add(tabletId);
                if (tabletRows > 0) {
                    selectedRows += tabletRows;
                    // For regular column, will stop adding more tablets when selected tablets'
                    // row count is more than the target sample rows.
                    // But for partition columns, will not stop adding. For ndv sample accuracy,
                    // better to choose at least one tablet in each partition.
                    if (selectedRows >= targetSampleRows && !forPartitionColumn) {
                        enough = true;
                        break;
                    }
                }
            }
            if (enough) {
                break;
            }
        }
        // If we skipped some large tablets and this cause the sampled rows is not enough, we add the large tablet back.
        if (!enough && largeTabletId != 0) {
            sampleTabletIds.add(largeTabletId);
            selectedRows += largeTabletRows;
            LOG.info("Add large tablet {} in table {} back, with rows {}",
                    largeTabletId, tbl.getName(), largeTabletRows);
        }
        if (selectedRows < targetSampleRows) {
            scanFullTable = true;
        } else if (forPartitionColumn && selectedRows > MAXIMUM_SAMPLE_ROWS) {
            // If the selected tablets for partition column contain too many rows, change to linear sample.
            partitionColumnSampleTooManyRows = true;
            sampleTabletIds.clear();
            Collections.shuffle(sortedPartitions);
            selectedRows = pickSamplePartition(sortedPartitions, sampleTabletIds, getSkipPartitionId(sortedPartitions));
        } else if (col.isKey() && selectedRows > MAXIMUM_SAMPLE_ROWS) {
            // For key column, if a single tablet contains too many rows, need to use limit to control rows to read.
            // In most cases, a single tablet shouldn't contain more than MAXIMUM_SAMPLE_ROWS, in this case, we
            // don't use limit for key column for ndv accuracy reason.
            keyColumnSampleTooManyRows = true;
        }
        return Pair.of(sampleTabletIds, selectedRows);
    }

    /**
     * Get the sql params for this sample task.
     * @param params Sql params to use in analyze task.
     * @param tableRowCount BE reported table/index row count.
     */
    protected void getSampleParams(Map<String, String> params, long tableRowCount) {
        long targetSampleRows = getSampleRows();
        params.put("rowCount", String.valueOf(tableRowCount));
        params.put("type", col.getType().toString());
        params.put("limit", "");

        // For agg table and mor unique table, set PREAGGOPEN preAggHint.
        if (((OlapTable) tbl).getKeysType().equals(KeysType.AGG_KEYS)
                || ((OlapTable) tbl).getKeysType().equals(KeysType.UNIQUE_KEYS)
                && !((OlapTable) tbl).isUniqKeyMergeOnWrite()) {
            params.put("preAggHint", "/*+PREAGGOPEN*/");
        }

        // If table row count is less than the target sample row count, simple scan the full table.
        if (tableRowCount <= targetSampleRows) {
            params.put("scaleFactor", "1");
            params.put("sampleHints", "");
            params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
            scanFullTable = true;
            return;
        }
        Pair<List<Long>, Long> sampleTabletsInfo = getSampleTablets();
        String tabletStr = sampleTabletsInfo.first.stream()
                .map(Object::toString)
                .collect(Collectors.joining(", "));
        String sampleHints = scanFullTable ? "" : String.format("TABLET(%s)", tabletStr);
        params.put("sampleHints", sampleHints);
        long selectedRows = sampleTabletsInfo.second;
        long finalScanRows = selectedRows;
        double scaleFactor = scanFullTable ? 1 : (double) tableRowCount / finalScanRows;
        params.put("scaleFactor", String.valueOf(scaleFactor));

        // If the tablets to be sampled are too large, use limit to control the rows to read, and re-calculate
        // the scaleFactor.
        if (needLimit()) {
            finalScanRows = Math.min(targetSampleRows, selectedRows);
            if (col.isKey() && keyColumnSampleTooManyRows) {
                finalScanRows = MAXIMUM_SAMPLE_ROWS;
            }
            // Empty table doesn't need to limit.
            if (finalScanRows > 0) {
                scaleFactor = (double) tableRowCount / finalScanRows;
                params.put("limit", "limit " + finalScanRows);
                params.put("scaleFactor", String.valueOf(scaleFactor));
            }
        }
        // Set algorithm related params.
        if (useLinearAnalyzeTemplate()) {
            // For single unique key, use count as ndv.
            if (isSingleUniqueKey()) {
                params.put("ndvFunction", String.valueOf(tableRowCount));
            } else {
                params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
            }
        } else {
            params.put("ndvFunction", getNdvFunction(String.valueOf(tableRowCount)));
            params.put("dataSizeFunction", getDataSizeFunction(col, true));
            params.put("subStringColName", getStringTypeColName(col));
        }
    }

    protected void doFull() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Will do full collection for column {}", col.getName());
        }
        if (StatisticsUtil.enablePartitionAnalyze() && tbl.isPartitionedTable()) {
            doPartitionTable();
        } else {
            StringSubstitutor stringSubstitutor = new StringSubstitutor(buildSqlParams());
            runQuery(stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE));
        }
    }

    @Override
    protected void deleteNotExistPartitionStats(AnalysisInfo jobInfo) throws DdlException {
        TableStatsMeta tableStats = Env.getServingEnv().getAnalysisManager().findTableStatsStatus(tbl.getId());
        if (tableStats == null) {
            return;
        }
        OlapTable table = (OlapTable) tbl;
        String indexName = info.indexId == -1 ? table.getName() : table.getIndexNameById(info.indexId);
        ColStatsMeta columnStats = tableStats.findColumnStatsMeta(indexName, info.colName);
        if (columnStats == null || columnStats.partitionUpdateRows == null
                || columnStats.partitionUpdateRows.isEmpty()) {
            return;
        }
        // When a partition was dropped, partitionChanged will be set to true.
        // So we don't need to check dropped partition if partitionChanged is false.
        if (!tableStats.partitionChanged.get()
                && columnStats.partitionUpdateRows.size() == table.getPartitions().size()) {
            return;
        }
        Set<Long> expiredPartition = Sets.newHashSet();
        String columnCondition = "AND col_id = " + StatisticsUtil.quote(col.getName());
        for (long partId : columnStats.partitionUpdateRows.keySet()) {
            Partition partition = table.getPartition(partId);
            if (partition == null) {
                columnStats.partitionUpdateRows.remove(partId);
                tableStats.partitionUpdateRows.remove(partId);
                jobInfo.partitionUpdateRows.remove(partId);
                expiredPartition.add(partId);
                if (expiredPartition.size() == Config.max_allowed_in_element_num_of_delete) {
                    String partitionCondition = " AND part_id in (" + Joiner.on(", ").join(expiredPartition) + ")";
                    StatisticsRepository.dropPartitionsColumnStatistics(info.catalogId, info.dbId, info.tblId,
                            columnCondition, partitionCondition);
                    expiredPartition.clear();
                }
            }
        }
        if (expiredPartition.size() > 0) {
            String partitionCondition = " AND part_id in (" + Joiner.on(", ").join(expiredPartition) + ")";
            StatisticsRepository.dropPartitionsColumnStatistics(info.catalogId, info.dbId, info.tblId,
                    columnCondition, partitionCondition);
        }
    }

    @Override
    protected String getPartitionInfo(String partitionName) {
        return "partition " + partitionName;
    }

    @Override
    protected Map<String, String> buildSqlParams() {
        Map<String, String> params = new HashMap<>();
        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
        params.put("columnStatTbl", StatisticConstants.TABLE_STATISTIC_TBL_NAME);
        params.put("catalogId", String.valueOf(catalog.getId()));
        params.put("dbId", String.valueOf(db.getId()));
        params.put("tblId", String.valueOf(tbl.getId()));
        params.put("idxId", String.valueOf(info.indexId));
        params.put("colId", StatisticsUtil.escapeSQL(String.valueOf(info.colName)));
        params.put("dataSizeFunction", getDataSizeFunction(col, false));
        params.put("catalogName", catalog.getName());
        params.put("dbName", db.getFullName());
        params.put("colName", StatisticsUtil.escapeColumnName(String.valueOf(info.colName)));
        params.put("tblName", String.valueOf(tbl.getName()));
        params.put("index", getIndex());
        params.put("preAggHint", "");
        return params;
    }

    protected String getIndex() {
        if (info.indexId == -1) {
            return "";
        } else {
            OlapTable olapTable = (OlapTable) this.tbl;
            return "index `" + olapTable.getIndexNameById(info.indexId) + "`";
        }
    }

    // For partition tables with single time type partition column, we'd better to skip sampling the partition
    // that contains all the history data. Because this partition may contain many old data which is not
    // visited by most queries. To sample this partition may cause the statistics not accurate.
    // For example, one table has 366 partitions, partition 1 ~ 365 store date for each day of the year from now.
    // Partition 0 stores all the history data earlier than 1 year. We want to skip sampling partition 0.
    protected long getSkipPartitionId(List<Partition> partitions) {
        if (partitions == null || partitions.size() < StatisticsUtil.getPartitionSampleCount()) {
            return NO_SKIP_TABLET_ID;
        }
        PartitionInfo partitionInfo = ((OlapTable) tbl).getPartitionInfo();
        if (!PartitionType.RANGE.equals(partitionInfo.getType())) {
            return NO_SKIP_TABLET_ID;
        }
        if (partitionInfo.getPartitionColumns().size() != 1) {
            return NO_SKIP_TABLET_ID;
        }
        Column column = partitionInfo.getPartitionColumns().get(0);
        if (!column.getType().isDateType()) {
            return NO_SKIP_TABLET_ID;
        }
        PartitionKey lowestKey = PartitionKey.createMaxPartitionKey();
        long lowestPartitionId = -1;
        for (Partition p : partitions) {
            RangePartitionItem item = (RangePartitionItem) partitionInfo.getItem(p.getId());
            Range<PartitionKey> items = item.getItems();
            if (!items.hasLowerBound()) {
                lowestPartitionId = p.getId();
                break;
            }
            if (items.lowerEndpoint().compareTo(lowestKey) < 0) {
                lowestKey = items.lowerEndpoint();
                lowestPartitionId = p.getId();
            }
        }
        return lowestPartitionId;
    }

    protected long pickSamplePartition(List<Partition> partitions, List<Long> pickedTabletIds, long skipPartitionId) {
        Partition partition = ((OlapTable) tbl).getPartition(skipPartitionId);
        long averageRowsPerPartition;
        if (partition != null) {
            LOG.debug("Going to skip partition {} in table {}", skipPartitionId, tbl.getName());
            // If we want to skip the oldest partition, calculate the average rows per partition value without
            // the oldest partition, otherwise if the oldest partition is very large, we may skip all partitions.
            // Because we only pick partitions which meet partitionRowCount >= averageRowsPerPartition.
            Preconditions.checkNotNull(partitions, "Partition list of table " + tbl.getName() + " is null");
            Preconditions.checkState(partitions.size() > 1, "Too few partitions in " + tbl.getName());
            averageRowsPerPartition = (tbl.getRowCount() - partition.getRowCount()) / (partitions.size() - 1);
        } else {
            averageRowsPerPartition = tbl.getRowCount() / partitions.size();
        }
        long indexId = info.indexId == -1 ? ((OlapTable) tbl).getBaseIndexId() : info.indexId;
        long pickedRows = 0;
        int pickedPartitionCount = 0;
        for (Partition p : partitions) {
            if (skipPartitionId == p.getId()) {
                LOG.info("Partition {} in table {} skipped", skipPartitionId, tbl.getName());
                continue;
            }
            long partitionRowCount = p.getRowCount();
            if (partitionRowCount >= averageRowsPerPartition) {
                pickedRows += partitionRowCount;
                pickedPartitionCount++;
                MaterializedIndex materializedIndex = p.getIndex(indexId);
                pickedTabletIds.addAll(materializedIndex.getTabletIdsInOrder());
            }
            if (pickedRows >= StatisticsUtil.getPartitionSampleRowCount()
                    || pickedPartitionCount >= StatisticsUtil.getPartitionSampleCount()) {
                break;
            }
        }
        return pickedRows;
    }

    @VisibleForTesting
    protected void setTable(OlapTable table) {
        tbl = table;
    }

    /**
     * For ordinary column (neither key column nor partition column), need to limit sample size to user specified value.
     * @return Return true when need to limit.
     */
    protected boolean needLimit() {
        if (scanFullTable) {
            return false;
        }
        // Key column is sorted, use limit will cause the ndv not accurate enough, so skip key columns.
        if (col.isKey() && !keyColumnSampleTooManyRows) {
            return false;
        }
        // Partition column need to scan tablets from all partitions.
        return !tbl.isPartitionColumn(col.getName());
    }

    /**
     * Calculate rows to sample based on user given sample value.
     * @return Rows to sample.
     */
    protected long getSampleRows() {
        long sampleRows;
        if (tableSample.isPercent()) {
            sampleRows = (long) Math.max(tbl.getRowCount() * (tableSample.getSampleValue() / 100.0), 1);
        } else {
            sampleRows = Math.max(tableSample.getSampleValue(), 1);
        }
        return sampleRows;
    }

    /**
     * Check if the task should use linear analyze template.
     * @return True for single unique key column and single distribution column.
     */
    protected boolean useLinearAnalyzeTemplate() {
        if (partitionColumnSampleTooManyRows || scanFullTable) {
            return true;
        }
        if (isSingleUniqueKey()) {
            return true;
        }
        String columnName = col.getName();
        if (columnName.startsWith(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX)) {
            columnName = columnName.substring(CreateMaterializedViewStmt.MATERIALIZED_VIEW_NAME_PREFIX.length());
        }
        Set<String> distributionColumns = tbl.getDistributionColumnNames();
        return distributionColumns.size() == 1 && distributionColumns.contains(columnName.toLowerCase());
    }

    /**
     * Check if the olap table has a single unique key.
     * @return True if the table has a single unique/agg key. False otherwise.
     */
    protected boolean isSingleUniqueKey() {
        OlapTable olapTable = (OlapTable) this.tbl;
        List<Column> schema;
        KeysType keysType;
        if (info.indexId == -1) {
            schema = olapTable.getBaseSchema();
            keysType = olapTable.getKeysType();
        } else {
            MaterializedIndexMeta materializedIndexMeta = olapTable.getIndexIdToMeta().get(info.indexId);
            schema = materializedIndexMeta.getSchema();
            keysType = materializedIndexMeta.getKeysType();
        }

        int keysNum = 0;
        for (Column column : schema) {
            if (column.isKey()) {
                keysNum += 1;
            }
        }

        return col.isKey()
            && keysNum == 1
            && (keysType.equals(KeysType.UNIQUE_KEYS) || keysType.equals(KeysType.AGG_KEYS));
    }

    protected String concatColumnStatsId() {
        return info.tblId + "-" + info.indexId + "-" + info.colName;
    }

    @VisibleForTesting
    public void setKeyColumnSampleTooManyRows(boolean value) {
        keyColumnSampleTooManyRows = value;
    }

    @VisibleForTesting
    public void setPartitionColumnSampleTooManyRows(boolean value) {
        partitionColumnSampleTooManyRows = value;
    }

    @VisibleForTesting
    public void setScanFullTable(boolean value) {
        scanFullTable = value;
    }

    @VisibleForTesting
    public boolean scanFullTable() {
        return scanFullTable;
    }
}