// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.DorisFE;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.util.CommandResult;
import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
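// Scheduler for hadoop dpp etl jobs: shells out to the local hadoop client to
// submit bistreaming jobs, poll their status, list the produced etl files,
// kill jobs and clean up etl output paths on hdfs.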
public class DppScheduler {
private static final Logger LOG = LogManager.getLogger(DppScheduler.class);
private static final String HADOOP_CLIENT = DorisFE.DORIS_HOME_DIR + Config.dpp_hadoop_client_path;
private static final String DPP_OUTPUT_DIR = "export";
private static final String JOB_CONFIG_DIR = DorisFE.DORIS_HOME_DIR + "/temp/job_conf";
private static final String JOB_CONFIG_FILE = "jobconfig.json";
private static final String LOCAL_DPP_DIR = DorisFE.DORIS_HOME_DIR + "/lib/dpp/" + FeConstants.dpp_version;
private static final int DEFAULT_REDUCE_NUM = 1000;
private static final long GB = 1024 * 1024 * 1024L;
// hdfs://host:port/outputPath/dbId/loadLabel/etlOutputDir
private static final String ETL_OUTPUT_PATH = "%s%s/%d/%s/%s";
private static final String ETL_JOB_NAME = "palo2__%s__%s";
// hadoop command
private static final String HADOOP_BISTREAMING_CMD = "%s bistreaming %s -D mapred.job.name=\"%s\" "
+ "-input %s -output %s "
+ "-mapper \"sh mapred/mapper.sh\" "
+ "-reducer \"sh mapred/reducer.sh '\\\"%s\\\"'\" "
+ "-partitioner com.baidu.sos.mapred.lib.MapIntPartitioner "
+ "-cacheArchive %s/dpp/x86_64-scm-linux-gnu.tar.gz#tc "
+ "-cacheArchive %s/dpp/pypy.tar.gz#pypy "
+ "-cacheArchive %s/dpp/palo_dpp_mr.tar.gz#mapred "
+ "-numReduceTasks %d -file \"%s\" ";
private static final String HADOOP_STATUS_CMD = "%s job %s -status %s";
private static final String HADOOP_KILL_CMD = "%s job %s -kill %s";
private static final String HADOOP_LS_CMD = "%s fs %s -ls %s";
private static final String HADOOP_COUNT_CMD = "%s fs %s -count %s";
private static final String HADOOP_TEST_CMD = "%s fs %s -test %s %s";
private static final String HADOOP_MKDIR_CMD = "%s fs %s -mkdir %s";
private static final String HADOOP_RMR_CMD = "%s fs %s -rmr %s";
private static final String HADOOP_PUT_CMD = "%s fs %s -put %s %s";
private static final long HADOOP_SPEED_LIMIT_KB = 10240L; // 10M
private static final ConcurrentMap<String, Object> DPP_LOCK_MAP = Maps.newConcurrentMap();
private String hadoopConfig;
private String applicationsPath;
public DppScheduler(DppConfig dppConfig) {
hadoopConfig = getHadoopConfigsStr(dppConfig.getHadoopConfigs());
applicationsPath = dppConfig.getFsDefaultName() + dppConfig.getApplicationsPath();
}
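// join hadoop configs into a "-D key=value -D key=value ..." option string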
private String getHadoopConfigsStr(Map<String, String> hadoopConfigs) {
List<String> configs = Lists.newArrayList();
for (Map.Entry<String, String> entry : hadoopConfigs.entrySet()) {
configs.add(String.format("%s=%s", entry.getKey(), entry.getValue()));
}
return String.format("-D %s", StringUtils.join(configs, " -D "));
}
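// Submit a hadoop etl job:
// 1. if this is a retry, make sure the dpp applications on hdfs are intact
// 2. write jobConf to a local json config file that is shipped with -file
// 3. calculate the reduce number from the input size and the tablet count
// 4. run the hadoop bistreaming command and parse the etl job id from its stderr output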
public EtlSubmitResult submitEtlJob(long jobId, String loadLabel, String clusterName,
String dbName, Map<String, Object> jobConf, int retry) {
String etlJobId = null;
TStatus status = new TStatus();
status.setStatusCode(TStatusCode.OK);
List<String> failMsgs = Lists.newArrayList();
status.setErrorMsgs(failMsgs);
// if this is a retry, the previous failure may have been caused by missing or
// broken dpp applications on hdfs, so check and re-upload them under a
// per-cluster lock before resubmitting
if (retry > 0) {
LOG.warn("submit etl retry[{}] > 0. check dpp application", retry);
// prepare dpp applications
DPP_LOCK_MAP.putIfAbsent(clusterName, new Object());
Preconditions.checkState(DPP_LOCK_MAP.containsKey(clusterName));
synchronized (DPP_LOCK_MAP.get(clusterName)) {
try {
prepareDppApplications();
} catch (LoadException e) {
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(e.getMessage());
return new EtlSubmitResult(status, null);
}
}
}
// create job config file
String configDirPath = JOB_CONFIG_DIR + "/" + jobId;
File configDir = new File(configDirPath);
if (!Util.deleteDirectory(configDir)) {
String errMsg = "delete config dir error. job: " + jobId;
LOG.warn(errMsg + ", path: {}", configDirPath);
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(errMsg);
return new EtlSubmitResult(status, null);
}
if (!configDir.mkdirs()) {
String errMsg = "create config file dir error. job: " + jobId;
LOG.warn(errMsg + ", path: {}", configDirPath);
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(errMsg);
return new EtlSubmitResult(status, null);
}
File configFile = new File(configDirPath + "/" + JOB_CONFIG_FILE);
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(configFile), "UTF-8"));
Gson gson = new Gson();
bw.write(gson.toJson(jobConf));
bw.flush();
} catch (IOException e) {
Util.deleteDirectory(configDir);
String errMsg = "create config file error. job: " + jobId;
LOG.warn(errMsg + ", file: {}", configDirPath + "/" + JOB_CONFIG_FILE);
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(errMsg);
return new EtlSubmitResult(status, null);
} finally {
if (bw != null) {
try {
bw.close();
} catch (IOException e) {
LOG.warn("close buffered writer error", e);
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(e.getMessage());
return new EtlSubmitResult(status, null);
}
}
}
// create input path
Set<String> inputPaths = getInputPaths(jobConf);
String inputPath = StringUtils.join(inputPaths, " -input ");
// reduce num
int reduceNumByInputSize = 0;
try {
reduceNumByInputSize = calcReduceNumByInputSize(inputPaths);
} catch (InputSizeInvalidException e) {
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(e.getMessage());
return new EtlSubmitResult(status, null);
}
int reduceNumByTablet = calcReduceNumByTablet(jobConf);
int reduceNum = Math.min(reduceNumByInputSize, reduceNumByTablet);
LOG.info("calculate reduce num. reduceNum: {}, reduceNumByInputSize: {}, reduceNumByTablet: {}",
reduceNum, reduceNumByInputSize, reduceNumByTablet);
// remove any existing etl output path before submitting
String outputPath = (String) jobConf.get("output_path");
deleteEtlOutputPath(outputPath);
// submit etl job
String etlJobName = String.format(ETL_JOB_NAME, dbName, loadLabel);
String hadoopRunCmd = String.format(HADOOP_BISTREAMING_CMD, HADOOP_CLIENT, hadoopConfig, etlJobName, inputPath,
outputPath, hadoopConfig, applicationsPath, applicationsPath, applicationsPath, reduceNum,
configFile.getAbsolutePath());
LOG.info(hadoopRunCmd);
String outputLine = null;
List<String> hadoopRunCmdList = Util.shellSplit(hadoopRunCmd);
String[] hadoopRunCmds = hadoopRunCmdList.toArray(new String[0]);
BufferedReader errorReader = null;
Process p = null;
long startTime = System.currentTimeMillis();
try {
p = Runtime.getRuntime().exec(hadoopRunCmds);
errorReader = new BufferedReader(new InputStreamReader(p.getErrorStream()));
// the hadoop client prints its progress to stderr; scan up to 1000 lines for
// the "Running job: xxx" line and extract the etl job id from it
for (int i = 0; i < 1000; i++) {
outputLine = errorReader.readLine();
LOG.info(outputLine);
if (Strings.isNullOrEmpty(outputLine)) {
LOG.warn("submit etl job fail. job id: {}, label: {}", jobId, loadLabel);
break;
}
if (outputLine.toLowerCase().contains("error")
|| outputLine.toLowerCase().contains("exception")) {
failMsgs.add(outputLine);
}
if (outputLine.indexOf("Running job") != -1) {
String[] arr = outputLine.split(":");
etlJobId = arr[arr.length - 1].trim();
break;
}
}
} catch (IOException e) {
LOG.warn("submit etl job error", e);
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(e.getMessage());
return new EtlSubmitResult(status, null);
} finally {
Util.deleteDirectory(configDir);
long endTime = System.currentTimeMillis();
LOG.info("finished submit hadoop job: {}. cost: {} ms", jobId, endTime - startTime);
if (p != null) {
p.destroy();
}
if (errorReader != null) {
try {
errorReader.close();
} catch (IOException e) {
LOG.warn("close buffered reader error", e);
status.setStatusCode(TStatusCode.CANCELLED);
failMsgs.add(e.getMessage());
return new EtlSubmitResult(status, null);
}
}
}
if (etlJobId == null) {
status.setStatusCode(TStatusCode.CANCELLED);
}
return new EtlSubmitResult(status, etlJobId);
}
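// Make sure the dpp applications exist on hdfs: compare the local files under
// LOCAL_DPP_DIR with the files under applicationsPath/dpp by name and size,
// and re-upload the whole directory if any file is missing or differs.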
private void prepareDppApplications() throws LoadException {
String[] envp = { "LC_ALL=" + Config.locale };
String hadoopDppDir = applicationsPath + "/dpp";
boolean needUpload = false;
// get local files
File dppDir = new File(LOCAL_DPP_DIR);
if (!dppDir.exists() || !dppDir.isDirectory()) {
LOG.warn("dpp dir does not exist");
throw new LoadException("dpp dir does not exist");
}
File[] localFiles = dppDir.listFiles();
// test hadoop dpp dir
String hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", hadoopDppDir);
LOG.info(hadoopTestCmd);
CommandResult testResult = Util.executeCommand(hadoopTestCmd, envp);
if (testResult.getReturnCode() == 0) {
String hadoopDppFilePath = hadoopDppDir + "/*";
String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppFilePath);
LOG.info(hadoopCountCmd);
CommandResult countResult = Util.executeCommand(hadoopCountCmd, envp);
if (countResult.getReturnCode() != 0) {
LOG.warn("hadoop count error, result: {}", countResult);
throw new LoadException("hadoop count error. msg: " + countResult.getStderr());
}
Map<String, Long> fileMap = Maps.newHashMap();
// each "hadoop fs -count" output line is: DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
String[] fileInfos = countResult.getStdout().split("\n");
for (String fileInfo : fileInfos) {
String[] fileInfoArr = fileInfo.trim().split(" +");
if (fileInfoArr.length == 4) {
String filePath = fileInfoArr[3];
String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
long size = Long.parseLong(fileInfoArr[2]);
fileMap.put(fileName, size);
}
}
// diff files
for (File file : localFiles) {
if (!file.isFile()) {
continue;
}
String fileName = file.getName();
if (!fileMap.containsKey(fileName)) {
LOG.info("hadoop dpp file does not exist. file: {}", fileName);
needUpload = true;
break;
}
long localSize = file.length();
long hadoopSize = fileMap.get(fileName);
if (localSize != hadoopSize) {
LOG.info("dpp files size are different. file: {}, local: {}, hadoop: {}", fileName, localSize,
hadoopSize);
needUpload = true;
break;
}
}
} else {
LOG.info("hadoop dir does not exist. dir: {}", hadoopDppDir);
needUpload = true;
}
if (needUpload) {
// rmdir and mkdir
String hadoopRmrCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppDir);
LOG.info(hadoopRmrCmd);
Util.executeCommand(hadoopRmrCmd, envp);
String hadoopMkdirCmd = String.format(HADOOP_MKDIR_CMD, HADOOP_CLIENT, hadoopConfig, hadoopDppDir);
LOG.info(hadoopMkdirCmd);
Util.executeCommand(hadoopMkdirCmd, envp);
// upload dpp applications
String hadoopPutConfig = hadoopConfig + String.format(" -D speed.limit.kb=%d", HADOOP_SPEED_LIMIT_KB);
String hadoopPutCmd = null;
CommandResult putResult = null;
for (File file : localFiles) {
hadoopPutCmd = String.format(HADOOP_PUT_CMD, HADOOP_CLIENT, hadoopPutConfig,
LOCAL_DPP_DIR + "/" + file.getName(), hadoopDppDir);
LOG.info(hadoopPutCmd);
putResult = Util.executeCommand(hadoopPutCmd, envp);
if (putResult.getReturnCode() != 0) {
LOG.warn("hadoop put fail. result: {}", putResult);
throw new LoadException("hadoop put fail. msg: " + putResult.getStderr());
}
}
}
}
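// collect all source file urls from the job config to be used as -input paths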
private Set<String> getInputPaths(Map<String, Object> jobConf) {
Set<String> inputPaths = new HashSet<String>();
Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
for (Map<String, Map> table : tables.values()) {
Map<String, Map> sourceFileSchema = (Map<String, Map>) table.get("source_file_schema");
for (Map<String, List<String>> schema : sourceFileSchema.values()) {
List<String> fileUrls = schema.get("file_urls");
inputPaths.addAll(fileUrls);
}
}
return inputPaths;
}
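// Estimate the reduce number from the total input size reported by
// "hadoop fs -count": roughly one reduce task per Config.dpp_bytes_per_reduce
// bytes; fall back to DEFAULT_REDUCE_NUM if the count command fails.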
private int calcReduceNumByInputSize(Set<String> inputPaths) throws InputSizeInvalidException {
String[] envp = { "LC_ALL=" + Config.locale };
int reduceNum = 0;
String hadoopCountCmd = String.format(HADOOP_COUNT_CMD, HADOOP_CLIENT, hadoopConfig,
StringUtils.join(inputPaths, " "));
LOG.info(hadoopCountCmd);
CommandResult result = Util.executeCommand(hadoopCountCmd, envp);
if (result.getReturnCode() != 0) {
LOG.warn("hadoop count error, result: {}", result);
return DEFAULT_REDUCE_NUM;
}
// calc total size from "hadoop fs -count" output (DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME)
long totalSizeB = 0L;
String[] fileInfos = result.getStdout().split("\n");
for (String fileInfo : fileInfos) {
String[] fileInfoArr = fileInfo.trim().split(" +");
if (fileInfoArr.length == 4) {
totalSizeB += Long.parseLong(fileInfoArr[2]);
}
}
// check input size limit (hard-coded to 0 here, which disables the check below)
int inputSizeLimitGB = 0;
if (inputSizeLimitGB != 0) {
if (totalSizeB > inputSizeLimitGB * GB) {
String failMsg = "Input file size[" + (float) totalSizeB / GB + "GB]"
+ " exceeds system limit[" + inputSizeLimitGB + "GB]";
LOG.warn(failMsg);
throw new InputSizeInvalidException(failMsg);
}
}
if (totalSizeB != 0) {
reduceNum = (int) (totalSizeB / Config.dpp_bytes_per_reduce) + 1;
}
return reduceNum;
}
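// Estimate the reduce number from the job config: one reduce task per hash
// bucket (hash_mod) or per key range of every view.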
private int calcReduceNumByTablet(Map<String, Object> jobConf) {
int reduceNum = 0;
Map<String, Map> tables = (Map<String, Map>) jobConf.get("tables");
for (Map<String, Map> table : tables.values()) {
Map<String, Map> views = (Map<String, Map>) table.get("views");
for (Map<String, Object> view : views.values()) {
if (view.containsKey("hash_mod")) {
// hash or random
reduceNum += (int) view.get("hash_mod");
} else if (view.containsKey("key_ranges")) {
// key range
List<Object> rangeList = (List<Object>) view.get("key_ranges");
reduceNum += rangeList.size();
}
}
}
return reduceNum;
}
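// Query the hadoop job status and translate it into an EtlStatus:
// "key: value" stdout lines become stats, "key=value" lines become counters,
// and the "job state" stat is mapped to RUNNING / FINISHED / CANCELLED.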
public EtlStatus getEtlJobStatus(String etlJobId) {
EtlStatus status = new EtlStatus();
status.setState(TEtlState.RUNNING);
String hadoopStatusCmd = String.format(HADOOP_STATUS_CMD, HADOOP_CLIENT, hadoopConfig, etlJobId);
LOG.info(hadoopStatusCmd);
String[] envp = { "LC_ALL=" + Config.locale };
CommandResult result = Util.executeCommand(hadoopStatusCmd, envp);
String stdout = result.getStdout();
if (result.getReturnCode() != 0) {
if (stdout != null && stdout.contains("Could not find job")) {
LOG.warn("cannot find hadoop etl job: {}", etlJobId);
status.setState(TEtlState.CANCELLED);
}
return status;
}
// stats and counters
Map<String, String> stats = new HashMap<String, String>();
Map<String, String> counters = new HashMap<String, String>();
String[] stdoutLines = stdout.split("\n");
String[] array = null;
for (String line : stdoutLines) {
array = line.split(":");
if (array.length == 2) {
stats.put(array[0].trim(), array[1].trim());
}
array = line.split("=");
if (array.length == 2) {
counters.put(array[0].trim(), array[1].trim());
}
}
status.setStats(stats);
status.setCounters(counters);
// tracking url: the "tracking URL: ..." line was split on its "=", so the key
// holds the part of the url before "=" and the value the part after; strip the
// leading "tracking URL: " prefix (14 chars) and re-join both parts
for (String key : counters.keySet()) {
if (key.startsWith("tracking URL")) {
status.setTrackingUrl(key.substring(14) + "=" + counters.get(key));
break;
}
}
// job state
if (stats.containsKey("job state")) {
int jobState = Integer.parseInt(stats.get("job state"));
if (jobState == 3 || jobState == 5 || jobState == 6) {
// 3:failed 5or6:killed --> cancelled
status.setState(TEtlState.CANCELLED);
} else if (jobState == 2) {
// 2:success --> finished
status.setState(TEtlState.FINISHED);
} else {
// 0:init 1:running 4:prepare --> running
status.setState(TEtlState.RUNNING);
}
}
return status;
}
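// List the etl files under outputPath/export and return a map from file path to
// file size. Returns an empty map if the export dir was never created, and null
// if the output path is missing or the listing fails for an unknown reason.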
public Map<String, Long> getEtlFiles(String outputPath) {
String[] envp = { "LC_ALL=" + Config.locale };
Map<String, Long> fileMap = Maps.newHashMap();
String fileDir = outputPath + "/" + DPP_OUTPUT_DIR;
String hadoopLsCmd = String.format(HADOOP_LS_CMD, HADOOP_CLIENT, hadoopConfig, fileDir);
LOG.info(hadoopLsCmd);
CommandResult lsResult = Util.executeCommand(hadoopLsCmd, envp);
if (lsResult.getReturnCode() != 0) {
// check outputPath exist
String hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", outputPath);
LOG.info(hadoopTestCmd);
CommandResult testResult = Util.executeCommand(hadoopTestCmd, envp);
if (testResult.getReturnCode() != 0) {
// the output path itself is missing: cannot tell the etl result, return null
LOG.info("hadoop dir does not exist. dir: {}", outputPath);
return null;
}
// check outputPath + DPP_OUTPUT_DIR exist
hadoopTestCmd = String.format(HADOOP_TEST_CMD, HADOOP_CLIENT, hadoopConfig, "-d", fileDir);
LOG.info(hadoopTestCmd);
testResult = Util.executeCommand(hadoopTestCmd, envp);
if (testResult.getReturnCode() != 0) {
// the output path exists but the export dir does not: no etl files were produced
LOG.info("hadoop dir does not exist. dir: {}", fileDir);
return fileMap;
} else {
// the export dir exists but ls failed for some other reason
return null;
}
}
String stdout = lsResult.getStdout();
String[] lsFileResults = stdout.split("\n");
for (String line : lsFileResults) {
// drwxr-xr-x 3 palo palo 0 2014-12-08 14:37 /tmp/file
String[] fileInfos = line.split(" +");
if (fileInfos.length == 8) {
String filePath = fileInfos[fileInfos.length - 1];
long fileSize = -1;
try {
fileSize = Long.parseLong(fileInfos[4]);
} catch (NumberFormatException e) {
LOG.warn("file size format error. line: {}", line);
}
fileMap.put(filePath, fileSize);
}
}
return fileMap;
}
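// kill the hadoop etl job; best effort, the command result is not checked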
public void killEtlJob(String etlJobId) {
String[] envp = { "LC_ALL=" + Config.locale };
String hadoopKillCmd = String.format(HADOOP_KILL_CMD, HADOOP_CLIENT, hadoopConfig, etlJobId);
LOG.info(hadoopKillCmd);
Util.executeCommand(hadoopKillCmd, envp);
}
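// remove the etl output path on hdfs; best effort, the command result is not checked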
public void deleteEtlOutputPath(String outputPath) {
String[] envp = { "LC_ALL=" + Config.locale };
String hadoopRmCmd = String.format(HADOOP_RMR_CMD, HADOOP_CLIENT, hadoopConfig, outputPath);
LOG.info(hadoopRmCmd);
Util.executeCommand(hadoopRmCmd, envp);
}
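// build the full etl output path: fsDefaultName + outputPath/dbId/loadLabel/etlOutputDir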
public static String getEtlOutputPath(String fsDefaultName, String outputPath, long dbId, String loadLabel,
String etlOutputDir) {
return String.format(ETL_OUTPUT_PATH, fsDefaultName, outputPath, dbId, loadLabel, etlOutputDir);
}
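// thrown when the total etl input size exceeds the input size limit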
private static class InputSizeInvalidException extends LoadException {
public InputSizeInvalidException(String msg) {
super(msg);
}
}
}