SyncJob.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.sync;

import org.apache.doris.analysis.BinlogDesc;
import org.apache.doris.analysis.ChannelDescription;
import org.apache.doris.analysis.CreateDataSyncJobStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.sync.SyncFailMsg.MsgType;
import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.nereids.trees.plans.commands.load.CreateDataSyncJobCommand;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

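/**
 * Base class of a data sync job. A sync job continuously loads binlog data from an
 * external source into Doris tables, with one channel per source-to-target table
 * mapping (see {@link ChannelDescription}). Concrete implementations such as
 * {@link CanalSyncJob} provide the actual execution logic; the state machine of a
 * job is documented on {@link JobState}.
 */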
public abstract class SyncJob implements Writable {
    private static final Logger LOG = LogManager.getLogger(SyncJob.class);

    @SerializedName(value = "id")
    protected long id;
    @SerializedName(value = "dbId")
    protected long dbId;
    @SerializedName(value = "jobName")
    protected String jobName;
    @SerializedName(value = "channelDescriptions")
    protected List<ChannelDescription> channelDescriptions;
    protected BinlogDesc binlogDesc;
    @SerializedName(value = "createTimeMs")
    protected long createTimeMs;
    @SerializedName(value = "lastStartTimeMs")
    protected long lastStartTimeMs;
    @SerializedName(value = "lastStopTimeMs")
    protected long lastStopTimeMs;
    @SerializedName(value = "finishTimeMs")
    protected long finishTimeMs;
    @SerializedName(value = "jobState")
    protected JobState jobState;
    @SerializedName(value = "failMsg")
    protected SyncFailMsg failMsg;
    @SerializedName(value = "dataSyncJobType")
    protected DataSyncJobType dataSyncJobType;
    protected List<SyncChannel> channels;

    public SyncJob(long id, String jobName, long dbId) {
        this.id = id;
        this.dbId = dbId;
        this.jobName = jobName;
        this.jobState = JobState.PENDING;
        this.createTimeMs = System.currentTimeMillis();
        this.lastStartTimeMs = -1L;
        this.lastStopTimeMs = -1L;
        this.finishTimeMs = -1L;
    }

    /**
     *                       +-------------+
     *           create job  |  PENDING    |    resume job
     *           +-----------+             | <-------------+
     *           |           +-------------+               |
     *           v                                         ^
     *           |                                         |
     *      +------------+   pause job             +-------+----+
     *      |  RUNNING   |   run error             |  PAUSED    |
     *      |            +-----------------------> |            |
     *      +----+-------+                         +-------+----+
     *           |                                         |
     *           v           +-------------+               v
     *           |           | CANCELLED   |               |
     *           +---------> |             |   <-----------+
     *          stop job     +-------------+    stop job
     *          system error
     */
    public enum JobState {
        PENDING,
        RUNNING,
        PAUSED,
        CANCELLED
    }

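    // Builds a SyncJob from the Nereids CreateDataSyncJobCommand. Only the CANAL type is
    // handled here; channel descriptions and binlog properties are copied from the command.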
    public static SyncJob fromCommand(long jobId, CreateDataSyncJobCommand command) throws DdlException {
        String dbName = command.getDbName();
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        SyncJob syncJob;
        try {
            switch (command.getDataSyncJobType()) {
                case CANAL:
                    syncJob = new CanalSyncJob(jobId, command.getJobName(), db.getId());
                    break;
                default:
                    throw new DdlException("Unknown data sync job type.");
            }
            syncJob.setChannelDescriptions(command.getChannelDescriptions());
            syncJob.checkAndSetBinlogInfo(command.getBinlogDesc());
            return syncJob;
        } catch (Exception e) {
            throw new DdlException(e.getMessage());
        }
    }

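    // Stmt-based counterpart of fromCommand(): builds a SyncJob from a CreateDataSyncJobStmt.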
    public static SyncJob fromStmt(long jobId, CreateDataSyncJobStmt stmt) throws DdlException {
        String dbName = stmt.getDbName();
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        SyncJob syncJob;
        try {
            switch (stmt.getDataSyncJobType()) {
                case CANAL:
                    syncJob = new CanalSyncJob(jobId, stmt.getJobName(), db.getId());
                    break;
                default:
                    throw new DdlException("Unknown data sync job type.");
            }
            syncJob.setChannelDescriptions(stmt.getChannelDescriptions());
            syncJob.checkAndSetBinlogInfo(stmt.getBinlogDesc());
            return syncJob;
        } catch (Exception e) {
            throw new DdlException(e.getMessage());
        }
    }

    // Returns true if the job has reached its terminal state (CANCELLED).
    public boolean isCompleted() {
        return jobState == JobState.CANCELLED;
    }

    public boolean isPaused() {
        return jobState == JobState.PAUSED;
    }

    public boolean isRunning() {
        return jobState == JobState.RUNNING;
    }

    public boolean isCancelled() {
        return jobState == JobState.CANCELLED;
    }

    public boolean isNeedReschedule() {
        return false;
    }

    public synchronized void updateState(JobState newState, boolean isReplay) throws UserException {
        checkStateTransform(newState);
        unprotectedUpdateState(newState, isReplay);
    }

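    // Applies the state change without synchronization; callers such as updateState() are
    // expected to hold the job lock. Updates the relevant timestamps and, when not replaying,
    // persists the change to the edit log.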
    public void unprotectedUpdateState(JobState newState, boolean isReplay) {
        this.jobState = newState;
        switch (newState) {
            case PENDING:
                break;
            case RUNNING:
                this.lastStartTimeMs = System.currentTimeMillis();
                break;
            case PAUSED:
                this.lastStopTimeMs = System.currentTimeMillis();
                break;
            case CANCELLED:
                this.lastStopTimeMs = System.currentTimeMillis();
                this.finishTimeMs = System.currentTimeMillis();
                break;
            default:
                Preconditions.checkState(false, "wrong job state: " + newState.name());
                break;
        }
        if (!isReplay) {
            SyncJobUpdateStateInfo info = new SyncJobUpdateStateInfo(id, jobState, lastStartTimeMs, lastStopTimeMs,
                    finishTimeMs, failMsg);
            Env.getCurrentEnv().getEditLog().logUpdateSyncJobState(info);
        }
    }

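    // Rejects invalid transitions: RUNNING may not re-enter RUNNING, PAUSED may only move to
    // PENDING or CANCELLED, and CANCELLED is terminal. PENDING allows any transition.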
    private void checkStateTransform(JobState newState) throws UserException {
        switch (jobState) { // CHECKSTYLE IGNORE THIS LINE: missing switch default
            case PENDING:
                break;
            case RUNNING:
                if (newState == JobState.RUNNING) {
                    throw new DdlException("Could not transform " + jobState + " to " + newState);
                }
                break;
            case PAUSED:
                if (newState == JobState.PAUSED || newState == JobState.RUNNING) {
                    throw new DdlException("Could not transform " + jobState + " to " + newState);
                }
                break;
            case CANCELLED:
                throw new DdlException("Could not transform " + jobState + " to " + newState);
        }
    }

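    // Cancels the job if its database or any target table has been dropped; otherwise moves
    // it back to PENDING when the concrete job reports that it needs to be rescheduled.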
    public void checkAndDoUpdate() throws UserException {
        Database database = Env.getCurrentInternalCatalog().getDbNullable(dbId);
        if (database == null) {
            if (!isCompleted()) {
                String msg = "The database has been deleted. Change job state to cancelled";
                LOG.warn(new LogBuilder(LogKey.SYNC_JOB, id)
                        .add("database", dbId)
                        .add("msg", msg).build());
                cancel(MsgType.SCHEDULE_FAIL, msg);
            }
            return;
        }

        for (ChannelDescription channelDescription : channelDescriptions) {
            Table table = database.getTableNullable(channelDescription.getTargetTable());
            if (table == null) {
                if (!isCompleted()) {
                    String msg = "The table has been deleted. Change job state to cancelled";
                    LOG.warn(new LogBuilder(LogKey.SYNC_JOB, id)
                            .add("dbId", dbId)
                            .add("table", channelDescription.getTargetTable())
                            .add("msg", msg).build());
                    cancel(MsgType.SCHEDULE_FAIL, msg);
                }
                return;
            }
        }

        if (isNeedReschedule()) {
            LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                    .add("msg", "Job need to be scheduled")
                    .build());
            updateState(JobState.PENDING, false);
        }
    }

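    // The base implementation only stores the binlog descriptor; subclasses may override this
    // to validate the binlog properties of their source type.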
    public void checkAndSetBinlogInfo(BinlogDesc binlogDesc) throws DdlException {
        this.binlogDesc = binlogDesc;
    }

    public abstract void execute() throws UserException;

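    // The lifecycle operations below are optional: the base class provides a no-op cancel()
    // and pause()/resume() that fail with "not implemented". Concrete jobs override the
    // operations they support.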
    public void cancel(MsgType msgType, String errMsg) {
    }

    public void pause() throws UserException {
        throw new UserException("not implemented");
    }

    public void resume() throws UserException {
        throw new UserException("not implemented");
    }

    public String getStatus() {
        return "\\N";
    }

    public String getJobConfig() {
        return "\\N";
    }

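    // A cancelled job is considered expired once Config.label_keep_max_second has elapsed
    // since its finish time; jobs in any other state never expire.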
    public boolean isExpired(long currentTimeMs) {
        if (!isCompleted()) {
            return false;
        }
        Preconditions.checkState(finishTimeMs != -1L);
        long expireTime = Config.label_keep_max_second * 1000L;
        return (currentTimeMs - finishTimeMs) > expireTime;
    }

    // Only used to persist a job state change; written to and read back from the edit log.
    public static class SyncJobUpdateStateInfo implements Writable {
        @SerializedName(value = "id")
        private long id;
        @SerializedName(value = "lastStartTimeMs")
        protected long lastStartTimeMs;
        @SerializedName(value = "lastStopTimeMs")
        protected long lastStopTimeMs;
        @SerializedName(value = "finishTimeMs")
        protected long finishTimeMs;
        @SerializedName(value = "jobState")
        protected JobState jobState;
        @SerializedName(value = "failMsg")
        protected SyncFailMsg failMsg;

        public SyncJobUpdateStateInfo(long id, JobState jobState, long lastStartTimeMs,
                long lastStopTimeMs, long finishTimeMs, SyncFailMsg failMsg) {
            this.id = id;
            this.jobState = jobState;
            this.lastStartTimeMs = lastStartTimeMs;
            this.lastStopTimeMs = lastStopTimeMs;
            this.finishTimeMs = finishTimeMs;
            this.failMsg = failMsg;
        }

        public long getId() {
            return this.id;
        }

        public long getLastStartTimeMs() {
            return this.lastStartTimeMs;
        }

        public long getLastStopTimeMs() {
            return this.lastStopTimeMs;
        }

        public long getFinishTimeMs() {
            return this.finishTimeMs;
        }

        public JobState getJobState() {
            return this.jobState;
        }

        public SyncFailMsg getFailMsg() {
            return this.failMsg;
        }

        @Override
        public String toString() {
            return GsonUtils.GSON.toJson(this);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            String json = GsonUtils.GSON.toJson(this);
            Text.writeString(out, json);
        }

        public static SyncJobUpdateStateInfo read(DataInput in) throws IOException {
            String json = Text.readString(in);
            return GsonUtils.GSON.fromJson(json, SyncJobUpdateStateInfo.class);
        }
    }

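    // Assembles one row of display information: job id, name, type, state, channel info,
    // status, config, the four timestamps and the fail message. Missing values are shown
    // as FeConstants.null_string.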
    public List<Comparable> getShowInfo() {
        List<Comparable> jobInfo = Lists.newArrayList();
        // jobId
        jobInfo.add(id);
        // jobName
        jobInfo.add(jobName);
        // type
        jobInfo.add(dataSyncJobType.name());
        // state
        jobInfo.add(jobState.name());
        // channel
        StringBuilder channelInfo = new StringBuilder();
        if (channels != null) {
            for (int i = 0; i < channels.size(); i++) {
                channelInfo.append(channels.get(i).getInfo());
                if (i < channels.size() - 1) {
                    channelInfo.append(", ");
                }
            }
            jobInfo.add(channelInfo.toString());
        } else {
            jobInfo.add(FeConstants.null_string);
        }

        // status
        jobInfo.add(getStatus());
        // jobConfig
        jobInfo.add(getJobConfig());
        // createTimeMs
        jobInfo.add(TimeUtils.longToTimeString(createTimeMs));
        // lastStartTimeMs
        jobInfo.add(TimeUtils.longToTimeString(lastStartTimeMs));
        // lastStopTimeMs
        jobInfo.add(TimeUtils.longToTimeString(lastStopTimeMs));
        // finishTimeMs
        jobInfo.add(TimeUtils.longToTimeString(finishTimeMs));
        // failMsg
        if (failMsg == null) {
            jobInfo.add(FeConstants.null_string);
        } else {
            jobInfo.add(failMsg.toString());
        }
        return jobInfo;
    }

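    // Replays a persisted state change (SyncJobUpdateStateInfo) during edit log replay,
    // restoring the timestamps first and then the job state.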
    public void replayUpdateSyncJobState(SyncJobUpdateStateInfo info) {
        lastStartTimeMs = info.getLastStartTimeMs();
        lastStopTimeMs = info.getLastStopTimeMs();
        finishTimeMs = info.getFinishTimeMs();
        try {
            updateState(info.getJobState(), true);
        } catch (UserException e) {
            LOG.error("Failed to replay sync job state update, which should not happen.", e);
        }
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, info.getId())
                .add("desired_state", info.getJobState())
                .add("msg", "Replay update sync job state")
                .build());
    }

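    // Whole-job persistence: the job is serialized to and from JSON via GSON, using the
    // @SerializedName annotations declared above.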
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, GsonUtils.GSON.toJson(this, SyncJob.class));
    }

    public static SyncJob read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, SyncJob.class);
    }

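    // Records the channel descriptions, verifies that the source-to-target table mapping is
    // one-to-one (no duplicate sources or targets), and assigns each channel a new id.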
    public void setChannelDescriptions(List<ChannelDescription> channelDescriptions) throws DdlException {
        this.channelDescriptions = channelDescriptions;
        Map<String, String> tableMappings = Maps.newHashMap();
        for (ChannelDescription channelDescription : channelDescriptions) {
            String src = channelDescription.getSrcDatabase() + "." + channelDescription.getSrcTableName();
            String tar = channelDescription.getTargetTable();
            tableMappings.put(src, tar);
        }
        Set<String> mappingSet = Sets.newHashSet(tableMappings.values());
        if (mappingSet.size() != tableMappings.size() || mappingSet.size() != channelDescriptions.size()) {
            throw new DdlException("The mapping relations between tables should be injective.");
        }
        // set channel id
        for (ChannelDescription channelDescription : channelDescriptions) {
            channelDescription.setChannelId(Env.getCurrentEnv().getNextId());
        }
    }

    public long getId() {
        return this.id;
    }

    public long getDbId() {
        return this.dbId;
    }

    public String getJobName() {
        return this.jobName;
    }

    public JobState getJobState() {
        return this.jobState;
    }

    public DataSyncJobType getJobType() {
        return this.dataSyncJobType;
    }

    public SyncFailMsg getFailMsg() {
        return failMsg;
    }

    public void setFailMsg(SyncFailMsg failMsg) {
        this.failMsg = failMsg;
    }

    public List<ChannelDescription> getChannelDescriptions() {
        return this.channelDescriptions;
    }

    public long getFinishTimeMs() {
        return finishTimeMs;
    }
}