// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
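
/**
 * Generates the Yarn client configuration files (core-site.xml and yarn-site.xml)
 * used when submitting Spark jobs to Yarn. Properties carrying the "spark.hadoop."
 * prefix are split by their remaining prefix ("hadoop." vs. "yarn.") and serialized
 * into the matching XML file under a per-resource subdirectory of
 * {@code Config.yarn_config_dir}.
 */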
@Deprecated
public class SparkYarnConfigFiles {
private static final Logger LOG = LogManager.getLogger(SparkYarnConfigFiles.class);
private static final String HADOOP_CONF_FILE = "core-site.xml";
private static final String YARN_CONF_FILE = "yarn-site.xml";
private static final String SPARK_HADOOP_PREFIX = "spark.hadoop.";
private static final String HADOOP_PREFIX = "hadoop.";
private static final String YARN_PREFIX = "yarn.";
private String configDir;
private List<ConfigFile> configFiles;
public String getConfigDir() {
return this.configDir;
}
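
    // The generated files live under Config.yarn_config_dir/<resourceName>.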
public SparkYarnConfigFiles(String resourceName, Map<String, String> properties) {
this.configDir = Config.yarn_config_dir + "/" + resourceName;
this.configFiles = Lists.newArrayList();
createConfigFiles(properties);
}
// for unit test
public SparkYarnConfigFiles(String resourceName, String parentDir, Map<String, String> properties) {
this.configDir = parentDir + "/" + resourceName;
this.configFiles = Lists.newArrayList();
createConfigFiles(properties);
}
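
    // One XML file per prefix: "hadoop."-prefixed properties go to core-site.xml,
    // "yarn."-prefixed properties go to yarn-site.xml.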
private void createConfigFiles(Map<String, String> properties) {
LOG.info("create config file, properties size: {}", properties.size());
configFiles.add(new XMLConfigFile(configDir + "/" + HADOOP_CONF_FILE,
getPropertiesByPrefix(properties, HADOOP_PREFIX)));
configFiles.add(new XMLConfigFile(configDir + "/" + YARN_CONF_FILE,
getPropertiesByPrefix(properties, YARN_PREFIX)));
}
public void prepare() throws LoadException {
initConfigFile();
}
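
    // Decide whether anything has to be written: a missing config directory triggers a
    // plain write, a missing file inside an existing directory triggers a full rebuild
    // of the directory, and nothing is touched when everything is already in place.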
private void initConfigFile() throws LoadException {
LOG.info("start to init config file. config dir: {}", this.configDir);
Preconditions.checkState(!Strings.isNullOrEmpty(configDir));
boolean needUpdate = false;
boolean needReplace = false;
CHECK: {
if (!checkConfigDirExists(this.configDir)) {
needUpdate = true;
break CHECK;
}
for (ConfigFile configFile : configFiles) {
String filePath = configFile.getFilePath();
if (!checkConfigFileExists(filePath)) {
needUpdate = true;
needReplace = true;
break CHECK;
}
}
}
if (needUpdate) {
updateConfig(needReplace);
}
LOG.info("init spark yarn config success, config dir={}, config file size={}",
configDir, configFiles.size());
}
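
    // True only if the given path exists and is a directory.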
private boolean checkConfigDirExists(String dir) {
boolean result = true;
File configDir = new File(dir);
if (!configDir.exists() || !configDir.isDirectory()) {
result = false;
}
LOG.info("check yarn client config dir exists, result: {}", result);
return result;
}
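
    // True only if the given path exists and is a regular file.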
private boolean checkConfigFileExists(String filePath) {
boolean result = true;
File configFile = new File(filePath);
if (!configFile.exists() || !configFile.isFile()) {
result = false;
}
LOG.info("check yarn client config file path exists, result: {}, path: {}", result, filePath);
return result;
}
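
    // Recreate the config directory and write every config file; when needReplace is
    // set, the stale directory is removed first.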
private void updateConfig(boolean needReplace) throws LoadException {
if (needReplace) {
clearAndDelete(this.configDir);
}
mkdir(this.configDir);
for (ConfigFile configFile : configFiles) {
configFile.createFile();
}
LOG.info("finished to update yarn client config dir, dir={}", configDir);
}
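
    // Collect entries whose key starts with "spark.hadoop." followed by the given
    // prefix, strip the "spark.hadoop." part, and remove the matched entries from the
    // input map so they are not re-examined for the next prefix.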
private Map<String, String> getPropertiesByPrefix(Map<String, String> properties, String prefix) {
Map<String, String> result = Maps.newHashMap();
Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> property = iterator.next();
String key = property.getKey();
if (key.startsWith(SPARK_HADOOP_PREFIX)) {
String newKey = key.substring(SPARK_HADOOP_PREFIX.length());
if (newKey.startsWith(prefix)) {
result.put(newKey, property.getValue());
iterator.remove();
}
}
}
return result;
}
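
    // Recursively delete a file or an entire directory tree.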
    private void clearAndDelete(String deletePath) {
        File file = new File(deletePath);
        if (!file.exists()) {
            return;
        }
        if (file.isFile()) {
            file.delete();
            return;
        }
        // listFiles() may return null on an I/O error, so guard against it before recursing
        File[] files = file.listFiles();
        if (files != null) {
            for (File child : files) {
                clearAndDelete(child.getAbsolutePath());
            }
        }
        file.delete();
    }
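
    // Create the config directory, including any missing parent directories.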
private void mkdir(String configDir) {
File file = new File(configDir);
file.mkdirs();
}
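
    // Minimal ConfigFile contract (assumed declaration, reconstructed from the calls in
    // initConfigFile/updateConfig and the overrides in XMLConfigFile below).
    interface ConfigFile {
        // path of the file to generate
        String getFilePath();

        // write the file to disk
        void createFile() throws LoadException;
    }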

    // A single Hadoop-style XML config file: a <configuration> root holding one
    // <property><name/><value/></property> element per entry.
public static class XMLConfigFile implements ConfigFile {
private static final String CONFIGURATION = "configuration";
private static final String PROPERTY = "property";
private static final String NAME = "name";
private static final String VALUE = "value";
private String filePath;
private Map<String, String> configProperties;
public XMLConfigFile(String filePath, Map<String, String> configProperties) {
this.filePath = filePath;
this.configProperties = configProperties;
}
@Override
public String getFilePath() {
return filePath;
}
@Override
public void createFile() throws LoadException {
createXML(this.filePath, this.configProperties);
}
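
        // Build the <configuration>/<property>/<name>+<value> DOM in memory, then write
        // it to filePath with indentation enabled.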
private void createXML(String filePath, Map<String, String> properties) throws LoadException {
try {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
DocumentBuilder db = factory.newDocumentBuilder();
Document document = db.newDocument();
document.setXmlStandalone(true);
Element configuration = (Element) appendNode(document, CONFIGURATION, null);
for (Map.Entry<String, String> pair : properties.entrySet()) {
Element property = (Element) appendNode(configuration, PROPERTY, null);
appendNode(property, NAME, pair.getKey());
appendNode(property, VALUE, pair.getValue());
}
TransformerFactory tff = TransformerFactory.newInstance();
Transformer tf = tff.newTransformer();
tf.setOutputProperty(OutputKeys.INDENT, "yes");
tf.transform(new DOMSource(document), new StreamResult(new File(filePath)));
            } catch (Exception e) {
                LOG.warn("failed to create xml config file " + filePath, e);
                throw new LoadException(e.getMessage());
            }
}
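
        // Append a child element named `tag` to the parent (Document or Element) and
        // set its text content when one is provided.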
private Node appendNode(Node parent, String tag, String content) {
Element child = null;
if (parent instanceof Document) {
child = ((Document) parent).createElement(tag);
} else {
child = parent.getOwnerDocument().createElement(tag);
}
if (content != null && !content.equals("")) {
child.setTextContent(content);
}
return parent.appendChild(child);
}
}
}