// AlterJobV2.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.alter;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentTask;

import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/*
 * Version 2 of AlterJob, for replacing the old version of AlterJob.
 * This base class of RollupJob and SchemaChangeJob
 */
public abstract class AlterJobV2 implements Writable {
    private static final Logger LOG = LogManager.getLogger(AlterJobV2.class);


    /**
     * Lifecycle states of an alter job. PENDING/WAITING_TXN/RUNNING are
     * in-progress states; FINISHED and CANCELLED are terminal.
     */
    public enum JobState {
        PENDING, // Job is created
        // CHECKSTYLE OFF
        WAITING_TXN, // New replicas are created and Shadow catalog object is visible for incoming txns, waiting for previous txns to be finished
        // CHECKSTYLE ON
        RUNNING, // alter tasks are sent to BE, and waiting for them finished.
        FINISHED, // job is done
        CANCELLED; // job is cancelled(failed or be cancelled by user)

        /** Returns true if the job can no longer change state (finished or cancelled). */
        public boolean isFinalState() {
            return this == JobState.FINISHED || this == JobState.CANCELLED;
        }
    }

    public enum JobType {
        // Must not remove it or change the order, because catalog depend on it to traverse the image
        // and load meta data
        ROLLUP, SCHEMA_CHANGE, DECOMMISSION_BACKEND
    }

    @SerializedName(value = "type")
    protected JobType type;
    @SerializedName(value = "jobId")
    protected long jobId;
    @SerializedName(value = "jobState")
    protected JobState jobState;

    @SerializedName(value = "dbId")
    protected long dbId;
    @SerializedName(value = "tableId")
    protected long tableId;
    @SerializedName(value = "tableName")
    protected String tableName;

    @SerializedName(value = "errMsg")
    protected String errMsg = "";
    @SerializedName(value = "createTimeMs")
    protected long createTimeMs = -1;
    @SerializedName(value = "finishedTimeMs")
    protected long finishedTimeMs = -1;
    @SerializedName(value = "timeoutMs")
    protected long timeoutMs = -1;
    @SerializedName(value = "rawSql")
    protected String rawSql;
    @SerializedName(value = "cloudClusterName")
    protected String cloudClusterName = "";

    // The job will wait all transactions before this txn id finished, then send the schema_change/rollup tasks.
    @SerializedName(value = "watershedTxnId")
    protected long watershedTxnId = -1;

    // save failed task after retry three times, tablet -> backends
    @SerializedName(value = "failedTabletBackends")
    protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();

    // identity of the user who submitted the job; null when the job was not
    // created from a user connection (e.g. replay)
    @SerializedName(value = "uid")
    protected UserIdentity userIdentity = null;

    /**
     * Creates a new alter job in PENDING state.
     *
     * @param rawSql    original SQL text that triggered this job (for display/replay)
     * @param jobId     unique id of this job
     * @param jobType   ROLLUP or SCHEMA_CHANGE
     * @param dbId      id of the database the target table belongs to
     * @param tableId   id of the target table
     * @param tableName name of the target table (for display)
     * @param timeoutMs job timeout, measured from creation time
     */
    public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName,
                      long timeoutMs) {
        this.rawSql = rawSql;
        this.jobId = jobId;
        this.type = jobType;
        this.dbId = dbId;
        this.tableId = tableId;
        this.tableName = tableName;
        this.timeoutMs = timeoutMs;

        this.createTimeMs = System.currentTimeMillis();
        this.jobState = JobState.PENDING;

        // capture the submitting user's identity when created from a connection
        if (ConnectContext.get() != null) {
            userIdentity = ConnectContext.get().getCurrentUserIdentity();
        }
    }

    // Used by deserialization/replay paths where fields are filled in later.
    protected AlterJobV2(JobType type) {
        this.type = type;
    }

    public String getCloudClusterName() {
        return cloudClusterName;
    }

    public void setCloudClusterName(final String clusterName) {
        cloudClusterName = clusterName;
    }

    /**
     * Sleeps for 10 seconds as a simple backoff between retries.
     * Restores the interrupt flag if interrupted so callers can observe it.
     */
    protected void sleepSeveralSeconds() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            // preserve interrupt status instead of silently swallowing it
            Thread.currentThread().interrupt();
            LOG.warn("ignore InterruptedException");
        }
    }

    public long getJobId() {
        return jobId;
    }

    public JobState getJobState() {
        return jobState;
    }

    public void setJobState(JobState jobState) {
        this.jobState = jobState;
    }

    public JobType getType() {
        return type;
    }

    public long getDbId() {
        return dbId;
    }

    public long getTableId() {
        return tableId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getWatershedTxnId() {
        return watershedTxnId;
    }

    /** Returns true if the job has been alive longer than its configured timeout. */
    public boolean isTimeout() {
        return System.currentTimeMillis() - createTimeMs > timeoutMs;
    }

    /**
     * Returns true if the job is done and its retention period
     * ({@code Config.history_job_keep_max_second}) has elapsed, so it can be purged.
     */
    public boolean isExpire() {
        return isDone() && (System.currentTimeMillis() - finishedTimeMs) / 1000 > Config.history_job_keep_max_second;
    }

    public boolean isDone() {
        return jobState.isFinalState();
    }

    public long getFinishedTimeMs() {
        return finishedTimeMs;
    }

    public void setFinishedTimeMs(long finishedTimeMs) {
        this.finishedTimeMs = finishedTimeMs;
    }

    public String getRawSql() {
        return rawSql;
    }

    // /api/debug_point/add/{name}?value=100
    /**
     * Debug helper: if the named debug point is enabled, sleeps for the
     * configured number of milliseconds before the state handler runs.
     */
    protected void stateWait(final String name) {
        long waitTimeMs = DebugPointUtil.getDebugParamOrDefault(name, 0);
        if (waitTimeMs > 0) {
            try {
                LOG.info("debug point {} wait {} ms", name, waitTimeMs);
                Thread.sleep(waitTimeMs);
            } catch (InterruptedException e) {
                // preserve interrupt status instead of silently swallowing it
                Thread.currentThread().interrupt();
                LOG.warn(name, e);
            }
        }
    }

    /**
     * The keyword 'synchronized' only protects 2 methods:
     * run() and cancel()
     * Only these 2 methods can be visited by different thread(internal working thread and user connection thread)
     * So using 'synchronized' to make sure only one thread can run the job at one time.
     *
     * lock order:
     *      synchronized
     *      db lock
     */
    public synchronized void run() {
        if (isTimeout()) {
            cancelImpl("Timeout");
            return;
        }

        if (!Strings.isNullOrEmpty(cloudClusterName)) {
            ConnectContext ctx = new ConnectContext();
            ctx.setThreadLocalInfo();
            ctx.setCloudCluster(cloudClusterName);
            // currently used for CloudReplica.getCurrentClusterId
            // later maybe used for managing all workload in BE.
            ctx.setCurrentUserIdentity(this.userIdentity);
        }

        // /api/debug_point/add/FE.STOP_ALTER_JOB_RUN
        if (DebugPointUtil.isEnable("FE.STOP_ALTER_JOB_RUN")) {
            LOG.info("debug point FE.STOP_ALTER_JOB_RUN, schema change schedule stopped");
            return;
        }

        try {
            // dispatch to the handler of the current state; subclasses advance
            // jobState inside these handlers
            switch (jobState) {
                case PENDING:
                    stateWait("FE.ALTER_JOB_V2_PENDING");
                    runPendingJob();
                    break;
                case WAITING_TXN:
                    stateWait("FE.ALTER_JOB_V2_WAITING_TXN");
                    runWaitingTxnJob();
                    break;
                case RUNNING:
                    stateWait("FE.ALTER_JOB_V2_RUNNING");
                    runRunningJob();
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            // full stack trace is logged here; cancelImpl only receives the message
            LOG.error("failed to run alter job {}", jobId, e);
            cancelImpl(e.getMessage());
        }
    }

    public final synchronized boolean cancel(String errMsg) {
        return cancelImpl(errMsg);
    }

    /**
    * should be call before executing the job.
    * return false if table is not stable.
    */
    protected boolean checkTableStable(Database db) throws AlterCancelException {
        OlapTable tbl;
        try {
            tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
        } catch (MetaNotFoundException e) {
            throw new AlterCancelException(e.getMessage());
        }

        tbl.writeLockOrAlterCancelException();
        try {
            boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(),
                    Env.getCurrentEnv().getTabletScheduler());

            if (!isStable) {
                errMsg = "table is unstable";
                LOG.warn("wait table {} to be stable before doing {} job", tableId, type);
                tbl.setState(OlapTableState.WAITING_STABLE);
                return false;
            } else {
                // table is stable, set it to ROLLUP and begin altering.
                LOG.info("table {} is stable, start {} job {}", tableId, type, jobId);
                tbl.setState(type == JobType.ROLLUP ? OlapTableState.ROLLUP : OlapTableState.SCHEMA_CHANGE);
                errMsg = "";
                return true;
            }
        } finally {
            tbl.writeUnlock();
        }
    }

    /** Handles the PENDING state; may throw to cancel the job. */
    protected abstract void runPendingJob() throws Exception;

    /** Handles the WAITING_TXN state; waits for txns before the watershed to finish. */
    protected abstract void runWaitingTxnJob() throws AlterCancelException;

    /** Handles the RUNNING state; monitors BE task completion. */
    protected abstract void runRunningJob() throws AlterCancelException;

    /** Moves the job to CANCELLED with the given message; returns false if already final. */
    protected abstract boolean cancelImpl(String errMsg);

    /** Appends one row of display info for SHOW ALTER statements. */
    protected abstract void getInfo(List<List<Comparable>> infos);

    // no-op by default; cloud-mode subclasses verify the cluster still exists
    protected void ensureCloudClusterExist(List<AgentTask> tasks) throws AlterCancelException {}

    /** Re-applies a replayed job's state during metadata replay on followers. */
    public abstract void replay(AlterJobV2 replayedJob);

    /** Deserializes a job (concrete subclass chosen by Gson) from the metadata image. */
    public static AlterJobV2 read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, AlterJobV2.class);
    }

    public String toJson() {
        return GsonUtils.GSON.toJson(this);
    }
}