// SparkLauncherMonitor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Helpers for following the output of a spark launcher process and mirroring the
 * application information found there (state, appId, queue, ...) into a
 * {@link SparkLoadAppHandle}.
 */
@Deprecated
public class SparkLauncherMonitor {
    private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitor.class);

    /**
     * Creates a log monitor for the launcher process held by {@code handle}.
     * The returned thread is not started by this method.
     *
     * @param handle handle that owns the launcher process and receives the parsed values
     * @param resourceSparkConfig spark config of the resource, may be null or empty; a non-empty
     *                            {@code spark.submit.timeout} entry (milliseconds) overrides the
     *                            default submit timeout
     * @return a new, not yet started, {@link LogMonitor}
     * @throws NumberFormatException if {@code spark.submit.timeout} is set but is not a valid long
     */
    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) {
        return new LogMonitor(handle, resourceSparkConfig);
    }

    /**
     * Maps a YARN application state onto the handle state.
     * SUBMITTED and ACCEPTED both map to SUBMITTED; states without a counterpart
     * (NEW, NEW_SAVING) map to UNKNOWN.
     */
    private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
        switch (yarnState) {
            case SUBMITTED:
            case ACCEPTED:
                return SparkLoadAppHandle.State.SUBMITTED;
            case RUNNING:
                return SparkLoadAppHandle.State.RUNNING;
            case FINISHED:
                return SparkLoadAppHandle.State.FINISHED;
            case FAILED:
                return SparkLoadAppHandle.State.FAILED;
            case KILLED:
                return SparkLoadAppHandle.State.KILLED;
            default:
                // NEW NEW_SAVING
                return SparkLoadAppHandle.State.UNKNOWN;
        }
    }

    /**
     * This monitor is used for monitoring the spark launcher process.
     * Users can use it to get the real-time `appId`, `state` and `tracking-url`
     * of the spark launcher by reading and analyzing the output of the process.
     */
    public static class LogMonitor extends Thread {
        // Markers looked for (via String.contains) in each output line.
        private static final String STATE = "state";
        private static final String QUEUE = "queue";
        private static final String START_TIME = "start time";
        private static final String FINAL_STATUS = "final status";
        private static final String URL = "tracking URL";
        private static final String USER = "user";
        // 5min
        private static final long DEFAULT_SUBMIT_TIMEOUT_MS = 300000L;
        private static final String SUBMIT_TIMEOUT_KEY = "spark.submit.timeout";

        // Compiled once instead of once per log line.
        private static final Pattern STATE_PATTERN = Pattern.compile("(?<=\\(state: )(.+?)(?=\\))");
        private static final Pattern APP_ID_PATTERN = Pattern.compile("application_[0-9]+_[0-9]+");
        // Splits "key: value" on the first ':' only, so values such as URLs keep their own colons.
        private static final Splitter KEY_VALUE_SPLITTER = Splitter.onPattern(":").trimResults().limit(2);

        private final Process process;
        private final SparkLoadAppHandle handle;
        private long submitTimeoutMs;
        private boolean isStop;
        // Optional copy target for every line read from the process; null when no redirect is configured.
        private OutputStream outputStream;

        /**
         * @param handle handle that owns the launcher process and receives the parsed values
         * @param resourceSparkConfig spark config of the resource, may be null or empty; a non-empty
         *                            {@code spark.submit.timeout} entry (milliseconds) overrides the
         *                            5 minute default
         * @throws NumberFormatException if {@code spark.submit.timeout} is set but is not a valid long
         */
        public LogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) {
            this.handle = handle;
            this.process = handle.getProcess();
            this.isStop = false;
            String configuredTimeout = MapUtils.isNotEmpty(resourceSparkConfig)
                    ? resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY) : null;
            if (StringUtils.isNotEmpty(configuredTimeout)) {
                setSubmitTimeoutMs(Long.parseLong(configuredTimeout));
            } else {
                setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS);
            }
        }

        public void setSubmitTimeoutMs(long submitTimeoutMs) {
            this.submitTimeoutMs = submitTimeoutMs;
        }

        public long getSubmitTimeoutMs() {
            return submitTimeoutMs;
        }

        /**
         * Makes the monitor copy every line it reads into the given file (truncating any
         * existing content) and records the path on the handle.
         *
         * @param redirectLogPath path of the file that receives the process output
         * @throws IOException if the file cannot be opened for writing
         */
        public void setRedirectLogPath(String redirectLogPath) throws IOException {
            this.outputStream = new FileOutputStream(new File(redirectLogPath), false);
            this.handle.setLogPath(redirectLogPath);
        }

        // Normally, the log monitor stops automatically once the spark app state changes
        // to RUNNING/FINISHED, leaving the launcher process alive.
        // If the spark app state changes to FAILED/KILLED/LOST, the log monitor stops
        // and kills the spark launcher process.
        // There is a `submitTimeout` for preventing the spark app state from staying in
        // UNKNOWN/CONNECTED/SUBMITTED for a long time. It is only evaluated when a line
        // containing `state` is read, so a process that prints nothing is not timed out here.
        @Override
        public void run() {
            if (handle.getState() == SparkLoadAppHandle.State.KILLED) {
                // If handle has been killed, kill the process
                process.destroyForcibly();
                // Nothing will be written any more; do not leak the redirect file handle.
                closeRedirectStream();
                return;
            }

            BufferedReader outReader = null;
            long startTime = System.currentTimeMillis();
            try {
                // Reader and writer both use the platform default charset.
                outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                String line;
                while (!isStop && (line = outReader.readLine()) != null) {
                    if (outputStream != null) {
                        outputStream.write((line + "\n").getBytes());
                    }
                    if (line.contains(STATE)) {
                        // parse state and appId
                        handleStateLine(line, startTime);
                    } else if (isAttributeLine(line)) {
                        // parse other values
                        handleAttributeLine(line);
                    }
                }
            } catch (Exception e) {
                LOG.warn("Exception monitoring process.", e);
            } finally {
                // Close both resources independently so a failure on one cannot leak the other.
                if (outReader != null) {
                    try {
                        outReader.close();
                    } catch (IOException e) {
                        LOG.warn("close buffered reader error", e);
                    }
                }
                closeRedirectStream();
            }
        }

        /**
         * Handles a line containing {@code state}: updates state and appId on the handle and
         * decides whether monitoring has to stop and whether the handle has to be killed.
         *
         * @param line the output line
         * @param startTime time (ms) at which monitoring started, used for the submit timeout
         */
        private void handleStateLine(String line, long startTime) {
            SparkLoadAppHandle.State oldState = handle.getState();
            SparkLoadAppHandle.State newState = oldState;

            // 1. state
            String yarnStateName = regexGetState(line);
            if (yarnStateName != null) {
                try {
                    newState = fromYarnState(YarnApplicationState.valueOf(yarnStateName));
                } catch (IllegalArgumentException e) {
                    // Unrecognized state token: keep the previous state and keep monitoring
                    // instead of aborting the whole monitor.
                    LOG.warn("parse log encounter an error, line: {}, msg: {}", line, e.getMessage());
                }
                if (newState != oldState) {
                    handle.setState(newState);
                }
            }

            // 2. appId
            String appId = regexGetAppId(line);
            if (appId != null && !appId.equals(handle.getAppId())) {
                handle.setAppId(appId);
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("spark appId that handle get is {}, state: {}",
                        handle.getAppId(), handle.getState().toString());
            }

            switch (newState) {
                case UNKNOWN:
                case CONNECTED:
                case SUBMITTED:
                    // If the app stays in the UNKNOWN/CONNECTED/SUBMITTED state
                    // for more than submitTimeoutMs stop monitoring and kill the process
                    if (System.currentTimeMillis() - startTime > submitTimeoutMs) {
                        isStop = true;
                        handle.kill();
                    }
                    break;
                case RUNNING:
                case FINISHED:
                    // There's no need to parse all logs of handle process to get all the information.
                    // As soon as the state changes to RUNNING/FINISHED,
                    // stop monitoring but keep the process alive.
                    isStop = true;
                    break;
                case KILLED:
                case FAILED:
                case LOST:
                    // If the state changes to KILLED/FAILED/LOST,
                    // stop monitoring and kill the process
                    isStop = true;
                    handle.kill();
                    break;
                default:
                    // Throws IllegalStateException, which ends monitoring via the catch in run().
                    Preconditions.checkState(false, "wrong spark app state");
            }
        }

        // Returns true if the line mentions one of the non-state attributes this monitor records.
        private static boolean isAttributeLine(String line) {
            return line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINAL_STATUS)
                    || line.contains(URL) || line.contains(USER);
        }

        // Stores the value of a "key: value" attribute line on the handle; the first matching
        // marker wins. Unparsable values are logged and skipped.
        private void handleAttributeLine(String line) {
            String value = getValue(line);
            if (Strings.isNullOrEmpty(value)) {
                return;
            }
            try {
                if (line.contains(QUEUE)) {
                    handle.setQueue(value);
                } else if (line.contains(START_TIME)) {
                    handle.setStartTime(Long.parseLong(value));
                } else if (line.contains(FINAL_STATUS)) {
                    handle.setFinalStatus(FinalApplicationStatus.valueOf(value));
                } else if (line.contains(URL)) {
                    handle.setUrl(value);
                } else if (line.contains(USER)) {
                    handle.setUser(value);
                }
            } catch (IllegalArgumentException e) {
                // also covers NumberFormatException from Long.parseLong
                LOG.warn("parse log encounter an error, line: {}, msg: {}", line, e.getMessage());
            }
        }

        // Closes the redirect stream if one was configured; failures are logged, never thrown.
        private void closeRedirectStream() {
            if (outputStream == null) {
                return;
            }
            try {
                outputStream.close();
            } catch (IOException e) {
                LOG.warn("close redirect log output stream error", e);
            }
        }

        // e.g.
        // input: "final status: SUCCEEDED"
        // output: "SUCCEEDED"
        // Returns null when the line contains no ':'.
        private static String getValue(String line) {
            List<String> entry = KEY_VALUE_SPLITTER.splitToList(line);
            return entry.size() == 2 ? entry.get(1) : null;
        }

        // e.g.
        // input: "Application report for application_1573630236805_6864759 (state: ACCEPTED)"
        // output: "ACCEPTED"
        // Returns null when the line has no "(state: ...)" part.
        private static String regexGetState(String line) {
            Matcher stateMatcher = STATE_PATTERN.matcher(line);
            return stateMatcher.find() ? stateMatcher.group() : null;
        }

        // e.g.
        // input: "Application report for application_1573630236805_6864759 (state: ACCEPTED)"
        // output: "application_1573630236805_6864759"
        // Returns null when the line contains no application id.
        private static String regexGetAppId(String line) {
            Matcher appIdMatcher = APP_ID_PATTERN.matcher(line);
            return appIdMatcher.find() ? appIdMatcher.group() : null;
        }
    }
}