SyncJobManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.sync;

import org.apache.doris.analysis.CreateDataSyncJobStmt;
import org.apache.doris.analysis.PauseSyncJobStmt;
import org.apache.doris.analysis.ResumeSyncJobStmt;
import org.apache.doris.analysis.StopSyncJobStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.sync.canal.CanalDestination;
import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.nereids.trees.plans.commands.load.CreateDataSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.load.PauseDataSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.load.ResumeDataSyncJobCommand;
import org.apache.doris.nereids.trees.plans.commands.load.StopDataSyncJobCommand;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SyncJobManager implements Writable {
    private static final Logger LOG = LogManager.getLogger(SyncJobManager.class);

    // Primary index: job id -> sync job. Assigned once in the constructor.
    private final Map<Long, SyncJob> idToSyncJob;

    // Secondary index: db id -> job name -> all sync jobs registered under that name.
    // Assigned once in the constructor.
    private final Map<Long, Map<String, List<SyncJob>>> dbIdToJobNameToSyncJobs;

    // Read/write lock used by this manager around compound reads and updates of the two indexes.
    private final ReentrantReadWriteLock lock;

    /**
     * Creates an empty manager: a fair read/write lock, a concurrent id index and a
     * synchronized, insertion-ordered per-database index.
     */
    public SyncJobManager() {
        this.lock = new ReentrantReadWriteLock(true);
        this.idToSyncJob = Maps.newConcurrentMap();
        this.dbIdToJobNameToSyncJobs = Collections.synchronizedMap(Maps.newLinkedHashMap());
    }

    /**
     * Builds a sync job from the given command, registers it and writes the create-sync-job edit log.
     *
     * @param command the create command the job is built from
     * @throws DdlException if the data sync feature is disabled in the config, or if the
     *                      duplicate-remote check rejects the new job
     */
    public void addDataSyncJob(CreateDataSyncJobCommand command) throws DdlException {
        if (!Config.enable_feature_data_sync_job) {
            throw new DdlException("Data sync job is deprecated and disabled by default. You can enable it by setting "
                    + "'enable_feature_data_sync_job=true' in fe.conf. "
                    + "But it's not recommended to use it in production.");
        }

        // The id is allocated before the job is built and before the lock is taken.
        final SyncJob newJob = SyncJob.fromCommand(Env.getCurrentEnv().getNextId(), command);

        writeLock();
        try {
            // Check, registration and edit log happen atomically with respect to other writers.
            checkDuplicateRemote(newJob);
            unprotectedAddSyncJob(newJob);
            Env.getCurrentEnv().getEditLog().logCreateSyncJob(newJob);
        } finally {
            writeUnlock();
        }

        LOG.info(new LogBuilder(LogKey.SYNC_JOB, newJob.getId())
                .add("name", newJob.getJobName())
                .add("type", newJob.getJobType())
                .add("config", newJob.getJobConfig())
                .add("msg", "add sync job.")
                .build());
    }

    /**
     * Builds a sync job from the given statement, registers it and writes the create-sync-job edit log.
     *
     * @param stmt the create statement the job is built from
     * @throws DdlException if the data sync feature is disabled in the config, or if the
     *                      duplicate-remote check rejects the new job
     */
    public void addDataSyncJob(CreateDataSyncJobStmt stmt) throws DdlException {
        if (!Config.enable_feature_data_sync_job) {
            throw new DdlException("Data sync job is deprecated and disabled by default. You can enable it by setting "
                    + "'enable_feature_data_sync_job=true' in fe.conf. "
                    + "But it's not recommended to use it in production.");
        }

        // The id is allocated before the job is built and before the lock is taken.
        final SyncJob newJob = SyncJob.fromStmt(Env.getCurrentEnv().getNextId(), stmt);

        writeLock();
        try {
            // Check, registration and edit log happen atomically with respect to other writers.
            checkDuplicateRemote(newJob);
            unprotectedAddSyncJob(newJob);
            Env.getCurrentEnv().getEditLog().logCreateSyncJob(newJob);
        } finally {
            writeUnlock();
        }

        LOG.info(new LogBuilder(LogKey.SYNC_JOB, newJob.getId())
                .add("name", newJob.getJobName())
                .add("type", newJob.getJobType())
                .add("config", newJob.getJobConfig())
                .add("msg", "add sync job.")
                .build());
    }

    /**
     * Rejects a new Canal job whose remote destination equals that of an existing, uncompleted Canal job.
     * Jobs of any other type are accepted without checks.
     *
     * @param syncJob the job about to be registered
     * @throws DdlException if an uncompleted Canal job with an equal remote destination already exists
     */
    private void checkDuplicateRemote(SyncJob syncJob) throws DdlException {
        if (syncJob.getJobType() != DataSyncJobType.CANAL) {
            return;
        }
        CanalDestination remote = ((CanalSyncJob) syncJob).getRemote();
        for (SyncJob existing : idToSyncJob.values()) {
            // Only uncompleted Canal jobs can conflict with the new one.
            if (existing.isCompleted() || !(existing instanceof CanalSyncJob)) {
                continue;
            }
            if (((CanalSyncJob) existing).getRemote().equals(remote)) {
                throw new DdlException("Remote Canal instance already exists. conflict destination: " + remote);
            }
        }
    }

    /**
     * Registers the job in both indexes, creating the per-database map and the per-name list on demand.
     * Takes no lock itself.
     *
     * @param syncJob the job to register
     */
    private void unprotectedAddSyncJob(SyncJob syncJob) {
        idToSyncJob.put(syncJob.getId(), syncJob);
        dbIdToJobNameToSyncJobs
                .computeIfAbsent(syncJob.getDbId(), dbId -> Maps.newConcurrentMap())
                .computeIfAbsent(syncJob.getJobName(), jobName -> Lists.newArrayList())
                .add(syncJob);
    }

    /**
     * Calls {@code pause()} on every running sync job with the command's job name in the command's database.
     *
     * @param command carries the database name and the job name
     * @throws UserException if the database lookup fails, no job has that name, or none of them is running
     */
    public void pauseSyncJob(PauseDataSyncJobCommand command) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(command.getDbFullName());
        String jobName = command.getJobName();

        List<SyncJob> toPause;
        readLock();
        try {
            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), jobName);
            if (candidates.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }

            toPause = candidates.stream().filter(SyncJob::isRunning).collect(Collectors.toList());
            if (toPause.isEmpty()) {
                throw new DdlException("There is no running job with jobName `"
                        + jobName + "` to pause");
            }
        } finally {
            readUnlock();
        }

        // The jobs are paused after the manager lock has been released.
        for (SyncJob job : toPause) {
            job.pause();
        }
    }

    /**
     * Calls {@code resume()} on every paused sync job with the command's job name in the command's database.
     *
     * @param command carries the database name and the job name
     * @throws UserException if the database lookup fails, no job has that name, or none of them is paused
     */
    public void resumeSyncJob(ResumeDataSyncJobCommand command) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(command.getDbFullName());
        String jobName = command.getJobName();

        List<SyncJob> toResume;
        readLock();
        try {
            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), jobName);
            if (candidates.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }

            toResume = candidates.stream().filter(SyncJob::isPaused).collect(Collectors.toList());
            if (toResume.isEmpty()) {
                throw new DdlException("There is no paused job with jobName `"
                        + jobName + "` to resume");
            }
        } finally {
            readUnlock();
        }

        // The jobs are resumed after the manager lock has been released.
        for (SyncJob job : toResume) {
            job.resume();
        }
    }

    /**
     * Cancels, with a user-cancel message, every uncompleted sync job with the command's job name
     * in the command's database.
     *
     * @param command carries the database name and the job name
     * @throws UserException if the database lookup fails, no job has that name, or all of them are completed
     */
    public void stopSyncJob(StopDataSyncJobCommand command) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(command.getDbFullName());
        String jobName = command.getJobName();

        // Sync jobs waiting to be cancelled.
        List<SyncJob> toCancel;
        readLock();
        try {
            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), jobName);
            if (candidates.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }

            toCancel = candidates.stream().filter(job -> !job.isCompleted()).collect(Collectors.toList());
            if (toCancel.isEmpty()) {
                throw new DdlException("There is no uncompleted job with jobName `"
                        + jobName + "`");
            }
        } finally {
            readUnlock();
        }

        // The jobs are cancelled after the manager lock has been released.
        for (SyncJob job : toCancel) {
            job.cancel(SyncFailMsg.MsgType.USER_CANCEL, "user cancel");
        }
    }

    /**
     * Calls {@code pause()} on every running sync job with the statement's job name in the statement's database.
     *
     * @param stmt carries the database name and the job name
     * @throws UserException if the database lookup fails, no job has that name, or none of them is running
     */
    public void pauseSyncJob(PauseSyncJobStmt stmt) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbFullName());
        String jobName = stmt.getJobName();

        List<SyncJob> toPause;
        readLock();
        try {
            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), jobName);
            if (candidates.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }

            toPause = candidates.stream().filter(SyncJob::isRunning).collect(Collectors.toList());
            if (toPause.isEmpty()) {
                throw new DdlException("There is no running job with jobName `"
                        + jobName + "` to pause");
            }
        } finally {
            readUnlock();
        }

        // The jobs are paused after the manager lock has been released.
        for (SyncJob job : toPause) {
            job.pause();
        }
    }

    /**
     * Calls {@code resume()} on every paused sync job with the statement's job name in the statement's database.
     *
     * @param stmt carries the database name and the job name
     * @throws UserException if the database lookup fails, no job has that name, or none of them is paused
     */
    public void resumeSyncJob(ResumeSyncJobStmt stmt) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbFullName());
        String jobName = stmt.getJobName();

        List<SyncJob> toResume;
        readLock();
        try {
            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), jobName);
            if (candidates.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }

            toResume = candidates.stream().filter(SyncJob::isPaused).collect(Collectors.toList());
            if (toResume.isEmpty()) {
                throw new DdlException("There is no paused job with jobName `"
                        + jobName + "` to resume");
            }
        } finally {
            readUnlock();
        }

        // The jobs are resumed after the manager lock has been released.
        for (SyncJob job : toResume) {
            job.resume();
        }
    }

    /**
     * Cancels, with a user-cancel message, every uncompleted sync job with the statement's job name
     * in the statement's database.
     *
     * @param stmt carries the database name and the job name
     * @throws UserException if the database lookup fails, no job has that name, or all of them are completed
     */
    public void stopSyncJob(StopSyncJobStmt stmt) throws UserException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbFullName());
        String jobName = stmt.getJobName();

        // Sync jobs waiting to be cancelled.
        List<SyncJob> toCancel;
        readLock();
        try {
            List<SyncJob> candidates = getSyncJobsByDbAndJobName(db.getId(), jobName);
            if (candidates.isEmpty()) {
                throw new DdlException("Load job does not exist");
            }

            toCancel = candidates.stream().filter(job -> !job.isCompleted()).collect(Collectors.toList());
            if (toCancel.isEmpty()) {
                throw new DdlException("There is no uncompleted job with jobName `"
                        + jobName + "`");
            }
        } finally {
            readUnlock();
        }

        // The jobs are cancelled after the manager lock has been released.
        for (SyncJob job : toCancel) {
            job.cancel(SyncFailMsg.MsgType.USER_CANCEL, "user cancel");
        }
    }

    /**
     * Returns a fresh list holding the jobs registered under {@code jobName} in database {@code dbId};
     * the list is empty when the database or the name is unknown.
     * Takes no lock itself; the callers in this class invoke it while holding this manager's read lock.
     */
    private List<SyncJob> getSyncJobsByDbAndJobName(long dbId, String jobName) {
        Map<String, List<SyncJob>> byName = dbIdToJobNameToSyncJobs.get(dbId);
        if (byName == null || !byName.containsKey(jobName)) {
            return Lists.newArrayList();
        }
        // Copy so the caller never holds the list stored in the index.
        return Lists.newArrayList(byName.get(jobName));
    }

    /**
     * Collects {@code getShowInfo()} of every sync job registered for the given database.
     *
     * @param dbId id of the database
     * @return one row per job, or an empty list if the database has no registered jobs
     */
    public List<List<Comparable>> getSyncJobsInfoByDbId(long dbId) {
        LinkedList<List<Comparable>> rows = new LinkedList<>();

        readLock();
        try {
            Map<String, List<SyncJob>> byName = dbIdToJobNameToSyncJobs.get(dbId);
            if (byName == null) {
                return rows;
            }
            // Flatten the per-name lists and emit one row per job.
            byName.values().stream()
                    .flatMap(Collection::stream)
                    .map(SyncJob::getShowInfo)
                    .forEach(rows::add);
            return rows;
        } finally {
            readUnlock();
        }
    }

    /**
     * Returns all sync jobs whose current state is {@code state}, collected under the read lock.
     *
     * @param state the job state to match
     * @return a new list of the matching jobs, possibly empty
     */
    public List<SyncJob> getSyncJobs(SyncJob.JobState state) {
        readLock();
        try {
            return idToSyncJob.values().stream()
                    .filter(job -> job.getJobState() == state)
                    .collect(Collectors.toCollection(Lists::newArrayList));
        } finally {
            readUnlock();
        }
    }

    /**
     * Tells whether the database holds at least one non-cancelled sync job with the given name.
     *
     * @param dbName name of the database
     * @param jobName job name to look for
     * @return {@code true} if a job with that name exists and is not cancelled
     * @throws DdlException if the database lookup fails
     */
    public boolean isJobNameExist(String dbName, String jobName) throws DdlException {
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        readLock();
        try {
            Map<String, List<SyncJob>> byName = dbIdToJobNameToSyncJobs.get(db.getId());
            if (byName == null || !byName.containsKey(jobName)) {
                return false;
            }
            return byName.get(jobName).stream().anyMatch(job -> !job.isCancelled());
        } finally {
            readUnlock();
        }
    }

    /**
     * Invokes {@code checkAndDoUpdate()} on every sync job that is not completed.
     *
     * @throws UserException propagated from the first job whose update fails; later jobs are not visited
     */
    public void updateNeedSchedule() throws UserException {
        for (SyncJob job : idToSyncJob.values()) {
            if (job.isCompleted()) {
                continue;
            }
            job.checkAndDoUpdate();
        }
    }

    // Remove old sync jobs. Called periodically.
    // A job is removed once SyncJob#isExpired reports it expired at the time it is inspected
    // (per the original note: stopped jobs expire after Config.label_keep_max_second).
    public void cleanOldSyncJobs() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("begin to clean old sync jobs ");
        }
        // The clock is read once per inspected job, not once per sweep.
        Predicate<SyncJob> expiredNow = job -> job.isExpired(System.currentTimeMillis());
        cleanFinishedSyncJobsIf(expiredNow);
    }

    /**
     * Removes completed jobs, oldest finish time first, while the total number of jobs
     * exceeds Config.label_num_threshold. Does nothing when the threshold is negative
     * or the limit is not exceeded.
     */
    public void cleanOverLimitSyncJobs() {
        // Cheap pre-check without the lock; the size is checked again under the write lock.
        if (Config.label_num_threshold < 0 || idToSyncJob.size() <= Config.label_num_threshold) {
            return;
        }
        writeLock();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("begin to clean finished sync jobs ");
            }
            Deque<SyncJob> oldestFirst = idToSyncJob.values().stream()
                    .filter(SyncJob::isCompleted)
                    .sorted(Comparator.comparingLong(job -> job.finishTimeMs))
                    .collect(Collectors.toCollection(ArrayDeque::new));
            while (idToSyncJob.size() > Config.label_num_threshold) {
                SyncJob oldest = oldestFirst.pollFirst();
                if (oldest == null) {
                    // No completed job left to evict.
                    break;
                }
                // A job whose database has no entry in the per-db index is left untouched.
                if (dbIdToJobNameToSyncJobs.containsKey(oldest.getDbId())) {
                    idToSyncJob.remove(oldest.getId());
                    jobRemovedTrigger(oldest);
                }
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Removes the job from the per-database / per-name index and prunes the name entry and the
     * database entry once they become empty. Takes no lock itself.
     *
     * <p>Tolerates a missing database entry or a missing name entry instead of throwing a
     * NullPointerException, so a caller that has already dropped the job from the id index
     * is not interrupted half-way.
     *
     * @param syncJob the job being removed
     */
    private void jobRemovedTrigger(SyncJob syncJob) {
        Map<String, List<SyncJob>> map = dbIdToJobNameToSyncJobs.get(syncJob.getDbId());
        if (map == null) {
            // Nothing is indexed for this database.
            return;
        }
        List<SyncJob> list = map.get(syncJob.getJobName());
        if (list != null) {
            list.remove(syncJob);
            if (list.isEmpty()) {
                map.remove(syncJob.getJobName());
            }
        }
        if (map.isEmpty()) {
            dbIdToJobNameToSyncJobs.remove(syncJob.getDbId());
        }
    }

    /**
     * Removes every sync job accepted by {@code pred} from both indexes, under the write lock,
     * and logs each removal.
     *
     * <p>A matching job whose database has no entry in the per-db index is skipped and stays
     * in the id index.
     *
     * @param pred decides which jobs are removed
     */
    public void cleanFinishedSyncJobsIf(Predicate<SyncJob> pred) {
        // Captured once so every log line of this sweep reports the same timestamp.
        long currentTimeMs = System.currentTimeMillis();
        writeLock();
        try {
            Iterator<Map.Entry<Long, SyncJob>> iterator = idToSyncJob.entrySet().iterator();
            while (iterator.hasNext()) {
                SyncJob syncJob = iterator.next().getValue();
                if (!pred.test(syncJob)) {
                    continue;
                }
                if (!dbIdToJobNameToSyncJobs.containsKey(syncJob.getDbId())) {
                    continue;
                }
                jobRemovedTrigger(syncJob);
                // Remove through the iterator so the id index is modified safely while iterating.
                iterator.remove();
                // Build the message explicitly, like every other log statement in this class.
                LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
                        .add("finishTimeMs", syncJob.getFinishTimeMs())
                        .add("currentTimeMs", currentTimeMs)
                        .add("jobState", syncJob.getJobState())
                        .add("msg", "sync job has been cleaned")
                        .build());
            }
        } finally {
            writeUnlock();
        }
    }

    /**
     * Looks up a sync job in the id index without taking the manager lock.
     *
     * @param jobId id of the job
     * @return the job, or {@code null} if the id is not registered
     */
    public SyncJob getSyncJobById(long jobId) {
        final SyncJob found = idToSyncJob.get(jobId);
        return found;
    }

    /** Acquires the read side of the manager lock; pair with {@link #readUnlock()}. */
    public void readLock() {
        ReentrantReadWriteLock.ReadLock readSide = lock.readLock();
        readSide.lock();
    }

    /** Releases the read side of the manager lock acquired by {@link #readLock()}. */
    public void readUnlock() {
        ReentrantReadWriteLock.ReadLock readSide = lock.readLock();
        readSide.unlock();
    }

    /** Acquires the write side of the manager lock; pair with {@link #writeUnlock()}. */
    private void writeLock() {
        ReentrantReadWriteLock.WriteLock writeSide = lock.writeLock();
        writeSide.lock();
    }

    /** Releases the write side of the manager lock acquired by {@link #writeLock()}. */
    private void writeUnlock() {
        ReentrantReadWriteLock.WriteLock writeSide = lock.writeLock();
        writeSide.unlock();
    }

    /**
     * Serializes all sync jobs: an int count followed by each job's own serialized form.
     *
     * <p>The jobs are copied into a list under the read lock first. The id index is a concurrent
     * map, so writing its {@code size()} and then iterating the live view could produce a count
     * that differs from the number of jobs actually written if the map changes in between.
     *
     * @param out destination of the serialized data
     * @throws IOException if writing to {@code out} fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        List<SyncJob> snapshot;
        readLock();
        try {
            snapshot = Lists.newArrayList(idToSyncJob.values());
        } finally {
            readUnlock();
        }
        // The count and the entries both come from the same snapshot, so they always agree.
        out.writeInt(snapshot.size());
        for (SyncJob syncJob : snapshot) {
            syncJob.write(out);
        }
    }

    /**
     * Reads an int count followed by that many serialized sync jobs and registers each of them.
     * Takes no lock.
     *
     * @param in source of the serialized data
     * @throws IOException if reading from {@code in} fails
     */
    public void readField(DataInput in) throws IOException {
        int remaining = in.readInt();
        while (remaining-- > 0) {
            unprotectedAddSyncJob(SyncJob.read(in));
        }
    }

    /**
     * Replays a create-sync-job record: registers the job under the write lock and logs it.
     * No duplicate-remote check is made and no edit log is written here.
     *
     * @param syncJob the job to register
     */
    public void replayAddSyncJob(SyncJob syncJob) {
        writeLock();
        try {
            unprotectedAddSyncJob(syncJob);
            LOG.info(new LogBuilder(LogKey.SYNC_JOB, syncJob.getId())
                    .add("msg", "replay create sync job.")
                    .build());
        } finally {
            writeUnlock();
        }
    }

    /**
     * Replays a state-update record on the job it refers to, under the write lock.
     * If the job id is unknown, a warning is logged and nothing else happens.
     *
     * @param info the update record; its id selects the target job
     */
    public void replayUpdateSyncJobState(SyncJob.SyncJobUpdateStateInfo info) {
        writeLock();
        try {
            long targetId = info.getId();
            SyncJob target = idToSyncJob.get(targetId);
            if (target != null) {
                target.replayUpdateSyncJobState(info);
                return;
            }
            LOG.warn(new LogBuilder(LogKey.SYNC_JOB, targetId)
                    .add("msg", "replay update sync job state failed. Job was not found.")
                    .build());
        } finally {
            writeUnlock();
        }
    }
}