SparkEtlJobHandler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.CommandResult;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.loadv2.SparkLoadAppHandle.State;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TEtlState;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * SparkEtlJobHandler is responsible for:
 * 1. submitting the spark etl job
 * 2. getting the spark etl job status
 * 3. killing the spark etl job
 * 4. getting the spark etl output file paths
 * 5. deleting the etl output path
 */
@Deprecated
public class SparkEtlJobHandler {
    private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class);

    private static final String CONFIG_FILE_NAME = "jobconfig.json";
    private static final String JOB_CONFIG_DIR = "configs";
    private static final String ETL_JOB_NAME = "doris__%s";
    private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log";
    // 5min
    private static final long GET_APPID_TIMEOUT_MS = 300000L;
    // 30s
    private static final long EXEC_CMD_TIMEOUT_MS = 30000L;
    // yarn command
    private static final String YARN_STATUS_CMD = "%s --config %s application -status %s";
    private static final String YARN_KILL_CMD = "%s --config %s application -kill %s";
    // substring of `yarn application -status` stderr when the RM does not know the application
    private static final String YARN_APP_NOT_FOUND_MSG = "doesn't exist in RM";

    private static final String SPARK_ETL_JOB_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob";

    /**
     * Submits the spark etl job of a load job.
     *
     * <p>Clears the previous etl output path, writes the etl job config json to
     * {@code <outputPath>/configs/jobconfig.json} through the broker, launches the spark app with
     * {@link SparkLauncher}, and (outside unit tests) waits for the launcher log monitor thread to
     * finish. On success the app id and handle are stored in {@code attachment}.
     *
     * @param loadJobId id of the load job, used in the launcher log file name and error messages
     * @param loadLabel label of the load job, used in the spark app name and launcher log file name
     * @param etlJobConfig etl job config; its {@code outputPath} is deleted first and then used as the
     *         root for the job config file
     * @param resource spark resource providing master, deploy mode, spark/env configs and archives
     * @param brokerDesc broker used to access the remote file system
     * @param handle app handle; receives the launched process, and the app id / state are read from it
     *         after the log monitor (created with this handle) finishes
     * @param attachment receives the app id and handle on success
     * @throws LoadException if writing the config fails, launching fails or is interrupted, the app
     *         ends in a failed/killed/lost state, or no app id could be obtained
     */
    public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource,
            BrokerDesc brokerDesc, SparkLoadAppHandle handle, SparkPendingTaskAttachment attachment)
            throws LoadException {
        // delete outputPath
        deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);

        // init local dir
        if (!FeConstants.runningUnitTest) {
            initLocalDir();
        }

        // prepare dpp archive
        SparkRepository.SparkArchive archive = resource.prepareArchive();
        SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
        SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary();

        // spark home
        String sparkHome = Config.spark_home_default_dir;
        // etl config path
        String configsHdfsDir = etlJobConfig.outputPath + "/" + JOB_CONFIG_DIR + "/";
        // etl config json path
        String jobConfigHdfsPath = configsHdfsDir + CONFIG_FILE_NAME;
        // spark submit app resource path
        String appResourceHdfsPath = dppLibrary.remotePath;
        // spark yarn archive path
        String jobArchiveHdfsPath = spark2xLibrary.remotePath;
        // spark yarn stage dir
        String jobStageHdfsPath = resource.getWorkingDir();
        // spark launcher log path
        String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel);

        // fill in archive and stage dir defaults when the user did not configure them
        Map<String, String> sparkConfigs = resource.getSparkConfigs();
        if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.archive"))) {
            sparkConfigs.put("spark.yarn.archive", jobArchiveHdfsPath);
        }
        if (Strings.isNullOrEmpty(sparkConfigs.get("spark.yarn.stage.dir"))) {
            sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath);
        }

        LOG.info("submit etl spark job, sparkConfigs:{}", sparkConfigs);

        try {
            byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8");
            BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc);
        } catch (UserException | UnsupportedEncodingException e) {
            // LoadException only carries the message, so keep the full cause in the log
            LOG.warn("write etl job config failed. path: {}, load job id: {}", jobConfigHdfsPath, loadJobId, e);
            throw new LoadException(e.getMessage());
        }

        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
        LOG.info("submit etl job,env:{}", envParams);

        SparkLauncher launcher = new SparkLauncher(envParams);
        // master      |  deployMode
        // ------------|-------------
        // yarn        |  cluster
        // spark://xx  |  client
        launcher.setMaster(resource.getMaster())
                // Locale.ROOT: a default-locale lowercase would turn "CLIENT" into "clıent" in e.g. tr_TR
                .setDeployMode(resource.getDeployMode().name().toLowerCase(Locale.ROOT))
                .setAppResource(appResourceHdfsPath)
                .setMainClass(SPARK_ETL_JOB_CLASS)
                .setAppName(String.format(ETL_JOB_NAME, loadLabel))
                .setSparkHome(sparkHome)
                .addAppArgs(jobConfigHdfsPath)
                .redirectError();

        // spark configs: use the local map so the defaults filled in above are always applied
        for (Map.Entry<String, String> entry : sparkConfigs.entrySet()) {
            launcher.setConf(entry.getKey(), entry.getValue());
        }

        // start app
        State state = null;
        String appId = null;
        String errMsg = "start spark app failed. error: ";
        try {
            Process process = launcher.launch();
            handle.setProcess(process);
            if (!FeConstants.runningUnitTest) {
                SparkLauncherMonitor.LogMonitor logMonitor =
                        SparkLauncherMonitor.createLogMonitor(handle, sparkConfigs);
                logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
                logMonitor.setRedirectLogPath(logFilePath);
                logMonitor.start();
                try {
                    logMonitor.join();
                } catch (InterruptedException e) {
                    logMonitor.interrupt();
                    // restore the interrupt flag so callers up the stack can still observe it
                    Thread.currentThread().interrupt();
                    throw new LoadException(errMsg + e.getMessage());
                }
            }
            appId = handle.getAppId();
            state = handle.getState();
        } catch (IOException e) {
            LOG.warn(errMsg, e);
            throw new LoadException(errMsg + e.getMessage());
        }

        // string concatenation (not state.toString()) keeps these messages NPE-free if state is null
        if (fromSparkState(state) == TEtlState.CANCELLED) {
            throw new LoadException(errMsg + "spark app state: " + state + ", loadJobId:" + loadJobId);
        }

        // an empty app id would later be rejected by getEtlJobStatus, so treat it like a missing one
        if (Strings.isNullOrEmpty(appId)) {
            throw new LoadException(errMsg + "Waiting too much time to get appId from handle. spark app state: "
                    + state + ", loadJobId:" + loadJobId);
        }

        // success
        attachment.setAppId(appId);
        attachment.setHandle(handle);
    }

    /**
     * Gets the current status of a submitted spark etl job.
     *
     * <p>For a yarn master the status is obtained by running
     * {@code yarn --config <dir> application -status <appId>} (with a 30s timeout); a non-zero exit
     * code yields a CANCELLED status carrying the command's stderr. For other masters the state is
     * taken from {@code handle}; a null handle yields CANCELLED. When the resulting state is FINISHED
     * or CANCELLED, the dpp result file under {@code etlOutputPath} is read (best effort; read or
     * parse failures are only logged) and attached to the status.
     *
     * @param handle app handle; may be null
     * @param appId spark / yarn application id; must be non-empty
     * @param loadJobId id of the load job, used in logs
     * @param etlOutputPath etl output path containing the dpp result file
     * @param resource spark resource describing the master and yarn client configuration
     * @param brokerDesc broker used to read the dpp result file
     * @return the etl status, never null
     * @throws IllegalStateException if {@code appId} is null or empty
     * @throws LoadException propagated from the yarn helpers used here
     *         (NOTE(review): presumably config preparation / report parsing — confirm)
     */
    public EtlStatus getEtlJobStatus(SparkLoadAppHandle handle, String appId, long loadJobId, String etlOutputPath,
                                     SparkResource resource, BrokerDesc brokerDesc) throws LoadException {
        EtlStatus status = new EtlStatus();

        Preconditions.checkState(!Strings.isNullOrEmpty(appId), "spark app id is empty. load job id: %s", loadJobId);
        if (resource.isYarnMaster()) {
            // prepare yarn config
            String configDir = resource.prepareYarnConfig();
            // yarn client path
            String yarnClient = resource.getYarnClientPath();
            // command: yarn --config configDir application -status appId
            String yarnStatusCmd = String.format(YARN_STATUS_CMD, yarnClient, configDir, appId);
            LOG.info(yarnStatusCmd);

            String[] envp = buildCommandEnv(resource);
            LOG.info("getEtlJobStatus,appId:{}, loadJobId:{}, env:{},resource:{}", appId, loadJobId, envp, resource);
            CommandResult result = Util.executeCommand(yarnStatusCmd, envp, EXEC_CMD_TIMEOUT_MS);
            if (result.getReturnCode() != 0) {
                String stderr = result.getStderr();
                if (stderr != null && stderr.contains(YARN_APP_NOT_FOUND_MSG)) {
                    // case application not exists
                    LOG.warn("spark app not found. spark app id: {}, load job id: {}", appId, loadJobId);
                }
                LOG.warn("yarn application status failed. spark app id: {}, load job id: {}, timeout: {}, msg: {}",
                        appId, loadJobId, EXEC_CMD_TIMEOUT_MS, stderr);
                status.setState(TEtlState.CANCELLED);
                status.setFailMsg(stderr != null
                        ? stderr
                        : "yarn application status failed. return code: " + result.getReturnCode());
                return status;
            }
            ApplicationReport report = new YarnApplicationReport(result.getStdout()).getReport();
            LOG.info("yarn application -status {}. load job id: {}, output: {}, report: {}",
                    appId, loadJobId, result.getStdout(), report);
            YarnApplicationState state = report.getYarnApplicationState();
            FinalApplicationStatus faStatus = report.getFinalApplicationStatus();
            status.setState(fromYarnState(state, faStatus));
            if (status.getState() == TEtlState.CANCELLED) {
                if (state == YarnApplicationState.FINISHED) {
                    status.setFailMsg("spark app state: " + faStatus.toString());
                } else {
                    status.setFailMsg("yarn app state: " + state.toString());
                }
            }
            // prefer the url captured by the handle; the handle may be null here
            String handleUrl = handle != null ? handle.getUrl() : null;
            status.setTrackingUrl(handleUrl != null ? handleUrl : report.getTrackingUrl());
            status.setProgress((int) (report.getProgress() * 100));
        } else {
            // state from handle
            if (handle == null) {
                status.setFailMsg("spark app handle is null");
                status.setState(TEtlState.CANCELLED);
                return status;
            }

            State state = handle.getState();
            status.setState(fromSparkState(state));
            if (status.getState() == TEtlState.CANCELLED) {
                status.setFailMsg("spark app state: " + state.toString());
            }
            LOG.info("spark app id: {}, load job id: {}, app state: {}", appId, loadJobId, state);
        }

        if (status.getState() == TEtlState.FINISHED || status.getState() == TEtlState.CANCELLED) {
            // get dpp result (best effort: failures only leave the dpp result unset)
            String dppResultFilePath = EtlJobConfig.getDppResultFilePath(etlOutputPath);
            try {
                byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc, 0);
                String dppResultStr = new String(data, "UTF-8");
                DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
                if (dppResult != null) {
                    status.setDppResult(dppResult);
                    // the dpp failure reason is more specific than the app state message
                    if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
                        status.setFailMsg(dppResult.failedReason);
                    }
                }
            } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
                LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
            }
        }

        return status;
    }

    /**
     * Kills a spark etl job.
     *
     * <p>For a yarn master the app is killed with {@code yarn --config <dir> application -kill <appId>};
     * a failing command is only logged. If no app id is known yet (neither passed in nor available
     * from the handle), only the launcher process is killed through the handle. For other masters the
     * handle, if present, is stopped.
     *
     * @param handle app handle; may be null
     * @param appId spark / yarn application id; may be null or empty while the job is pending
     * @param loadJobId id of the load job, used in logs
     * @param resource spark resource describing the master and yarn client configuration
     * @throws LoadException propagated from the yarn helpers used here
     *         (NOTE(review): presumably yarn config preparation — confirm)
     */
    public void killEtlJob(SparkLoadAppHandle handle, String appId,
            long loadJobId, SparkResource resource) throws LoadException {
        if (resource.isYarnMaster()) {
            // The appId may be empty when the load job is in PENDING phase. This is because the appId is
            // parsed from the spark launcher process's output (spark launcher process submit job and then
            // return appId). In this case, the spark job has still not been submitted, we only need to kill
            // the spark launcher process.
            if (Strings.isNullOrEmpty(appId)) {
                if (handle == null) {
                    LOG.warn("spark app id and handle are both empty, nothing to kill. load job id: {}", loadJobId);
                    return;
                }
                appId = handle.getAppId();
                if (Strings.isNullOrEmpty(appId)) {
                    handle.kill();
                    return;
                }
            }
            // prepare yarn config
            String configDir = resource.prepareYarnConfig();
            // yarn client path
            String yarnClient = resource.getYarnClientPath();
            // command: yarn --config configDir application -kill appId
            String yarnKillCmd = String.format(YARN_KILL_CMD, yarnClient, configDir, appId);
            LOG.info(yarnKillCmd);
            String[] envp = buildCommandEnv(resource);
            LOG.info("killEtlJob, env:{}", envp);
            CommandResult result = Util.executeCommand(yarnKillCmd, envp, EXEC_CMD_TIMEOUT_MS);
            LOG.info("yarn application -kill {}, output: {}", appId, result.getStdout());
            if (result.getReturnCode() != 0) {
                String stderr = result.getStderr();
                LOG.warn("yarn application kill failed. app id: {}, load job id: {}, msg: {}",
                        appId, loadJobId, stderr);
            }
        } else {
            if (handle != null) {
                handle.stop();
            }
        }
    }

    /**
     * Lists the files (not directories) directly matched by {@code outputPath + "/*"}.
     *
     * @param outputPath etl output path
     * @param brokerDesc broker used to list the remote path
     * @return map from file path to file size
     * @throws Exception wrapping the broker's {@link UserException} if listing fails
     */
    public Map<String, Long> getEtlFilePaths(String outputPath, BrokerDesc brokerDesc) throws Exception {
        Map<String, Long> filePathToSize = Maps.newHashMap();

        List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
        String etlFilePaths = outputPath + "/*";
        try {
            BrokerUtil.parseFile(etlFilePaths, brokerDesc, fileStatuses);
        } catch (UserException e) {
            throw new Exception(e);
        }

        for (TBrokerFileStatus fstatus : fileStatuses) {
            if (fstatus.isDir) {
                continue;
            }
            filePathToSize.put(fstatus.getPath(), fstatus.getSize());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("get spark etl file paths. files map: {}", filePathToSize);
        }

        return filePathToSize;
    }

    /**
     * Creates the local spark launcher log directory ({@code Config.spark_launcher_log_dir}) if it
     * does not exist. A failure to create it is logged, not thrown.
     */
    public static synchronized void initLocalDir() {
        File logDir = new File(Config.spark_launcher_log_dir);
        // re-check isDirectory(): mkdirs() also returns false if someone else created it meanwhile
        if (!logDir.exists() && !logDir.mkdirs() && !logDir.isDirectory()) {
            LOG.warn("failed to create spark launcher log dir: {}", logDir.getAbsolutePath());
        }
    }

    /**
     * Deletes the etl output path through the broker. Failures are logged and otherwise ignored.
     *
     * @param outputPath path to delete
     * @param brokerDesc broker used to access the remote file system
     */
    public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
        try {
            BrokerUtil.deletePathWithBroker(outputPath, brokerDesc);
            LOG.info("delete path success. path: {}", outputPath);
        } catch (UserException e) {
            LOG.warn("delete path failed. path: {}", outputPath, e);
        }
    }

    /**
     * Builds the environment for yarn client commands: {@code LC_ALL=<Config.locale>} followed by each
     * entry of {@link SparkResource#getEnvConfigsWithoutPrefix()} as {@code key=value}.
     */
    private static String[] buildCommandEnv(SparkResource resource) {
        Map<String, String> envParams = resource.getEnvConfigsWithoutPrefix();
        String[] envp = new String[envParams.size() + 1];
        int idx = 0;
        envp[idx++] = "LC_ALL=" + Config.locale;
        for (Map.Entry<String, String> entry : envParams.entrySet()) {
            envp[idx++] = entry.getKey() + "=" + entry.getValue();
        }
        return envp;
    }

    /**
     * Maps a yarn application state (plus final status) to an etl state: FINISHED only when the app
     * finished with SUCCEEDED, CANCELLED when it finished otherwise or failed / was killed, and
     * RUNNING for every other state.
     */
    private TEtlState fromYarnState(YarnApplicationState state, FinalApplicationStatus faStatus) {
        switch (state) {
            case FINISHED:
                if (faStatus == FinalApplicationStatus.SUCCEEDED) {
                    // finish and success
                    return TEtlState.FINISHED;
                } else {
                    // finish but fail
                    return TEtlState.CANCELLED;
                }
            case FAILED:
            case KILLED:
                // not finish
                return TEtlState.CANCELLED;
            default:
                // ACCEPTED NEW NEW_SAVING RUNNING SUBMITTED
                return TEtlState.RUNNING;
        }
    }

    /**
     * Maps a spark launcher handle state to an etl state: FINISHED -> FINISHED,
     * FAILED / KILLED / LOST -> CANCELLED, anything else (including a null, i.e. unknown, state)
     * -> RUNNING.
     */
    private TEtlState fromSparkState(State state) {
        if (state == null) {
            // no state reported yet: treat it like UNKNOWN instead of failing with an NPE in the switch
            return TEtlState.RUNNING;
        }
        switch (state) {
            case FINISHED:
                return TEtlState.FINISHED;
            case FAILED:
            case KILLED:
            case LOST:
                return TEtlState.CANCELLED;
            default:
                // UNKNOWN CONNECTED SUBMITTED RUNNING
                return TEtlState.RUNNING;
        }
    }
}