StatisticsCleaner.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.persist.TableStatsDeletionLog;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Maintains the internal statistics tables.
 * Deletes rows whose corresponding catalog/DB/table/column no longer exists.
 */
public class StatisticsCleaner extends MasterDaemon {

    private static final Logger LOG = LogManager.getLogger(StatisticsCleaner.class);

    // Stats table looked up by init() under StatisticConstants.TABLE_STATISTIC_TBL_NAME;
    // clear() cleans it with isTableColumnStats = true.
    private OlapTable colStatsTbl;
    // Stats table looked up by init() under StatisticConstants.PARTITION_STATISTIC_TBL_NAME;
    // clear() cleans it with isTableColumnStats = false.
    private OlapTable partitionColStatsTbl;

    // Catalog id -> catalog, taken from the catalog manager by init().
    private Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> idToCatalog;
    // Database id -> database, built by constructDbMap() from the internal catalog's databases.
    private Map<Long, DatabaseIf<? extends TableIf>> idToDb;
    // Table id -> table, built by constructTblMap() from every database in idToDb.
    private Map<Long, TableIf> idToTbl;

    // Index id -> index meta, built by constructIdxMap(); only indexes of OlapTables whose
    // getDefineStmt() is non-null are included.
    private Map<Long, MaterializedIndexMeta> idToMVIdx;

    /**
     * Creates the daemon named "Statistics Table Cleaner". The second argument handed to the
     * {@link MasterDaemon} constructor is {@link StatisticConstants#STATISTIC_CLEAN_INTERVAL_IN_HOURS}
     * converted from hours to milliseconds (presumably the interval between runs; confirm against
     * MasterDaemon).
     */
    public StatisticsCleaner() {
        super("Statistics Table Cleaner",
                TimeUnit.HOURS.toMillis(StatisticConstants.STATISTIC_CLEAN_INTERVAL_IN_HOURS));
    }

    /**
     * Daemon hook: starts one cleaning pass, but only when the current {@link Env} reports
     * itself as master. On any other node this is a no-op.
     */
    @Override
    protected void runAfterCatalogReady() {
        if (Env.getCurrentEnv().isMaster()) {
            clear();
        }
    }

    /**
     * Runs one complete cleaning pass.
     *
     * <p>First removes table stats records of tables that no longer exist. Then, if {@link #init()}
     * succeeds, removes expired rows from the column statistics table and from the partition column
     * statistics table. All per-pass state (both stats table references and the id lookup maps) is
     * released when the pass ends, whether it completed, was skipped because init failed, or threw.
     */
    public synchronized void clear() {
        clearTableStats();
        try {
            if (!init()) {
                return;
            }
            clearStats(colStatsTbl, true);
            clearStats(partitionColStatsTbl, false);
        } finally {
            // Drop every reference populated by init() so nothing stale survives until the next pass.
            colStatsTbl = null;
            partitionColStatsTbl = null;
            idToCatalog = null;
            idToDb = null;
            idToTbl = null;
            idToMVIdx = null;
        }
    }

    /**
     * Repeatedly collects a batch of expired entries from {@code statsTbl} and deletes them.
     * Each round resumes scanning from the offset returned by the previous round; the loop ends
     * after the first round whose batch reports {@link ExpiredStats#isEmpty()}.
     *
     * @param statsTbl the statistics table to clean
     * @param isTableColumnStats passed through to the scan and delete steps to select
     *                           table-level versus partition-level handling
     */
    private void clearStats(OlapTable statsTbl, boolean isTableColumnStats) {
        long nextOffset = 0;
        while (true) {
            ExpiredStats batch = new ExpiredStats();
            nextOffset = findExpiredStats(statsTbl, batch, nextOffset, isTableColumnStats);
            // The delete step runs for every batch, including the final (empty) one.
            deleteExpiredStats(batch, statsTbl.getName(), isTableColumnStats);
            if (batch.isEmpty()) {
                return;
            }
        }
    }

    /**
     * Removes table stats records held by the {@link AnalysisManager} whose table no longer exists,
     * and writes a {@link TableStatsDeletionLog} to the edit log for each removal.
     *
     * <p>A record is kept when the table can be found by the catalog/db/table names stored in the
     * record and has the same id, or when a table with that id exists in the internal catalog.
     * A failure while handling one table id is logged and does not stop the remaining ids from
     * being processed.
     */
    private void clearTableStats() {
        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
        Set<Long> tableIds = analysisManager.getIdToTblStatsKeys();
        InternalCatalog internalCatalog = Env.getCurrentInternalCatalog();
        for (long id : tableIds) {
            try {
                TableStatsMeta stats = analysisManager.findTableStatsStatus(id);
                if (stats == null) {
                    continue;
                }
                // If ctlName, dbName and tblName exist, the table stats was created under a new version.
                // First try to find the table by the given names. If the table exists, the tableMeta
                // is valid and should be kept in memory.
                boolean tableExist = false;
                try {
                    TableIf table = StatisticsUtil.findTable(stats.ctlName, stats.dbName, stats.tblName);
                    // Tables may have identical names but different ids, e.g. after REPLACE TABLE.
                    tableExist = table.getId() == id;
                } catch (Exception e) {
                    // Any lookup failure is treated as "not found by name"; fall through to the id lookup.
                    LOG.debug("Table {}.{}.{} not found.", stats.ctlName, stats.dbName, stats.tblName);
                }
                // If we couldn't find the table by names, try to find it in the internal catalog by id.
                // This supports older versions in which the tableStats object doesn't store the names
                // but only the table id.
                // We may remove an external table's tableStats here, but it's not a big problem:
                // the stats in the column_statistics table are still available, and the only
                // disadvantage is that auto analyze may be triggered for this table again.
                // That happens only once, because the new table stats object will have all the
                // catalog, db and table names.
                // This also supports REPLACE TABLE.
                if (tableExist || tableExistInInternalCatalog(internalCatalog, id)) {
                    continue;
                }
                LOG.info("Table {}.{}.{} with id {} not exist, remove its table stats record.",
                        stats.ctlName, stats.dbName, stats.tblName, id);
                analysisManager.removeTableStats(id);
                Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(id));
            } catch (Exception e) {
                // Pass the exception as the trailing argument so the stack trace is logged together
                // with the table id, matching how other failures in this class are reported.
                LOG.warn("Failed to clean table stats for table id {}", id, e);
            }
        }
    }

    /**
     * Checks whether any database of the given internal catalog contains a table with the given id.
     * Database ids that no longer resolve to a database are skipped.
     *
     * @param internalCatalog the catalog whose databases are searched
     * @param tableId the table id to look for
     * @return {@code true} as soon as one database returns a non-null table for {@code tableId}
     */
    private boolean tableExistInInternalCatalog(InternalCatalog internalCatalog, long tableId) {
        for (long dbId : internalCatalog.getDbIds()) {
            Database db = internalCatalog.getDbNullable(dbId);
            if (db != null && db.getTableNullable(tableId) != null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Prepares the state needed by one cleaning pass: resolves the two statistics tables in the
     * internal database and builds the catalog/database/table/index lookup maps.
     *
     * @return {@code false} if either statistics table could not be resolved (the failure is
     *         logged and the lookup maps are not built); {@code true} otherwise
     */
    private boolean init() {
        final String internalDb = FeConstants.INTERNAL_DB_NAME;
        try {
            colStatsTbl = (OlapTable) StatisticsUtil.findTable(
                    InternalCatalog.INTERNAL_CATALOG_NAME,
                    internalDb,
                    StatisticConstants.TABLE_STATISTIC_TBL_NAME);
            partitionColStatsTbl = (OlapTable) StatisticsUtil.findTable(
                    InternalCatalog.INTERNAL_CATALOG_NAME,
                    internalDb,
                    StatisticConstants.PARTITION_STATISTIC_TBL_NAME);
        } catch (Throwable t) {
            LOG.warn("Failed to init stats cleaner", t);
            return false;
        }

        // Order matters: the table map is derived from idToDb, and the index map from idToTbl.
        idToCatalog = Env.getCurrentEnv().getCatalogMgr().getIdToCatalog();
        idToDb = constructDbMap();
        idToTbl = constructTblMap();
        idToMVIdx = constructIdxMap();
        return true;
    }

    /**
     * Builds a map from database id to database covering all databases of the internal catalog.
     *
     * @return a new mutable map keyed by {@code DatabaseIf#getId()}
     */
    private Map<Long, DatabaseIf<? extends TableIf>> constructDbMap() {
        Collection<DatabaseIf<? extends TableIf>> databases =
                Env.getCurrentEnv().getInternalCatalog().getAllDbs();
        Map<Long, DatabaseIf<? extends TableIf>> dbsById = Maps.newHashMap();
        databases.forEach(database -> dbsById.put(database.getId(), database));
        return dbsById;
    }

    /**
     * Builds a map from table id to table covering every table of every database in
     * {@code idToDb}. Must be called after {@code idToDb} has been populated.
     *
     * @return a new mutable map keyed by {@code TableIf#getId()}
     */
    private Map<Long, TableIf> constructTblMap() {
        Map<Long, TableIf> tablesById = new HashMap<>();
        for (DatabaseIf<? extends TableIf> database : idToDb.values()) {
            database.getTables().forEach(table -> tablesById.put(table.getId(), table));
        }
        return tablesById;
    }

    /**
     * Builds a map from index id to index meta for the tables in {@code idToTbl}. Only
     * {@link OlapTable}s are considered, and only index metas whose {@code getDefineStmt()} is
     * non-null are included. Must be called after {@code idToTbl} has been populated.
     *
     * @return a new mutable map of the selected index metas keyed by index id
     */
    private Map<Long, MaterializedIndexMeta> constructIdxMap() {
        Map<Long, MaterializedIndexMeta> mvIndexesById = new HashMap<>();
        for (TableIf tbl : idToTbl.values()) {
            if (!(tbl instanceof OlapTable)) {
                continue;
            }
            ((OlapTable) tbl).getCopyOfIndexIdToMeta().forEach((indexId, indexMeta) -> {
                if (indexMeta.getDefineStmt() != null) {
                    mvIndexesById.put(indexId, indexMeta);
                }
            });
        }
        return mvIndexesById;
    }

    /**
     * Issues one delete per key kind collected in {@code expiredStats} against the stats table
     * {@code tblName} in the internal database. Key kinds with nothing collected are skipped by
     * {@link #doDelete}.
     *
     * @param expiredStats the batch of expired keys to delete
     * @param tblName unqualified name of the statistics table
     * @param isTableColumnStats when {@code true}, rows are additionally deleted by
     *                           {@code part_id} and by {@code id}
     */
    private void deleteExpiredStats(ExpiredStats expiredStats, String tblName, boolean isTableColumnStats) {
        String qualifiedTblName = FeConstants.INTERNAL_DB_NAME + "." + tblName;
        doDelete("catalog_id", toStringList(expiredStats.expiredCatalog), qualifiedTblName);
        doDelete("db_id", toStringList(expiredStats.expiredDatabase), qualifiedTblName);
        doDelete("tbl_id", toStringList(expiredStats.expiredTable), qualifiedTblName);
        doDelete("idx_id", toStringList(expiredStats.expiredIdxId), qualifiedTblName);
        if (!isTableColumnStats) {
            // Partition level column stats doesn't need the remaining deletions.
            // 1. For invalid part id, do it in next analyze
            // 2. Partition stats table doesn't have id column.
            return;
        }
        doDelete("part_id", toStringList(expiredStats.expiredPartitionId), qualifiedTblName);
        doDelete("id", toStringList(expiredStats.ids), qualifiedTblName);
    }

    /** Converts every element to its {@code String.valueOf} form, in the collection's iteration order. */
    private static List<String> toStringList(Collection<?> values) {
        return values.stream().map(String::valueOf).collect(Collectors.toList());
    }

    /**
     * Executes {@code DELETE FROM tblName WHERE colName IN ('v1','v2',...)} for the given values.
     * Does nothing when {@code pred} is null or empty. A failure of the statement is logged and
     * not propagated.
     *
     * @param colName column to filter on
     * @param pred values for the IN list; each value is wrapped in single quotes as-is
     * @param tblName qualified name of the table to delete from
     */
    private void doDelete(String colName, List<String> pred, String tblName) {
        if (CollectionUtils.isEmpty(pred)) {
            return;
        }
        // Prefix/suffix supply the outer quotes; the delimiter closes one literal and opens the next.
        String quotedValues = pred.stream().collect(Collectors.joining("','", "'", "'"));
        Map<String, String> substitutions = new HashMap<>();
        substitutions.put("left", colName);
        substitutions.put("right", quotedValues);
        String template = "DELETE FROM " + tblName + " WHERE ${left} IN (${right})";
        String statement = new StringSubstitutor(substitutions).replace(template);
        try {
            StatisticsUtil.execUpdate(statement);
        } catch (Exception e) {
            LOG.warn("Failed to delete expired stats!", e);
        }
    }

    /**
     * Scans the statistics table page by page, starting at {@code offset}, and records the keys
     * of rows that refer to objects which no longer exist. Scanning stops when the offset reaches
     * the table's row count or when {@code expiredStats} reports it is full. After each page the
     * thread pauses via {@link #yieldForOtherTask()}.
     *
     * @param statsTbl the statistics table; only its row count is read here
     * @param expiredStats the batch that receives the expired keys
     * @param offset row offset at which to start fetching
     * @param isTableColumnStats passed to the fetch call to select which stats table is read
     * @return the offset just past the last page fetched, to be used as the next start offset
     */
    private long findExpiredStats(OlapTable statsTbl, ExpiredStats expiredStats,
                                  long offset, boolean isTableColumnStats) {
        long cursor = offset;
        while (cursor < statsTbl.getRowCount() && !expiredStats.isFull()) {
            List<ResultRow> page = StatisticsRepository.fetchStatsFullName(
                    StatisticConstants.FETCH_LIMIT, cursor, isTableColumnStats);
            cursor += StatisticConstants.FETCH_LIMIT;
            for (ResultRow row : page) {
                try {
                    recordIfExpired(new StatsId(row), expiredStats);
                } catch (Exception e) {
                    // A bad row is logged and skipped; the rest of the page is still processed.
                    LOG.warn("Error occurred when retrieving expired stats", e);
                }
            }
            this.yieldForOtherTask();
        }
        return cursor;
    }

    /**
     * Classifies one stats row and, if it is expired, adds the broadest missing key
     * (catalog, then database, table, index, row id, partition id) to {@code expiredStats}.
     * At most one key is recorded per row.
     */
    private void recordIfExpired(StatsId statsId, ExpiredStats expiredStats) {
        long catalogId = statsId.catalogId;
        if (!idToCatalog.containsKey(catalogId)) {
            expiredStats.expiredCatalog.add(catalogId);
            return;
        }
        // Skip check external DBs and tables to avoid fetch too much metadata.
        // Remove expired external table stats only when the external catalog is dropped.
        // TODO: Need to check external database and table exist or not. But for now, we only check catalog.
        // Because column_statistics table only keep table id and db id.
        // But meta data doesn't always cache all external tables' ids.
        // So we may fail to find the external table only by id. Need to use db name and table name instead.
        // Have to store db name and table name in column_statistics in the future.
        if (catalogId != InternalCatalog.INTERNAL_CATALOG_ID) {
            return;
        }

        long dbId = statsId.dbId;
        if (!idToDb.containsKey(dbId)) {
            expiredStats.expiredDatabase.add(dbId);
            return;
        }

        long tblId = statsId.tblId;
        if (!idToTbl.containsKey(tblId)) {
            expiredStats.expiredTable.add(tblId);
            return;
        }

        // An index id of -1 is never treated as an expired index.
        long idxId = statsId.idxId;
        if (idxId != -1 && !idToMVIdx.containsKey(idxId)) {
            expiredStats.expiredIdxId.add(idxId);
            return;
        }

        TableIf table = idToTbl.get(tblId);
        String colId = statsId.colId;
        if (!StatisticsUtil.isMvColumn(table, colId) && table.getColumn(colId) == null) {
            expiredStats.ids.add(statsId.id);
            return;
        }

        // part_id should always be NULL in column_statistics table.
        String partId = statsId.partId;
        if (table instanceof OlapTable && partId != null) {
            expiredStats.expiredPartitionId.add(partId);
        }
    }

    /**
     * One batch of keys identifying expired statistics rows, grouped by the column they will be
     * deleted by.
     */
    private static class ExpiredStats {
        // Catalog ids referenced by stats rows but missing from the catalog map.
        Set<Long> expiredCatalog = new HashSet<>();
        // Database ids referenced by stats rows but missing from the database map.
        Set<Long> expiredDatabase = new HashSet<>();
        // Table ids referenced by stats rows but missing from the table map.
        Set<Long> expiredTable = new HashSet<>();
        // Index ids (other than -1) referenced by stats rows but missing from the index map.
        Set<Long> expiredIdxId = new HashSet<>();
        // Non-null partition ids found on rows of OlapTables.
        Set<String> expiredPartitionId = new HashSet<>();
        // Row ids whose column no longer exists on the table.
        Set<String> ids = new HashSet<>();

        /**
         * @return {@code true} when any one of the key sets has reached
         *         {@code Config.max_allowed_in_element_num_of_delete} elements
         */
        public boolean isFull() {
            return reachedLimit(expiredCatalog)
                    || reachedLimit(expiredDatabase)
                    || reachedLimit(expiredTable)
                    || reachedLimit(expiredIdxId)
                    || reachedLimit(expiredPartitionId)
                    || reachedLimit(ids);
        }

        /**
         * @return {@code true} when the catalog, database, table, index and partition sets are all
         *         empty and {@code ids} holds fewer than one tenth (integer division) of
         *         {@code Config.max_allowed_in_element_num_of_delete} elements. Note that
         *         {@code ids} itself does not have to be empty.
         */
        public boolean isEmpty() {
            boolean hasOtherExpiredKeys = !expiredCatalog.isEmpty()
                    || !expiredDatabase.isEmpty()
                    || !expiredTable.isEmpty()
                    || !expiredIdxId.isEmpty()
                    || !expiredPartitionId.isEmpty();
            if (hasOtherExpiredKeys) {
                return false;
            }
            return ids.size() < Config.max_allowed_in_element_num_of_delete / 10;
        }

        private static boolean reachedLimit(Set<?> keys) {
            return keys.size() >= Config.max_allowed_in_element_num_of_delete;
        }
    }

    /**
     * Pauses the current thread for {@code StatisticConstants.FETCH_INTERVAL_IN_MS} milliseconds
     * between fetches so that this task does not take too much IO.
     *
     * <p>If the thread is interrupted while sleeping, the pause ends early and the thread's
     * interrupt status is restored so that callers further up the stack can still observe it.
     */
    private void yieldForOtherTask() {
        try {
            Thread.sleep(StatisticConstants.FETCH_INTERVAL_IN_MS);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt flag when it throws; set it again rather than
            // silently discarding the interruption request.
            Thread.currentThread().interrupt();
        }
    }

}