StatisticsAutoCollector.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Master daemon that periodically takes pending auto-analyze jobs from the priority job maps
 * held by {@link AnalysisManager} and runs them on a dedicated {@link AnalysisTaskExecutor}.
 *
 * <p>The first eligible cycle after start-up only sleeps (see {@link #runAfterCatalogReady()});
 * job collection starts from the following cycle.
 */
public class StatisticsAutoCollector extends MasterDaemon {

    private static final Logger LOG = LogManager.getLogger(StatisticsAutoCollector.class);

    protected final AnalysisTaskExecutor analysisTaskExecutor;
    // Waited flag. Wait once when FE started for TabletStatMgr has received BE report at least once.
    // This couldn't guarantee getRowCount will return up-to-date value,
    // but could reduce the chance to get wrong row count. e.g. 0 after FE restart.
    // Volatile: written by the daemon thread and exposed through the public isReady(),
    // which is presumably read from other threads (callers are not visible in this file).
    private volatile boolean waited = false;

    /**
     * Creates the daemon with a run interval of {@code Config.auto_check_statistics_in_minutes} minutes
     * and an executor sized by {@code Config.auto_analyze_simultaneously_running_task_num}.
     */
    public StatisticsAutoCollector() {
        super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
        this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
                StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor");
    }

    /**
     * One daemon cycle. Does nothing unless this FE is master, the stats table is available and the
     * current thread is not the checkpoint thread. The first eligible cycle only sleeps for twice
     * {@code Config.tablet_stat_update_interval_second} and marks the collector as ready; later
     * cycles call {@link #collect()}.
     */
    @Override
    protected void runAfterCatalogReady() {
        if (!Env.getCurrentEnv().isMaster()) {
            return;
        }
        if (!StatisticsUtil.statsTblAvailable()) {
            LOG.info("Stats table not available, skip");
            return;
        }
        if (Env.isCheckpointThread()) {
            return;
        }
        if (waited) {
            collect();
        } else {
            try {
                Thread.sleep((long) Config.tablet_stat_update_interval_second * 1000 * 2);
                waited = true;
            } catch (InterruptedException e) {
                LOG.info("Wait Sleep interrupted.", e);
                // Restore the interrupt status so the code driving this daemon can observe it.
                // "waited" stays false, so the wait is retried on the next cycle.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Processes queued jobs one by one while {@code StatisticsUtil.canCollect()} holds and jobs remain.
     * Tables that cannot be auto analyzed are skipped; a failure on one job is logged and does not
     * stop the loop.
     */
    protected void collect() {
        while (StatisticsUtil.canCollect()) {
            Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> job = getJob();
            if (job == null) {
                // No more job to process, break and sleep.
                LOG.info("No auto analyze jobs to process.");
                break;
            }
            try {
                TableName tblName = job.first.getKey();
                TableIf table = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl());
                if (!supportAutoAnalyze(table)) {
                    continue;
                }
                processOneJob(table, job.first.getValue(), job.second);
            } catch (Exception e) {
                LOG.warn("Failed to analyze table {} with columns [{}]", job.first.getKey().getTbl(),
                        job.first.getValue().stream().map(Pair::toString).collect(Collectors.joining(",")), e);
            }
        }
    }

    /**
     * Takes the next job, checking the high, mid, low and very-low priority maps in that order.
     *
     * @return the removed map entry paired with the priority of the map it came from,
     *         or {@code null} when all four maps are empty
     */
    protected Pair<Entry<TableName, Set<Pair<String, String>>>, JobPriority> getJob() {
        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
        Optional<Entry<TableName, Set<Pair<String, String>>>> job = fetchJobFromMap(manager.highPriorityJobs);
        if (job.isPresent()) {
            return Pair.of(job.get(), JobPriority.HIGH);
        }
        job = fetchJobFromMap(manager.midPriorityJobs);
        if (job.isPresent()) {
            return Pair.of(job.get(), JobPriority.MID);
        }
        job = fetchJobFromMap(manager.lowPriorityJobs);
        if (job.isPresent()) {
            return Pair.of(job.get(), JobPriority.LOW);
        }
        job = fetchJobFromMap(manager.veryLowPriorityJobs);
        return job.map(tableNameSetEntry -> Pair.of(tableNameSetEntry, JobPriority.VERY_LOW)).orElse(null);
    }

    /**
     * Removes and returns the first entry (in the map's iteration order) of the given job map.
     * Lookup and removal happen while holding the monitor of {@code jobMap}.
     *
     * @param jobMap job map to take from; it is modified by this call
     * @return the removed entry, or an empty Optional when the map has no entries
     */
    protected Optional<Map.Entry<TableName, Set<Pair<String, String>>>> fetchJobFromMap(
            Map<TableName, Set<Pair<String, String>>> jobMap) {
        synchronized (jobMap) {
            Optional<Map.Entry<TableName, Set<Pair<String, String>>>> first = jobMap.entrySet().stream().findFirst();
            first.ifPresent(entry -> jobMap.remove(entry.getKey()));
            return first;
        }
    }

    /**
     * Builds and runs the analysis job for one table.
     *
     * <p>The method is SAMPLE when the table's data size reaches
     * {@code StatisticsUtil.getHugeTableLowerBoundSizeInBytes()} and FULL otherwise; FULL is forced when
     * partition analyze is enabled and the table is partitioned. Columns are then filtered down to the
     * ones that need analyzing and, for OLAP tables, can be collected. Execution failures are logged
     * here and not propagated.
     *
     * @param table    table to analyze
     * @param columns  (index name, column name) pairs requested for this table; may be extended in place
     *                 by {@link #appendAllColumns(TableIf, Set)}
     * @param priority priority recorded on the created job
     * @throws DdlException declared for {@link #appendAllColumns(TableIf, Set)}
     */
    protected void processOneJob(TableIf table, Set<Pair<String, String>> columns,
            JobPriority priority) throws DdlException {
        appendAllColumns(table, columns);
        AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
                ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
        if (StatisticsUtil.enablePartitionAnalyze() && table.isPartitionedTable()) {
            analysisMethod = AnalysisMethod.FULL;
        }
        boolean isSampleAnalyze = analysisMethod.equals(AnalysisMethod.SAMPLE);
        OlapTable olapTable = table instanceof OlapTable ? (OlapTable) table : null;
        columns = columns.stream()
                .filter(c -> StatisticsUtil.needAnalyzeColumn(table, c) || StatisticsUtil.isLongTimeColumn(table, c))
                .filter(c -> olapTable == null || StatisticsUtil.canCollectColumn(
                        olapTable.getIndexMetaByIndexId(olapTable.getIndexIdByName(c.first)).getColumnByName(c.second),
                        table, isSampleAnalyze, olapTable.getIndexIdByName(c.first)))
            .collect(Collectors.toSet());
        AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority, analysisMethod);
        if (analyzeJob == null) {
            return;
        }
        // Pass the object itself so toString() is only evaluated when debug logging is enabled.
        LOG.debug("Auto analyze job : {}", analyzeJob);
        try {
            executeSystemAnalysisJob(analyzeJob);
        } catch (Exception e) {
            // Attach the exception so the failure cause is not lost. The column list is joined without
            // its own brackets because the message pattern already supplies them.
            LOG.warn("Fail to auto analyze table {}, columns [{}]", table.getName(),
                    columns.stream().map(Pair::toString).collect(Collectors.joining(",")), e);
        }
    }

    // If partition changed (partition first loaded, partition dropped and so on), need re-analyze all columns.
    /**
     * For an OLAP table whose stats meta has the partition-changed flag set, adds the index/column pairs
     * of every column with a supported type to {@code columns}. Does nothing for other tables.
     *
     * @param table   table being analyzed
     * @param columns set to extend in place
     * @throws DdlException declared by this method's signature
     */
    protected void appendAllColumns(TableIf table, Set<Pair<String, String>> columns) throws DdlException {
        if (!(table instanceof OlapTable)) {
            return;
        }
        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
        TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId());
        if (tableStatsStatus != null && tableStatsStatus.partitionChanged.get()) {
            OlapTable olapTable = (OlapTable) table;
            Set<String> allColumnPairs = olapTable.getSchemaAllIndexes(false).stream()
                    .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
                    .map(Column::getName)
                    .collect(Collectors.toSet());
            columns.addAll(olapTable.getColumnIndexPairs(allColumnPairs));
        }
    }

    /**
     * Tells whether a table is eligible for auto analyze.
     *
     * @param tableIf table to check, may be {@code null}
     * @return {@code true} for an OLAP table, or an HMS external table whose DLA type is HIVE;
     *         {@code false} otherwise, including for {@code null}
     */
    protected boolean supportAutoAnalyze(TableIf tableIf) {
        if (tableIf == null) {
            return false;
        }
        // Explicit parentheses: the DLA type check applies to the HMS branch only.
        return tableIf instanceof OlapTable
                || (tableIf instanceof HMSExternalTable
                && ((HMSExternalTable) tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE));
    }

    /**
     * Builds the {@link AnalysisInfo} describing an automatic, system-type analyze job for a table.
     *
     * @param table          table to analyze
     * @param jobColumns     (index name, column name) pairs to analyze
     * @param priority       priority recorded on the job
     * @param analysisMethod SAMPLE or FULL
     * @return the job description, or {@code null} when the job must be skipped: the base index row
     *         count of an OLAP table is still unknown under SAMPLE, the table row count is not
     *         positive (old stats are dropped in that case if any exist), or there are no columns
     */
    protected AnalysisInfo createAnalyzeJobForTbl(
            TableIf table, Set<Pair<String, String>> jobColumns, JobPriority priority, AnalysisMethod analysisMethod) {
        AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
        TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId());
        if (table instanceof OlapTable && analysisMethod.equals(AnalysisMethod.SAMPLE)) {
            OlapTable ot = (OlapTable) table;
            if (ot.getRowCountForIndex(ot.getBaseIndexId(), true) == TableIf.UNKNOWN_ROW_COUNT) {
                LOG.info("Table {} row count is not fully reported, skip auto analyzing this time.", ot.getName());
                return null;
            }
        }
        // We don't auto analyze empty table to avoid all 0 stats.
        // Because all 0 is more dangerous than unknown stats when row count report is delayed.
        long rowCount = table.getRowCount();
        if (rowCount <= 0) {
            LOG.info("Table {} is empty, remove its old stats and skip auto analyze it.", table.getName());
            // Remove the table's old stats if exists.
            if (tableStatsStatus != null && !tableStatsStatus.isColumnsStatsEmpty()) {
                manager.dropStats(table, null);
            }
            return null;
        }
        if (jobColumns == null || jobColumns.isEmpty()) {
            return null;
        }
        LOG.info("Auto analyze table {} row count is {}", table.getName(), rowCount);
        // Column list rendered as "[pair,pair,...]" for the job's column-name field.
        StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
        for (Pair<String, String> pair : jobColumns) {
            stringJoiner.add(pair.toString());
        }
        return new AnalysisInfoBuilder()
                .setJobId(Env.getCurrentEnv().getNextId())
                .setCatalogId(table.getDatabase().getCatalog().getId())
                .setDBId(table.getDatabase().getId())
                .setTblId(table.getId())
                .setColName(stringJoiner.toString())
                .setJobColumns(jobColumns)
                .setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
                .setAnalysisMethod(analysisMethod)
                .setPartitionNames(Collections.emptySet())
                .setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE)
                    ? StatisticsUtil.getHugeTableSampleRows() : -1)
                .setScheduleType(ScheduleType.AUTOMATIC)
                .setState(AnalysisState.PENDING)
                .setTaskIds(new ArrayList<>())
                .setLastExecTimeInMs(System.currentTimeMillis())
                .setJobType(JobType.SYSTEM)
                .setTblUpdateTime(table.getUpdateTime())
                .setRowCount(rowCount)
                .setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get())
                .setTableVersion(table instanceof OlapTable ? ((OlapTable) table).getVisibleVersion() : 0)
                .setPriority(priority)
                .setPartitionUpdateRows(tableStatsStatus == null ? null : tableStatsStatus.partitionUpdateRows)
                .setEnablePartition(StatisticsUtil.enablePartitionAnalyze())
                .build();
    }

    // Analysis job created by the system
    /**
     * Creates the per-column tasks of a job, registers the job with the analysis manager, submits
     * every task to {@link #analysisTaskExecutor} and blocks until all of them have completed.
     *
     * @param jobInfo job to execute
     * @throws DdlException         declared by this method's signature
     * @throws ExecutionException   if waiting on a submitted task reports that it failed
     * @throws InterruptedException if the calling thread is interrupted while waiting for a task
     */
    @VisibleForTesting
    protected void executeSystemAnalysisJob(AnalysisInfo jobInfo)
            throws DdlException, ExecutionException, InterruptedException {
        Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
        analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
        analysisManager.constructJob(jobInfo, analysisTasks.values());
        analysisManager.registerSysJob(jobInfo, analysisTasks);
        Future<?>[] futures = new Future[analysisTasks.size()];
        int i = 0;
        for (BaseAnalysisTask task : analysisTasks.values()) {
            futures[i++] = analysisTaskExecutor.submitTask(task);
        }
        // Wait for every submitted task; the first failure or interruption propagates to the caller.
        for (Future<?> future : futures) {
            future.get();
        }
    }

    /**
     * @return {@code true} once the one-time start-up wait in {@link #runAfterCatalogReady()} has completed
     */
    public boolean isReady() {
        return waited;
    }
}