MasterImpl.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.master;

import org.apache.doris.alter.AlterJobV2.JobType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.master.CloudReportHandler;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.IngestionLoadJob;
import org.apache.doris.load.loadv2.SparkLoadJob;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CalcDeleteBitmapTask;
import org.apache.doris.task.CheckConsistencyTask;
import org.apache.doris.task.ClearAlterTask;
import org.apache.doris.task.CloneTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.PushCooldownConfTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TReportRequest;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MasterImpl {
    // Class-wide logger.
    private static final Logger LOG = LogManager.getLogger(MasterImpl.class);

    // Handler that report() delegates to; the cloud-specific implementation is chosen when
    // Config.isCloudMode() is true. It is started in the constructor.
    private ReportHandler reportHandler =  Config.isCloudMode() ? new CloudReportHandler() : new ReportHandler();

    /**
     * Creates the master service object and starts its report handler.
     */
    public MasterImpl() {
        this.reportHandler.start();
    }

    /**
     * Handles a "finish task" request by delegating to {@link #finishTaskInternal(TFinishTaskRequest)}
     * and emits a warning when a result was produced but handling took longer than one second.
     *
     * @param request the finish-task request
     * @return the result produced by {@code finishTaskInternal}
     */
    public TMasterResult finishTask(TFinishTaskRequest request) {
        final long beginMs = System.currentTimeMillis();
        TMasterResult response = null;
        try {
            response = finishTaskInternal(request);
            return response;
        } finally {
            // response stays null when finishTaskInternal threw; nothing is logged in that case.
            final long elapsedMs = System.currentTimeMillis() - beginMs;
            boolean tooSlow = elapsedMs > 1000L;
            if (tooSlow && response != null) {
                LOG.warn("Finish task rpc exceeded 1s, request={}, result={}", request, response);
            }
        }
    }

    /**
     * Processes a finished-task report and dispatches it to the handler for its task type.
     *
     * <p>The returned status is OK unless one of the following happens, in which case it is
     * CANCELLED with an error message:
     * <ul>
     *   <li>no backend matches the host / be port carried by the request;</li>
     *   <li>no matching task is queued and the type is not DROP, RELEASE_SNAPSHOT or
     *       CLEAR_TRANSACTION_TASK (for those three the miss is only logged);</li>
     *   <li>the type-specific handler throws.</li>
     * </ul>
     *
     * <p>When the reported task status is not OK, the failure is recorded on the task
     * (failed count, error message, error code). Only the task types listed in the condition
     * below are then still dispatched to their handler; all others return immediately.
     *
     * @param request the finish-task request
     * @return the result carrying the status described above
     */
    public TMasterResult finishTaskInternal(TFinishTaskRequest request) {
        TMasterResult result = new TMasterResult();
        TStatus tStatus = new TStatus(TStatusCode.OK);
        result.setStatus(tStatus);
        // check task status
        // retry task by report process
        TStatus taskStatus = request.getTaskStatus();
        TTaskType taskType = request.getTaskType();
        long signature = request.getSignature();
        if (LOG.isDebugEnabled()) {
            LOG.debug("get task report: {}", request);
        }

        // Failed PUBLISH_VERSION reports are logged further down, subject to a log threshold.
        if (taskStatus.getStatusCode() != TStatusCode.OK && taskType != TTaskType.PUBLISH_VERSION) {
            LOG.warn("finish task reports bad. request: {}", request);
        }

        // get backend
        TBackend tBackend = request.getBackend();
        String host = tBackend.getHost();
        int bePort = tBackend.getBePort();
        Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(host, bePort);
        if (backend == null) {
            tStatus.setStatusCode(TStatusCode.CANCELLED);
            List<String> errorMsgs = new ArrayList<>();
            errorMsgs.add("backend not exist.");
            tStatus.setErrorMsgs(errorMsgs);
            LOG.warn("backend does not found. host: {}, be port: {}. task: {}", host, bePort, request);
            return result;
        }

        long backendId = backend.getId();

        AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
        if (task == null) {
            if (taskType != TTaskType.DROP && taskType != TTaskType.RELEASE_SNAPSHOT
                    && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
                String errMsg = "cannot find task. type: " + taskType + ", backendId: " + backendId
                        + ", signature: " + signature;
                LOG.warn(errMsg);
                tStatus.setStatusCode(TStatusCode.CANCELLED);
                List<String> errorMsgs = new ArrayList<String>();
                errorMsgs.add(errMsg);
                tStatus.setErrorMsgs(errorMsgs);
            } else {
                LOG.warn("Finish task rpc got null task for request={}", request);
            }
            return result;
        } else {
            if (taskStatus.getStatusCode() != TStatusCode.OK) {
                task.failed();
                if (taskType == TTaskType.PUBLISH_VERSION) {
                    // A negative threshold means "always log"; otherwise only the first N failures are logged.
                    boolean needLog = (Config.publish_version_task_failed_log_threshold < 0
                            || task.getFailedTimes() <= Config.publish_version_task_failed_log_threshold);
                    if (needLog) {
                        LOG.warn("finish task reports bad. request: {}", request);
                    }
                }
                // Report the numeric backend id, matching the "backendId" label and the
                // "cannot find task" message above.
                String errMsg = "task type: " + taskType + ", status_code: " + taskStatus.getStatusCode().toString()
                        + (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
                        + ", backendId: " + backendId + ", signature: " + signature;
                task.setErrorMsg(errMsg);
                task.setErrorCode(taskStatus.getStatusCode());
                // We start to let FE perceive the task's error msg
                // Only the task types listed here go on to their handler after a failure.
                if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
                        && taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE
                        && taskType != TTaskType.CLONE && taskType != TTaskType.PUBLISH_VERSION
                        && taskType != TTaskType.CREATE && taskType != TTaskType.UPDATE_TABLET_META_INFO
                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
                        && taskType != TTaskType.CALCULATE_DELETE_BITMAP
                        && taskType != TTaskType.REALTIME_PUSH) {
                    return result;
                }
            }
        }

        try {
            switch (taskType) {
                case CREATE:
                    Preconditions.checkState(request.isSetReportVersion());
                    finishCreateReplica(task, request);
                    break;
                case REALTIME_PUSH:
                    Preconditions.checkState(request.isSetReportVersion());
                    finishRealtimePush(task, request);
                    break;
                case PUBLISH_VERSION:
                    finishPublishVersion(task, request);
                    break;
                case CLEAR_ALTER_TASK:
                    finishClearAlterTask(task, request);
                    break;
                case DROP:
                    finishDropReplica(task);
                    break;
                case SCHEMA_CHANGE:
                case ROLLUP:
                    throw new RuntimeException("Schema change and rollup job is not used any more,"
                            + " use alter task instead");
                case CLONE:
                    finishClone(task, request);
                    break;
                case STORAGE_MEDIUM_MIGRATE:
                    finishStorageMediumMigrate(task, request);
                    break;
                case CHECK_CONSISTENCY:
                    finishConsistencyCheck(task, request);
                    break;
                case MAKE_SNAPSHOT:
                    finishMakeSnapshot(task, request);
                    break;
                case UPLOAD:
                    finishUpload(task, request);
                    break;
                case DOWNLOAD:
                    finishDownloadTask(task, request);
                    break;
                case MOVE:
                    finishMoveDirTask(task, request);
                    break;
                case RECOVER_TABLET:
                    finishRecoverTablet(task);
                    break;
                case ALTER:
                    finishAlterTask(task, request);
                    break;
                case ALTER_INVERTED_INDEX:
                    finishAlterInvertedIndexTask(task, request);
                    break;
                case UPDATE_TABLET_META_INFO:
                    finishUpdateTabletMeta(task, request);
                    break;
                case PUSH_COOLDOWN_CONF:
                    finishPushCooldownConfTask(task);
                    break;
                case CALCULATE_DELETE_BITMAP:
                    finishCalcDeleteBitmap(task, request);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            // Any handler failure is reported back as CANCELLED.
            tStatus.setStatusCode(TStatusCode.CANCELLED);
            String errMsg = "finish agent task error.";
            LOG.warn(errMsg, e);
            List<String> errorMsgs = new ArrayList<String>();
            errorMsgs.add(errMsg);
            tStatus.setErrorMsgs(errorMsgs);
        }

        if (tStatus.getStatusCode() == TStatusCode.OK) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("report task success. {}", request.toString());
            }
        }

        return result;
    }

    /**
     * Verifies that the request carries at least one finished tablet info.
     *
     * @param request the finish-task request to inspect
     * @throws Exception if the finish tablet infos are unset or empty
     */
    private void checkHasTabletInfo(TFinishTaskRequest request) throws Exception {
        boolean hasTabletInfo = request.isSetFinishTabletInfos() && !request.getFinishTabletInfos().isEmpty();
        if (hasTabletInfo) {
            return;
        }
        throw new Exception("tablet info is not set");
    }

    /**
     * Handles the report of a CREATE (create replica) task.
     *
     * <p>On a failed report the task's latch is counted down to zero with the backend's error
     * messages. On success the task is marked finished, the replica's path hash is recorded
     * (when tablet infos are present), the backend report version is updated and the latch is
     * counted down. The task is always removed from the queue.
     *
     * @param task    the queued task; must be a {@link CreateReplicaTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishCreateReplica(AgentTask task, TFinishTaskRequest request) {
        // if we get here, this task will be removed from AgentTaskQueue for certain.
        // because in this function, the only problem that cause failure is meta missing.
        // and if meta is missing, we no longer need to resend this task
        try {
            CreateReplicaTask createReplicaTask = (CreateReplicaTask) task;
            if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
                // Error messages are optional in the status; plain concatenation renders a missing
                // list as "null" instead of throwing, so the latch is always released.
                createReplicaTask.countDownToZero(task.getBackendId() + ": "
                        + request.getTaskStatus().getErrorMsgs());
            } else {
                createReplicaTask.setFinished(true);
                long tabletId = createReplicaTask.getTabletId();

                if (request.isSetFinishTabletInfos()) {
                    Replica replica = Env.getCurrentInvertedIndex().getReplica(createReplicaTask.getTabletId(),
                            createReplicaTask.getBackendId());
                    replica.setPathHash(request.getFinishTabletInfos().get(0).getPathHash());

                    if (createReplicaTask.isRecoverTask()) {
                        /**
                         * This create replica task may be generated by recovery
                         * (See comment of Config.recover_with_empty_tablet)
                         * So we set replica back to good.
                         */
                        replica.setBad(false);
                        // Argument order follows the placeholders: tablet, replica, backend.
                        LOG.info("finish recover create replica task. set replica to good. tablet {},"
                                        + " replica {}, backend {}", tabletId, replica.getId(), task.getBackendId());
                    }
                }

                // this should be called before 'countDownLatch()'
                Env.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(),
                        request.getReportVersion(), task.getDbId(), task.getTableId(), true);

                createReplicaTask.countDownLatch(task.getBackendId(), task.getSignature());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("finish create replica. tablet id: {}, be: {}, report version: {}",
                            tabletId, task.getBackendId(), request.getReportVersion());
                }
            }
        } finally {
            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CREATE, task.getSignature());
        }
    }

    /**
     * Handles the report of an UPDATE_TABLET_META_INFO task.
     *
     * <p>A failed report counts the task's latch down to zero with the backend's error messages;
     * a successful one counts the latch down for this backend and its tablets. The task is always
     * removed from the queue.
     *
     * @param task    the queued task; must be an {@link UpdateTabletMetaInfoTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishUpdateTabletMeta(AgentTask task, TFinishTaskRequest request) {
        // if we get here, this task will be removed from AgentTaskQueue for certain.
        // because in this function, the only problem that cause failure is meta missing.
        // and if meta is missing, we no longer need to resend this task
        try {
            UpdateTabletMetaInfoTask tabletTask = (UpdateTabletMetaInfoTask) task;
            if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
                // Error messages are optional in the status; plain concatenation renders a missing
                // list as "null" instead of throwing, so the latch is always released.
                tabletTask.countDownToZero(task.getBackendId() + ": "
                        + request.getTaskStatus().getErrorMsgs());
            } else {
                tabletTask.countDownLatch(task.getBackendId(), tabletTask.getTablets());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("finish update tablet meta. tablet id: {}, be: {}",
                            tabletTask.getTablets(), task.getBackendId());
                }
            }
        } finally {
            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.UPDATE_TABLET_META_INFO, task.getSignature());
        }
    }

    /**
     * Handles the report of a REALTIME_PUSH task.
     *
     * <p>Failed reports: only a DELETE push that failed with INVALID_ARGUMENT is finalized here
     * (latch counted to zero, task removed); every other failure leaves the task in the queue.
     *
     * <p>Successful reports: the request must carry one or two tablet infos. Under the table write
     * lock the backend report version is updated, and each reported tablet's replica on this
     * backend is registered with the owning delete job (DELETE) or load job (LOAD_V2). The task is
     * then removed from the queue. Missing metadata (partition, index, job, tablet) also removes
     * the task and, for DELETE pushes, counts the latch down.
     *
     * @param task    the queued task; must be a {@link PushTask}
     * @param request the finish-task request reported by the backend
     * @throws Exception if a successful report carries no tablet info
     */
    private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) throws Exception {
        PushTask pushTask = (PushTask) task;

        long dbId = pushTask.getDbId();
        long backendId = pushTask.getBackendId();
        long signature = task.getSignature();
        long transactionId = pushTask.getTransactionId();

        if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
            if (pushTask.getPushType() == TPushType.DELETE) {
                // we don't need to retry if the returned status code is DELETE_INVALID_CONDITION
                // or DELETE_INVALID_PARAMETERS
                // note that they will be converted to TStatusCode.INVALID_ARGUMENT when being sent from be to fe
                if (request.getTaskStatus().getStatusCode() == TStatusCode.INVALID_ARGUMENT) {
                    // Error messages are optional in the status; avoid calling toString() on a
                    // possibly-null list so the latch release and task removal always happen.
                    List<String> backendErrors = request.getTaskStatus().getErrorMsgs();
                    pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
                            task.getBackendId() + ": " + backendErrors);
                    AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
                    LOG.warn("finish push replica error: {}", backendErrors);
                }
            }
            return;
        }

        checkHasTabletInfo(request);
        List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();

        Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (db == null) {
            AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
            return;
        }

        long tableId = pushTask.getTableId();
        long partitionId = pushTask.getPartitionId();
        long pushIndexId = pushTask.getIndexId();
        long pushTabletId = pushTask.getTabletId();
        // push finish type:
        //                  numOfFinishTabletInfos  tabletId schemaHash
        // Normal:                     1                   /          /
        // SchemaChangeHandler         2                 same      diff
        // RollupHandler               2                 diff      diff
        //
        // reuse enum 'PartitionState' here as 'push finish type'
        PartitionState pushState = null;
        if (finishTabletInfos.size() == 1) {
            pushState = PartitionState.NORMAL;
        } else if (finishTabletInfos.size() == 2) {
            if (finishTabletInfos.get(0).getTabletId() == finishTabletInfos.get(1).getTabletId()) {
                pushState = PartitionState.SCHEMA_CHANGE;
            } else {
                pushState = PartitionState.ROLLUP;
            }
        } else {
            LOG.warn("invalid push report infos. finishTabletInfos' size: " + finishTabletInfos.size());
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("push report state: {}", pushState.name());
        }

        OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
        if (olapTable == null || !olapTable.writeLockIfExist()) {
            AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
            LOG.warn("finish push replica error, cannot find table[" + tableId + "] when push finished");
            return;
        }
        try {
            Partition partition = olapTable.getPartition(partitionId);
            if (partition == null) {
                throw new MetaNotFoundException("cannot find partition[" + partitionId + "] when push finished");
            }

            MaterializedIndex pushIndex = partition.getIndex(pushIndexId);
            if (pushIndex == null) {
                // yiguolei: if index is dropped during load, it is not a failure.
                // throw exception here and cause the job to cancel the task
                // Report the id that was looked up; pushIndex itself is null on this path.
                throw new MetaNotFoundException("cannot find index[" + pushIndexId + "] when push finished");
            }

            // should be done before addReplicaPersistInfos and countDownLatch
            long reportVersion = request.getReportVersion();
            Env.getCurrentSystemInfo().updateBackendReportVersion(task.getBackendId(), reportVersion,
                                                                       task.getDbId(), task.getTableId(), true);

            List<Long> tabletIds = finishTabletInfos.stream().map(
                    tTabletInfo -> tTabletInfo.getTabletId()).collect(Collectors.toList());
            List<TabletMeta> tabletMetaList = Env.getCurrentInvertedIndex().getTabletMetaList(tabletIds);

            if (pushTask.getPushType() == TPushType.DELETE) {
                DeleteJob deleteJob = Env.getCurrentEnv().getDeleteHandler().getDeleteJob(transactionId);
                if (deleteJob == null) {
                    throw new MetaNotFoundException("cannot find delete job, job[" + transactionId + "]");
                }
                for (int i = 0; i < tabletMetaList.size(); i++) {
                    TabletMeta tabletMeta = tabletMetaList.get(i);
                    long tabletId = tabletIds.get(i);
                    Replica replica = findRelatedReplica(olapTable, partition,
                            backendId, tabletId, tabletMeta.getIndexId());
                    if (replica != null) {
                        deleteJob.addFinishedReplica(partitionId, pushTabletId, replica);
                        pushTask.countDownLatch(backendId, pushTabletId);
                    }
                }
            } else if (pushTask.getPushType() == TPushType.LOAD_V2) {
                long loadJobId = pushTask.getLoadJobId();
                org.apache.doris.load.loadv2.LoadJob job
                        = Env.getCurrentEnv().getLoadManager().getLoadJob(loadJobId);
                if (job == null) {
                    throw new MetaNotFoundException("cannot find load job, job[" + loadJobId + "]");
                }
                for (int i = 0; i < tabletMetaList.size(); i++) {
                    TabletMeta tabletMeta = tabletMetaList.get(i);
                    checkReplica(finishTabletInfos.get(i), tabletMeta);
                    long tabletId = tabletIds.get(i);
                    Replica replica = findRelatedReplica(
                            olapTable, partition, backendId, tabletId, tabletMeta.getIndexId());
                    // if the replica is under schema change, could not find the replica with aim schema hash
                    if (replica != null) {
                        if (job instanceof SparkLoadJob) {
                            ((SparkLoadJob) job).addFinishedReplica(replica.getId(), pushTabletId, backendId);
                        } else if (job instanceof IngestionLoadJob) {
                            ((IngestionLoadJob) job).addFinishedReplica(replica.getId(), pushTabletId, backendId);
                        }
                    }
                }
            }

            AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
            if (LOG.isDebugEnabled()) {
                LOG.debug("finish push replica. tabletId: {}, backendId: {}", pushTabletId, backendId);
            }
        } catch (MetaNotFoundException e) {
            AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, signature);
            LOG.warn("finish push replica error", e);
            if (pushTask.getPushType() == TPushType.DELETE) {
                pushTask.countDownLatch(backendId, pushTabletId);
            }
        } finally {
            olapTable.writeUnlock();
        }
    }

    /**
     * Ensures that tablet meta exists for a reported tablet.
     *
     * @param tTabletInfo the tablet info reported by the backend; supplies the tablet id for the message
     * @param tabletMeta  the meta looked up for that tablet
     * @throws MetaNotFoundException if the meta is null or the NOT_EXIST_TABLET_META sentinel
     */
    private void checkReplica(TTabletInfo tTabletInfo, TabletMeta tabletMeta)
            throws MetaNotFoundException {
        long reportedTabletId = tTabletInfo.getTabletId();
        // The reported tablet id is used (not the push task's tablet id): in rollup state the two differ.
        boolean metaMissing = tabletMeta == null || tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META;
        if (!metaMissing) {
            return;
        }
        // rollup may be dropped
        throw new MetaNotFoundException("tablet " + reportedTabletId + " does not exist");
    }

    /**
     * Looks up the replica of a tablet that lives on the given backend.
     *
     * @param olapTable the table owning the partition; only its state and id are read
     * @param partition the partition in which the index is looked up
     * @param backendId the backend whose replica is wanted
     * @param tabletId  the tablet to look up inside the index
     * @param indexId   the index id taken from the tablet's meta
     * @return the replica, or null when the index id is the NOT_EXIST sentinel, the index is
     *         missing while the table is in ROLLUP state, the tablet is missing, or the tablet
     *         has no replica on that backend
     * @throws MetaNotFoundException if the index is missing and the table is not in ROLLUP state
     */
    private Replica findRelatedReplica(OlapTable olapTable, Partition partition,
                                                 long backendId, long tabletId, long indexId)
            throws MetaNotFoundException {
        // both normal index and rollingup index are in inverted index
        // this means the index is dropped during load
        if (indexId == TabletInvertedIndex.NOT_EXIST_VALUE) {
            LOG.warn("tablet[{}] may be dropped. push index[{}]", tabletId, indexId);
            return null;
        }
        MaterializedIndex index = partition.getIndex(indexId);
        if (index == null) {
            // In alter job v2 case
            // alter job is always == null, so that we could remove the condition
            // if alter job is always null, then could not covert it to a rollup
            // job, will throw exception, so just throw exception in this case
            if (olapTable.getState() == OlapTableState.ROLLUP) {
                // this happens when:
                // a rollup job is finish and a delete job is the next first job (no load job before)
                // and delete task is first send to base tablet, so it will return 2 tablets info.
                // the second tablet is rollup tablet and it is no longer exist in alterJobs queue.
                // just ignore the rollup tablet info. it will be handled in rollup tablet delete task report.

                // add log to observe
                LOG.warn("Cannot find table[{}].", olapTable.getId());
                return null;
            }
            throw new MetaNotFoundException("Could not find related replica");
        }
        Tablet tablet = index.getTablet(tabletId);
        if (tablet == null) {
            LOG.warn("could not find tablet {} in rollup index {} ", tabletId, indexId);
            return null;
        }
        Replica replica = tablet.getReplicaByBackendId(backendId);
        if (replica == null) {
            // Dump every replica of the tablet to help diagnose the miss.
            StringBuilder reps = new StringBuilder();
            for (Replica r : tablet.getReplicas()) {
                reps.append(r);
            }
            // One placeholder per argument: backend, tablet, index, cloud flag, replica dump.
            LOG.warn("could not find replica with backend {} in tablet {} in rollup index {},"
                            + " is cloud: {}, replicas: {}",
                    backendId, tabletId, indexId, tablet instanceof CloudTablet, reps);
        }
        return replica;
    }

    /**
     * Marks a clear-alter task as finished and removes it from the agent task queue.
     *
     * @param task    the queued task; must be a {@link ClearAlterTask}
     * @param request the finish-task request (not read here)
     */
    private void finishClearAlterTask(AgentTask task, TFinishTaskRequest request) {
        ((ClearAlterTask) task).setFinished(true);

        long backendId = task.getBackendId();
        TTaskType type = task.getTaskType();
        long signature = task.getSignature();
        AgentTaskQueue.removeTask(backendId, type, signature);
    }

    /**
     * Handles the report of a PUBLISH_VERSION task.
     *
     * <p>Records the succeeded and failed tablets on the task and marks it finished. The task is
     * removed from the queue only when the backend reported OK; otherwise it stays queued.
     *
     * @param task    the queued task; must be a {@link PublishVersionTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
        // Unset fields are passed on as null.
        Map<Long, Long> succeededTablets = request.isSetSuccTablets() ? request.getSuccTablets() : null;
        List<Long> failedTabletIds = request.isSetErrorTabletIds() ? request.getErrorTabletIds() : null;

        // report version is required. here we check if set, for compatibility.
        if (request.isSetReportVersion()) {
            Env.getCurrentSystemInfo().updateBackendReportVersion(
                    task.getBackendId(), request.getReportVersion(), task.getDbId(), task.getTableId(), true);
        }

        PublishVersionTask publishTask = (PublishVersionTask) task;
        publishTask.setSuccTablets(succeededTablets);
        publishTask.addErrorTablets(failedTabletIds);
        publishTask.setFinished(true);

        boolean reportedOk = request.getTaskStatus().getStatusCode() == TStatusCode.OK;
        if (reportedOk) {
            if (request.isSetTableIdToTabletIdToDeltaNumRows()) {
                publishTask.setTableIdTabletsDeltaRows(request.getTableIdToTabletIdToDeltaNumRows());
            }
            AgentTaskQueue.removeTask(publishTask.getBackendId(),
                                      publishTask.getTaskType(),
                                      publishTask.getSignature());
        }
        // otherwise: not remove the task from queue and be will retry
    }

    /**
     * Removes a finished DROP task from the agent task queue.
     *
     * @param task the queued drop task
     */
    private void finishDropReplica(AgentTask task) {
        long backendId = task.getBackendId();
        long signature = task.getSignature();
        AgentTaskQueue.removeTask(backendId, TTaskType.DROP, signature);
    }

    /**
     * Handles the report of a CLONE task.
     *
     * <p>Only VERSION_2 clone tasks are processed: the backend report version is updated when the
     * request carries one, and the tablet scheduler is notified. Other versions are logged and
     * ignored. The task is removed from the queue in both cases.
     *
     * @param task    the queued task; must be a {@link CloneTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishClone(AgentTask task, TFinishTaskRequest request) {
        CloneTask clone = (CloneTask) task;
        if (clone.getTaskVersion() != CloneTask.VERSION_2) {
            LOG.warn("invalid clone task, ignore it. {}", task);
        } else {
            if (request.isSetReportVersion()) {
                Env.getCurrentSystemInfo().updateBackendReportVersion(
                        task.getBackendId(), request.getReportVersion(), task.getDbId(), task.getTableId(), true);
            }
            Env.getCurrentEnv().getTabletScheduler().finishCloneTask(clone, request);
        }

        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE, task.getSignature());
    }

    /**
     * Passes a finished storage-medium migration to the tablet scheduler and removes the task
     * from the agent task queue.
     *
     * @param task    the queued task; must be a {@link StorageMediaMigrationTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishStorageMediumMigrate(AgentTask task, TFinishTaskRequest request) {
        StorageMediaMigrationTask migration = (StorageMediaMigrationTask) task;
        Env.getCurrentEnv()
                .getTabletScheduler()
                .finishStorageMediaMigrationTask(migration, request);

        long backendId = task.getBackendId();
        long signature = task.getSignature();
        AgentTaskQueue.removeTask(backendId, TTaskType.STORAGE_MEDIUM_MIGRATE, signature);
    }

    /**
     * Handles the report of a CHECK_CONSISTENCY task.
     *
     * <p>When the task's version differs from the request version the report is only logged and
     * the task stays queued. Otherwise the reported checksum is handed to the consistency checker
     * and the task is removed from the queue.
     *
     * @param task    the queued task; must be a {@link CheckConsistencyTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest request) {
        CheckConsistencyTask consistencyTask = (CheckConsistencyTask) task;

        boolean versionMatches = consistencyTask.getVersion() == request.getRequestVersion();
        if (!versionMatches) {
            LOG.warn("check consistency task is not match. [{}-{}]",
                     consistencyTask.getVersion(), request.getRequestVersion());
            return;
        }

        Env.getCurrentEnv()
                .getConsistencyChecker()
                .handleFinishedConsistencyCheck(consistencyTask, request.getTabletChecksum());
        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CHECK_CONSISTENCY, task.getSignature());
    }

    /**
     * Handles the report of a MAKE_SNAPSHOT task.
     *
     * <p>The task is marked finished. A copy-tablet snapshot stores the reported snapshot path and
     * counts down for this backend and tablet. Any other snapshot is passed to the backup handler,
     * and the task is removed from the queue only when the handler returns true.
     *
     * @param task    the queued task; must be a {@link SnapshotTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishMakeSnapshot(AgentTask task, TFinishTaskRequest request) {
        SnapshotTask snapshot = (SnapshotTask) task;
        task.setFinished(true);

        if (!snapshot.isCopyTabletTask()) {
            boolean handled = Env.getCurrentEnv().getBackupHandler()
                    .handleFinishedSnapshotTask(snapshot, request);
            if (handled) {
                AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.MAKE_SNAPSHOT, task.getSignature());
            }
            return;
        }

        snapshot.setResultSnapshotPath(request.getSnapshotPath());
        snapshot.countDown(task.getBackendId(), task.getTabletId());
    }

    /**
     * Passes a finished UPLOAD task to the backup handler; the task is removed from the queue
     * only when the handler returns true.
     *
     * @param task    the queued task; must be an {@link UploadTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishUpload(AgentTask task, TFinishTaskRequest request) {
        UploadTask upload = (UploadTask) task;
        boolean handled = Env.getCurrentEnv().getBackupHandler()
                .handleFinishedSnapshotUploadTask(upload, request);
        if (!handled) {
            return;
        }
        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.UPLOAD, task.getSignature());
    }

    /**
     * Marks a DOWNLOAD task finished and passes it to the backup handler; the task is removed
     * from the queue only when the handler returns true.
     *
     * @param task    the queued task; must be a {@link DownloadTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishDownloadTask(AgentTask task, TFinishTaskRequest request) {
        DownloadTask download = (DownloadTask) task;
        task.setFinished(true);

        boolean handled = Env.getCurrentEnv().getBackupHandler()
                .handleDownloadSnapshotTask(download, request);
        if (!handled) {
            return;
        }
        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.DOWNLOAD, task.getSignature());
    }

    /**
     * Marks a MOVE (dir move) task finished and passes it to the backup handler; the task is
     * removed from the queue only when the handler returns true.
     *
     * @param task    the queued task; must be a {@link DirMoveTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishMoveDirTask(AgentTask task, TFinishTaskRequest request) {
        DirMoveTask dirMove = (DirMoveTask) task;
        task.setFinished(true);

        boolean handled = Env.getCurrentEnv().getBackupHandler()
                .handleDirMoveTask(dirMove, request);
        if (!handled) {
            return;
        }
        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.MOVE, task.getSignature());
    }

    /**
     * Removes a finished RECOVER_TABLET task from the agent task queue.
     *
     * @param task the queued recover-tablet task
     */
    private void finishRecoverTablet(AgentTask task) {
        long backendId = task.getBackendId();
        long signature = task.getSignature();
        AgentTaskQueue.removeTask(backendId, TTaskType.RECOVER_TABLET, signature);
    }

    /**
     * Forwards a backend report to the report handler.
     *
     * @param request the report request
     * @return the handler's result
     * @throws TException declared for the RPC layer
     */
    public TMasterResult report(TReportRequest request) throws TException {
        TMasterResult handled = this.reportHandler.handleReport(request);
        return handled;
    }

    /**
     * Handles the report of an ALTER task.
     *
     * <p>ROLLUP jobs are passed to the materialized view handler and SCHEMA_CHANGE jobs to the
     * schema change handler. The task is then marked finished and the backend report version is
     * updated when the request carries one. A {@link MetaNotFoundException} from the handlers is
     * logged, and the task is removed from the queue afterwards.
     *
     * @param task    the queued task; must be an {@link AlterReplicaTask}
     * @param request the finish-task request reported by the backend
     */
    private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
        AlterReplicaTask replicaTask = (AlterReplicaTask) task;
        try {
            JobType jobType = replicaTask.getJobType();
            if (jobType == JobType.ROLLUP) {
                Env.getCurrentEnv().getMaterializedViewHandler().handleFinishAlterTask(replicaTask);
            } else if (jobType == JobType.SCHEMA_CHANGE) {
                Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(replicaTask);
            }
            replicaTask.setFinished(true);
            if (request.isSetReportVersion()) {
                Env.getCurrentSystemInfo().updateBackendReportVersion(
                        task.getBackendId(), request.getReportVersion(), task.getDbId(), task.getTableId(), true);
            }
        } catch (MetaNotFoundException e) {
            LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage());
        }
        AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER, task.getSignature());
    }

    /**
     * Handles a finished ALTER_INVERTED_INDEX agent task.
     *
     * <p>When the reported status code is OK, the task is marked finished and
     * removed from the agent task queue. Otherwise only a warning is logged and
     * the task is left in the queue.
     *
     * @param task the finished agent task; on success it is cast to
     *             {@link AlterInvertedIndexTask}
     * @param request the finish-task request carrying the task status
     */
    private void finishAlterInvertedIndexTask(AgentTask task, TFinishTaskRequest request) {
        final boolean succeeded = request.getTaskStatus().getStatusCode() == TStatusCode.OK;
        if (succeeded) {
            final AlterInvertedIndexTask indexTask = (AlterInvertedIndexTask) task;
            LOG.info("begin finish AlterInvertedIndexTask: {}, tablet: {}, toString: {}",
                    indexTask.getSignature(), indexTask.getTabletId(), indexTask.toString());
            // TODO: more check
            indexTask.setFinished(true);
            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.ALTER_INVERTED_INDEX, task.getSignature());
        } else {
            // failed task: nothing is removed, so it stays in the agent task queue
            LOG.warn("AlterInvertedIndexTask: {} failed, failed times: {}, remaining it in agent task queue",
                     task.getSignature(), task.getFailedTimes());
        }
    }

    /**
     * Handles a finished PUSH_COOLDOWN_CONF agent task: marks it finished and
     * removes it from the agent task queue.
     *
     * @param task the finished agent task; it is cast to {@link PushCooldownConfTask}
     */
    private void finishPushCooldownConfTask(AgentTask task) {
        final PushCooldownConfTask finishedPush = (PushCooldownConfTask) task;
        finishedPush.setFinished(true);

        final long backendId = task.getBackendId();
        final long signature = task.getSignature();
        AgentTaskQueue.removeTask(backendId, TTaskType.PUSH_COOLDOWN_CONF, signature);
    }

    /**
     * Handles a finished CALCULATE_DELETE_BITMAP agent task.
     *
     * <ul>
     *   <li>Non-OK status: the task is counted down to zero with the reported status
     *       code and a message describing the backend, the error tablets and the
     *       error messages.</li>
     *   <li>OK status but the response partitions are stale according to
     *       {@code isFinishRequestStale}: a warning is logged and the task is counted
     *       down to zero with {@code DELETE_BITMAP_LOCK_ERROR}.</li>
     *   <li>Otherwise: the latch is counted down for this backend and transaction.</li>
     * </ul>
     *
     * <p>The task is always removed from the agent task queue by the finally block,
     * whichever branch runs and even if an exception is thrown.
     *
     * @param task the finished agent task; it is cast to {@link CalcDeleteBitmapTask}
     * @param request the finish-task request carrying status, error tablets and
     *                (optionally) response partitions
     */
    private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request) {
        try {
            CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task;
            TStatus taskStatus = request.getTaskStatus();
            if (taskStatus.getStatusCode() != TStatusCode.OK) {
                // The error message list is concatenated directly instead of via
                // toString(): an unset (null) list then renders as "null" rather than
                // throwing before the failure is signalled through countDownToZero.
                calcDeleteBitmapTask.countDownToZero(taskStatus.getStatusCode(),
                        "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize()
                                + ", error_tablets: " + request.getErrorTabletIds()
                                + ", err_msg: " + taskStatus.getErrorMsgs());
            } else if (request.isSetRespPartitions()
                    && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) {
                // Arguments are passed as objects so the logger formats them null-safely.
                LOG.warn("get staled response from backend: {}, report version: {}. calcDeleteBitmapTask's "
                                + "partitionInfos: {}. response's partitionInfos: {}",
                        task.getBackendId(),
                        request.getReportVersion(),
                        calcDeleteBitmapTask.getCalcDeleteBimapPartitionInfos(),
                        request.getRespPartitions());
                // DELETE_BITMAP_LOCK_ERROR will be retried
                calcDeleteBitmapTask.countDownToZero(TStatusCode.DELETE_BITMAP_LOCK_ERROR,
                        "get staled response from backend " + task.getBackendId() + ", report version: "
                                + request.getReportVersion());
            } else {
                calcDeleteBitmapTask.countDownLatch(task.getBackendId(), calcDeleteBitmapTask.getTransactionId());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("finish calc delete bitmap. transaction id: {}, be: {}, report version: {}",
                            calcDeleteBitmapTask.getTransactionId(), calcDeleteBitmapTask.getBackendId(),
                            request.getReportVersion());
                }
            }
        } finally {
            // unconditional: the task never stays in the queue once it reaches this method
            AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CALCULATE_DELETE_BITMAP, task.getSignature());
        }
    }
}