ConsistencyChecker.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.consistency;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MetaObject;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.consistency.CheckConsistencyJob.JobState;
import org.apache.doris.persist.ConsistencyCheckInfo;
import org.apache.doris.task.CheckConsistencyTask;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Daemon that schedules consistency-check jobs for tablets.
 *
 * <p>Each round ({@link #runAfterCatalogReady()}) it may pick a new batch of tablets, preferring the
 * meta objects with the smallest {@code lastCheckTime}, wraps each tablet in a
 * {@link CheckConsistencyJob}, and then drives every registered job through its
 * PENDING / RUNNING states. New batches are only picked inside the configured hour window
 * ({@code Config.consistency_check_start_time} .. {@code Config.consistency_check_end_time}).
 *
 * <p>The job map is guarded by {@link #jobsLock}; see the lock-order note on that field.
 */
public class ConsistencyChecker extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(ConsistencyChecker.class);

    // Soft cap on the number of tablets picked per round: chooseTablets() only tests it after a
    // whole partition has been scanned, so a single round may pick more than this.
    private static final int MAX_JOB_NUM = 100;

    // Orders meta objects by ascending lastCheckTime (least recently checked first).
    // comparingLong() is used instead of subtracting the two values, which could overflow
    // and yield the wrong sign.
    private static final Comparator<MetaObject> COMPARATOR =
            Comparator.comparingLong(MetaObject::getLastCheckTime);

    // tabletId -> job
    private final Map<Long, CheckConsistencyJob> jobs;

    /*
     * ATTN:
     *      lock order is:
     *       jobs lock
     *       CheckConsistencyJob's synchronized
     *       table lock
     *
     * If reversing this order is unavoidable, use tryLock() on the table/db side instead,
     * to avoid dead lock.
     */
    private final ReentrantReadWriteLock jobsLock;

    // Work window, as hours of the day (Calendar.HOUR_OF_DAY, 0-23). Set once by initWorkTime().
    private int startTime;
    private int endTime;

    /**
     * Creates the checker and parses the configured work window.
     *
     * <p>NOTE: terminates the JVM via {@code System.exit(-1)} if the configured start or end
     * time cannot be parsed.
     */
    public ConsistencyChecker() {
        super("consistency checker");

        jobs = Maps.newHashMap();
        jobsLock = new ReentrantReadWriteLock();

        if (!initWorkTime()) {
            LOG.error("failed to init time in ConsistencyChecker. exit");
            System.exit(-1);
        }
    }

    /**
     * Reads the start/end hours from {@link Config} into {@link #startTime} and {@link #endTime}.
     *
     * @return {@code false} if either configured value could not be converted to a date
     *         (the fields are left untouched in that case), {@code true} otherwise
     */
    private boolean initWorkTime() {
        Date startDate = TimeUtils.getHourAsDate(Config.consistency_check_start_time);
        Date endDate = TimeUtils.getHourAsDate(Config.consistency_check_end_time);

        if (startDate == null || endDate == null) {
            return false;
        }

        Calendar calendar = Calendar.getInstance();

        calendar.setTime(startDate);
        startTime = calendar.get(Calendar.HOUR_OF_DAY);

        calendar.setTime(endDate);
        endTime = calendar.get(Calendar.HOUR_OF_DAY);

        LOG.info("consistency checker will work from {}:00 to {}:00", startTime, endTime);
        return true;
    }

    /**
     * Runs one scheduling round.
     *
     * <ol>
     *   <li>If we are inside the work window and no job is registered, pick a new batch of
     *       tablets and register one job per tablet.</li>
     *   <li>Under the jobs write lock, advance every registered job: PENDING jobs send their
     *       tasks (and are dropped if sending fails); RUNNING jobs are asked to finish and are
     *       dropped once they report cancelled or finished.</li>
     * </ol>
     */
    @Override
    protected void runAfterCatalogReady() {
        // for each round, try to choose enough new tablets to check.
        // only add new jobs when it's work time and the previous batch is fully drained
        if (itsTime() && getJobNum() == 0) {
            List<Long> chosenTabletIds = chooseTablets();
            for (Long tabletId : chosenTabletIds) {
                CheckConsistencyJob job = new CheckConsistencyJob(tabletId);
                addJob(job);
            }
        }

        jobsLock.writeLock().lock();
        try {
            // handle all jobs; use the iterator so finished jobs can be removed while iterating
            Iterator<Map.Entry<Long, CheckConsistencyJob>> iterator = jobs.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<Long, CheckConsistencyJob> entry = iterator.next();
                CheckConsistencyJob oneJob = entry.getValue();

                JobState state = oneJob.getState();
                switch (state) {
                    case PENDING:
                        if (!oneJob.sendTasks()) {
                            clearJob(oneJob);
                            iterator.remove();
                        }
                        break;
                    case RUNNING:
                        int res = oneJob.tryFinishJob();
                        if (res == -1 || res == 1) {
                            // cancelled or finished
                            clearJob(oneJob);
                            iterator.remove();
                        }
                        break;
                    default:
                        break;
                }
            } // end while
        } finally {
            jobsLock.writeLock().unlock();
        }
    }

    /**
     * Checks whether the current hour of the day lies inside the work window.
     *
     * <p>Both bounds are inclusive at hour granularity, i.e. the whole {@code endTime} hour still
     * counts as work time. A window with {@code startTime > endTime} wraps around midnight.
     * {@code startTime == endTime} disables the checker.
     *
     * @return {@code true} if new jobs may be scheduled now
     */
    private boolean itsTime() {
        if (startTime == endTime) {
            return false;
        }

        // Calendar.getInstance() is already initialised to the current time
        int currentTime = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);

        boolean isTime;
        if (startTime < endTime) {
            isTime = currentTime >= startTime && currentTime <= endTime;
        } else {
            // startTime > endTime (across the day)
            isTime = currentTime >= startTime || currentTime <= endTime;
        }

        if (!isTime && LOG.isDebugEnabled()) {
            LOG.debug("current time is {}:00, waiting to {}:00 to {}:00",
                      currentTime, startTime, endTime);
        }

        return isTime;
    }

    /**
     * Clears the given job's state. Removing it from {@link #jobs} is left to the caller.
     */
    private void clearJob(CheckConsistencyJob job) {
        job.clear();
        if (LOG.isDebugEnabled()) {
            LOG.debug("tablet[{}] consistency checking job is cleared", job.getTabletId());
        }
    }

    /**
     * Registers a job unless one already exists for the same tablet.
     *
     * @return {@code true} if the job was added, {@code false} if the tablet already has a job
     */
    private boolean addJob(CheckConsistencyJob job) {
        this.jobsLock.writeLock().lock();
        try {
            if (jobs.containsKey(job.getTabletId())) {
                return false;
            }
            LOG.info("add tablet[{}] to check consistency", job.getTabletId());
            jobs.put(job.getTabletId(), job);
            return true;
        } finally {
            this.jobsLock.writeLock().unlock();
        }
    }

    /**
     * @return the job registered for {@code tabletId}, or {@code null} if there is none
     */
    private CheckConsistencyJob getJob(long tabletId) {
        this.jobsLock.readLock().lock();
        try {
            return jobs.get(tabletId);
        } finally {
            this.jobsLock.readLock().unlock();
        }
    }

    /**
     * @return the number of currently registered jobs
     */
    private int getJobNum() {
        this.jobsLock.readLock().lock();
        try {
            return jobs.size();
        } finally {
            this.jobsLock.readLock().unlock();
        }
    }

    /**
     * Chooses the tablets to check in this round.
     *
     * <p>Databases, tables, partitions, indexes and tablets are each visited in ascending
     * {@code lastCheckTime} order (via priority queues). Within a partition, every tablet of every
     * visible index is considered; a tablet is chosen when it has no registered job and its
     * checked version differs from the partition's visible version. After each partition the
     * number of chosen tablets is compared with {@link #MAX_JOB_NUM}, and the scan stops once the
     * limit is reached, so the result may exceed the limit by part of one partition.
     *
     * <p>Holds the jobs read lock for the whole scan and each table's read lock while that table
     * is scanned, in that order (see the lock-order note on {@link #jobsLock}).
     *
     * @return ids of the chosen tablets; empty if nothing needs checking
     */
    private List<Long> chooseTablets() {
        Env env = Env.getCurrentEnv();
        List<Long> chosenTablets = Lists.newArrayList();

        // sort dbs
        List<Long> dbIds = env.getInternalCatalog().getDbIds();
        if (dbIds.isEmpty()) {
            return chosenTablets;
        }
        // dbIds is non-empty here, so its size is a valid (>= 1) initial capacity
        Queue<MetaObject> dbQueue = new PriorityQueue<>(dbIds.size(), COMPARATOR);
        for (Long dbId : dbIds) {
            if (dbId == 0L) {
                // skip 'information_schema' database
                continue;
            }
            Database db = env.getInternalCatalog().getDbNullable(dbId);
            if (db == null) {
                continue;
            }
            dbQueue.add(db);
        }

        // must lock jobsLock first to obey the lock order rule
        this.jobsLock.readLock().lock();
        try {
            MetaObject chosenOne;
            while ((chosenOne = dbQueue.poll()) != null) {
                Database db = (Database) chosenOne;
                List<Table> tables = db.getTables();
                // sort tables; PriorityQueue rejects an initial capacity of 0, hence the max()
                Queue<MetaObject> tableQueue = new PriorityQueue<>(Math.max(tables.size(), 1), COMPARATOR);
                for (Table table : tables) {
                    if (table.isManagedTable()) {
                        tableQueue.add(table);
                    }
                }

                while ((chosenOne = tableQueue.poll()) != null) {
                    OlapTable table = (OlapTable) chosenOne;
                    table.readLock();
                    try {
                        // sort partitions
                        Queue<MetaObject> partitionQueue =
                                new PriorityQueue<>(Math.max(table.getAllPartitions().size(), 1), COMPARATOR);
                        for (Partition partition : table.getPartitions()) {
                            if (isPartitionCheckable(table, partition)) {
                                partitionQueue.add(partition);
                            }
                        }

                        while ((chosenOne = partitionQueue.poll()) != null) {
                            collectTabletsToCheck(db, table, (Partition) chosenOne, chosenTablets);

                            // the cap is only tested per partition; the finally blocks below
                            // release the table and jobs locks on this early return
                            if (chosenTablets.size() >= MAX_JOB_NUM) {
                                return chosenTablets;
                            }
                        } // end while partitionQueue
                    } finally {
                        table.readUnlock();
                    }
                } // end while tableQueue
            } // end while dbQueue
        } finally {
            jobsLock.readLock().unlock();
        }

        return chosenTablets;
    }

    /**
     * Decides whether a partition is worth scanning: partitions with a single replica or still at
     * the initial version (no data) are skipped.
     */
    private boolean isPartitionCheckable(OlapTable table, Partition partition) {
        // check partition's replication num. if 1 replication. skip
        if (table.getPartitionInfo().getReplicaAllocation(
                partition.getId()).getTotalReplicaNum() == (short) 1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("partition[{}]'s replication num is 1. ignore", partition.getId());
            }
            return false;
        }

        // check if this partition has no data
        if (partition.getVisibleVersion() == Partition.PARTITION_INIT_VERSION) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("partition[{}]'s version is {}. ignore", partition.getId(),
                        Partition.PARTITION_INIT_VERSION);
            }
            return false;
        }
        return true;
    }

    /**
     * Appends to {@code chosenTablets} every tablet of the partition's visible indexes that has no
     * registered job and whose checked version differs from the partition's visible version.
     * Indexes and tablets are visited in ascending {@code lastCheckTime} order.
     *
     * <p>Reads {@link #jobs} without locking: the caller must hold the jobs lock (and the table's
     * read lock, as chooseTablets() does).
     */
    private void collectTabletsToCheck(Database db, OlapTable table, Partition partition,
            List<Long> chosenTablets) {
        // sort materializedIndices
        List<MaterializedIndex> visibleIndexes = partition.getMaterializedIndices(IndexExtState.VISIBLE);
        Queue<MetaObject> indexQueue = new PriorityQueue<>(Math.max(visibleIndexes.size(), 1), COMPARATOR);
        indexQueue.addAll(visibleIndexes);

        MetaObject chosenOne;
        while ((chosenOne = indexQueue.poll()) != null) {
            MaterializedIndex index = (MaterializedIndex) chosenOne;

            // sort tablets
            Queue<MetaObject> tabletQueue
                    = new PriorityQueue<>(Math.max(index.getTablets().size(), 1), COMPARATOR);
            tabletQueue.addAll(index.getTablets());

            while ((chosenOne = tabletQueue.poll()) != null) {
                Tablet tablet = (Tablet) chosenOne;
                long chosenTabletId = tablet.getId();

                if (this.jobs.containsKey(chosenTabletId)) {
                    continue;
                }

                // check if version has already been checked
                if (partition.getVisibleVersion() == tablet.getCheckedVersion()) {
                    // already checked at this version: never re-chosen, only logged when consistent
                    if (tablet.isConsistent() && LOG.isDebugEnabled()) {
                        LOG.debug("tablet[{}]'s version[{}] has been checked. ignore",
                                chosenTabletId, tablet.getCheckedVersion());
                    }
                } else {
                    LOG.info("chose tablet[{}-{}-{}-{}-{}] to check consistency", db.getId(),
                            table.getId(), partition.getId(), index.getId(), chosenTabletId);

                    chosenTablets.add(chosenTabletId);
                }
            } // end while tabletQueue
        } // end while indexQueue
    }

    /**
     * Forwards a finished replica check to the job of the task's tablet.
     *
     * <p>If no job is registered for that tablet, a warning is logged and the result is dropped.
     *
     * @param task     the finished task; supplies the tablet id and backend id
     * @param checksum the checksum reported for that replica
     */
    public void handleFinishedConsistencyCheck(CheckConsistencyTask task, long checksum) {
        long tabletId = task.getTabletId();
        long backendId = task.getBackendId();

        CheckConsistencyJob job = getJob(tabletId);
        if (job == null) {
            LOG.warn("cannot find {} job[{}]", task.getTaskType().name(), tabletId);
            return;
        }

        job.handleFinishedReplica(backendId, checksum);
    }

    /**
     * Replays a persisted consistency-check result onto the in-memory meta.
     *
     * <p>Under the table write lock, sets {@code lastCheckTime} on the db, table, partition, index
     * and tablet, and the checked version and consistency flag on the tablet.
     *
     * @param info the persisted check result
     * @param env  the environment whose internal catalog is updated
     * @throws MetaNotFoundException if the db, table, partition, index or tablet named by
     *         {@code info} no longer exists; in that case no meta object is modified
     */
    public void replayFinishConsistencyCheck(ConsistencyCheckInfo info, Env env) throws MetaNotFoundException {
        Database db = env.getInternalCatalog().getDbOrMetaException(info.getDbId());
        OlapTable table = (OlapTable) db.getTableOrMetaException(info.getTableId());
        table.writeLock();
        try {
            // Resolve the whole chain before touching anything, and report a missing element the
            // same way a missing db/table is reported rather than with a NullPointerException.
            Partition partition = table.getPartition(info.getPartitionId());
            if (partition == null) {
                throw new MetaNotFoundException("partition " + info.getPartitionId()
                        + " does not exist in table " + info.getTableId());
            }
            MaterializedIndex index = partition.getIndex(info.getIndexId());
            if (index == null) {
                throw new MetaNotFoundException("index " + info.getIndexId()
                        + " does not exist in partition " + info.getPartitionId());
            }
            Tablet tablet = index.getTablet(info.getTabletId());
            if (tablet == null) {
                throw new MetaNotFoundException("tablet " + info.getTabletId()
                        + " does not exist in index " + info.getIndexId());
            }

            long lastCheckTime = info.getLastCheckTime();
            db.setLastCheckTime(lastCheckTime);
            table.setLastCheckTime(lastCheckTime);
            partition.setLastCheckTime(lastCheckTime);
            index.setLastCheckTime(lastCheckTime);
            tablet.setLastCheckTime(lastCheckTime);
            tablet.setCheckedVersion(info.getCheckedVersion());

            tablet.setIsConsistent(info.isConsistent());
        } finally {
            table.writeUnlock();
        }
    }

    /**
     * Manually registers check jobs for the given tablets.
     *
     * <p>Unlike the periodic selection, this does not consult the work window. Tablets that
     * already have a registered job are silently skipped.
     *
     * @param tabletIds ids of the tablets to check
     */
    public void addTabletsToCheck(List<Long> tabletIds) {
        for (Long tabletId : tabletIds) {
            CheckConsistencyJob job = new CheckConsistencyJob(tabletId);
            addJob(job);
        }
    }
}