// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.load.loadv2.SparkRepository;
import org.apache.doris.load.loadv2.SparkYarnConfigFiles;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Spark resource for ETL or query.
* working_dir and broker[.xxx] are optional and only used for Spark ETL.
* working_dir stores the intermediate ETL files, and broker is used by BE to read them.
*
* Spark resource example:
* CREATE EXTERNAL RESOURCE "spark0"
* PROPERTIES
* (
* "type" = "spark",
* "spark.master" = "yarn",
* "spark.submit.deployMode" = "cluster",
* "spark.jars" = "xxx.jar,yyy.jar",
* "spark.files" = "/tmp/aaa,/tmp/bbb",
* "spark.executor.memory" = "1g",
* "spark.yarn.queue" = "queue0",
* "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
* "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
* "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
* "broker" = "broker0",
* "broker.username" = "user0",
* "broker.password" = "password0"
* );
*
* DROP RESOURCE "spark0";
*/
@Deprecated
public class SparkResource extends Resource {
private static final Logger LOG = LogManager.getLogger(SparkResource.class);
private static final String SPARK_MASTER = "spark.master";
private static final String SPARK_SUBMIT_DEPLOY_MODE = "spark.submit.deployMode";
private static final String WORKING_DIR = "working_dir";
private static final String BROKER = "broker";
private static final String YARN_MASTER = "yarn";
private static final String SPARK_CONFIG_PREFIX = "spark.";
private static final String BROKER_PROPERTY_PREFIX = "broker.";
private static final String ENV_PREFIX = "env.";
// spark uses hadoop configs in the form of spark.hadoop.*
private static final String SPARK_HADOOP_CONFIG_PREFIX = "spark.hadoop.";
private static final String SPARK_YARN_RESOURCE_MANAGER_ADDRESS = "spark.hadoop.yarn.resourcemanager.address";
private static final String SPARK_FS_DEFAULT_FS = "spark.hadoop.fs.defaultFS";
private static final String YARN_RESOURCE_MANAGER_ADDRESS = "yarn.resourcemanager.address";
private static final String SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED = "spark.hadoop.yarn.resourcemanager.ha.enabled";
private static final String SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS = "spark.hadoop.yarn.resourcemanager.ha.rm-ids";
private static final String YARN_RESOURCE_MANAGER_ADDRESS_FORMAT = "spark.hadoop.yarn.resourcemanager.address.%s";
private static final String YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT = "spark.hadoop.yarn.resourcemanager.hostname.%s";
public enum DeployMode {
CLUSTER,
CLIENT;
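// Case-insensitive lookup, e.g. fromString("cluster") returns CLUSTER.
// Returns null for unknown values, so callers must handle the null.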
public static DeployMode fromString(String deployMode) {
for (DeployMode mode : DeployMode.values()) {
if (mode.name().equalsIgnoreCase(deployMode)) {
return mode;
}
}
return null;
}
}
@SerializedName(value = "sparkConfigs")
private Map<String, String> sparkConfigs;
@SerializedName(value = "workingDir")
private String workingDir;
@SerializedName(value = "broker")
private String broker;
// broker username and password
@SerializedName(value = "brokerProperties")
private Map<String, String> brokerProperties;
@SerializedName(value = "envConfigs")
private Map<String, String> envConfigs;
public SparkResource() {
super();
}
public SparkResource(String name) {
this(name, Maps.newHashMap(), null, null, Maps.newHashMap(), Maps.newHashMap());
}
// "public" for testing
public SparkResource(String name, Map<String, String> sparkConfigs, String workingDir, String broker,
Map<String, String> brokerProperties, Map<String, String> envConfigs) {
super(name, ResourceType.SPARK);
this.sparkConfigs = sparkConfigs;
this.workingDir = workingDir;
this.broker = broker;
this.brokerProperties = brokerProperties;
this.envConfigs = envConfigs;
}
public String getMaster() {
return sparkConfigs.get(SPARK_MASTER);
}
public DeployMode getDeployMode() {
return DeployMode.fromString(sparkConfigs.get(SPARK_SUBMIT_DEPLOY_MODE));
}
public String getWorkingDir() {
return workingDir;
}
public String getBroker() {
return broker;
}
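// Strips the "broker." prefix from each broker property, e.g. "broker.username" -> "username".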
public Map<String, String> getBrokerPropertiesWithoutPrefix() {
Map<String, String> properties = Maps.newHashMap();
for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
String key = entry.getKey();
if (key.startsWith(BROKER_PROPERTY_PREFIX)) {
properties.put(key.substring(BROKER_PROPERTY_PREFIX.length()), entry.getValue());
}
}
return properties;
}
public Map<String, String> getSparkConfigs() {
return sparkConfigs;
}
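/**
 * Merges the process environment with the resource-level env configs, keeping only keys that
 * start with "env." and stripping that prefix; resource-level values win on conflicts.
 * For example (values illustrative only), {"env.SPARK_HOME": "/opt/spark"} yields
 * {"SPARK_HOME": "/opt/spark"}.
 */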
public Map<String, String> getEnvConfigsWithoutPrefix() {
return Stream.concat(
getSystemEnvConfigs().entrySet().stream(),
Optional.ofNullable(envConfigs).orElse(Collections.emptyMap()).entrySet().stream()
)
.filter(entry -> entry.getKey().startsWith(ENV_PREFIX))
.collect(Collectors.toMap(
entry -> entry.getKey().substring(ENV_PREFIX.length()),
Entry::getValue,
(oldValue, newValue) -> newValue
));
}
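// Thin wrapper around System.getenv(), presumably separated out so tests can override it.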
public Map<String, String> getSystemEnvConfigs() {
return System.getenv();
}
public Pair<String, String> getYarnResourcemanagerAddressPair() {
return Pair.of(YARN_RESOURCE_MANAGER_ADDRESS, sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_ADDRESS));
}
public SparkResource getCopiedResource() {
return new SparkResource(name, Maps.newHashMap(sparkConfigs), workingDir, broker, brokerProperties, envConfigs);
}
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(sparkConfigs);
}
/**
 * Each SparkResource has one and only one SparkRepository.
 * This method gets the remote archive that matches the DPP version from the remote repository.
 */
public synchronized SparkRepository.SparkArchive prepareArchive() throws LoadException {
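// The remote layout is <workingDir>/<clusterId>/<REPOSITORY_DIR><name>, so each resource gets
// its own repository directory under working_dir (the exact REPOSITORY_DIR value is defined in
// SparkRepository).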
String remoteRepositoryPath = workingDir + "/" + Env.getCurrentEnv().getClusterId()
+ "/" + SparkRepository.REPOSITORY_DIR + name;
BrokerDesc brokerDesc = new BrokerDesc(broker, getBrokerPropertiesWithoutPrefix());
SparkRepository repository = new SparkRepository(remoteRepositoryPath, brokerDesc);
// Check the remote repository and upload the archive if needed.
repository.prepare();
SparkRepository.SparkArchive archive = repository.getCurrentArchive();
// Normally, an archive contains exactly two libraries: the DPP library and the Spark library
Preconditions.checkState(archive.libraries.size() == 2);
SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
SparkRepository.SparkLibrary spark2xLibrary = archive.getSpark2xLibrary();
if (dppLibrary == null || spark2xLibrary == null) {
throw new LoadException("failed to get libraries from remote archive");
}
return archive;
}
// Each SparkResource has one and only one yarn config directory used to run the yarn command.
// This method writes all configurations starting with "spark.hadoop." into config files under a specific directory.
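// For example, "spark.hadoop.yarn.resourcemanager.address" and "spark.hadoop.fs.defaultFS"
// from the class-level example above would both be written into the generated config files.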
public synchronized String prepareYarnConfig() throws LoadException {
SparkYarnConfigFiles yarnConfigFiles = new SparkYarnConfigFiles(name, getSparkHadoopConfig(sparkConfigs));
yarnConfigFiles.prepare();
return yarnConfigFiles.getConfigDir();
}
public String getYarnClientPath() throws LoadException {
String yarnClientPath = Config.yarn_client_path;
File file = new File(yarnClientPath);
if (!file.exists() || !file.isFile()) {
throw new LoadException("yarn client does not exist in path: " + yarnClientPath);
}
return yarnClientPath;
}
public boolean isYarnMaster() {
return YARN_MASTER.equalsIgnoreCase(getMaster());
}
public void update(ResourceDesc resourceDesc) throws DdlException {
Preconditions.checkState(name.equals(resourceDesc.getName()));
Map<String, String> properties = resourceDesc.getProperties();
if (properties == null) {
return;
}
// update properties
updateProperties(properties);
}
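// Shared by update(ResourceDesc) and modifyProperties(); a sketch of a statement that reaches
// this path (assuming Doris's ALTER RESOURCE syntax):
// ALTER RESOURCE "spark0" PROPERTIES ("spark.executor.memory" = "2g");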
private void updateProperties(Map<String, String> properties) throws DdlException {
// update spark configs
if (properties.containsKey(SPARK_MASTER)) {
throw new DdlException("Cannot change spark master");
}
sparkConfigs.putAll(getSparkConfig(properties));
// update working dir and broker
if (properties.containsKey(WORKING_DIR)) {
workingDir = properties.get(WORKING_DIR);
}
if (properties.containsKey(BROKER)) {
broker = properties.get(BROKER);
}
brokerProperties.putAll(getBrokerProperties(properties));
Map<String, String> env = getEnvConfig(properties);
if (!env.isEmpty()) {
if (envConfigs == null) {
envConfigs = env;
} else {
envConfigs.putAll(env);
}
}
LOG.info("updateProperties,{},{}", properties, envConfigs);
}
@Override
protected void setProperties(ImmutableMap<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
// get spark configs
sparkConfigs = getSparkConfig(properties);
envConfigs = getEnvConfig(properties);
LOG.info("setProperties,{},{}", properties, envConfigs);
// check master and deploy mode
if (getMaster() == null) {
throw new DdlException("Missing " + SPARK_MASTER + " in properties");
}
String deployModeStr = sparkConfigs.get(SPARK_SUBMIT_DEPLOY_MODE);
if (deployModeStr != null) {
DeployMode deployMode = DeployMode.fromString(deployModeStr);
if (deployMode == null) {
throw new DdlException("Unknown deploy mode: " + deployModeStr);
}
} else {
throw new DdlException("Missing " + SPARK_SUBMIT_DEPLOY_MODE + " in properties");
}
// If the deploy machines do not set the HADOOP_CONF_DIR env, the configs below must be set explicitly.
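// A sketch of the extra properties needed for a yarn master with ResourceManager HA enabled
// (host names and ports are placeholders):
// "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
// "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
// "spark.hadoop.yarn.resourcemanager.address.rm1" = "host1:8032",
// "spark.hadoop.yarn.resourcemanager.address.rm2" = "host2:8032"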
if (isYarnMaster()) {
if (!sparkConfigs.containsKey(SPARK_FS_DEFAULT_FS)) {
throw new DdlException("Missing (" + SPARK_FS_DEFAULT_FS + ") in yarn master");
}
String haEnabled = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED);
if ("true".equals(haEnabled)) {
if (StringUtils.isEmpty(sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS))) {
throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS + ") in yarn master, "
+ "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
}
String[] haIds = sparkConfigs.get(SPARK_YARN_RESOURCE_MANAGER_HA_RMIDS).split(",");
for (String haId : haIds) {
String addressKey = String.format(YARN_RESOURCE_MANAGER_ADDRESS_FORMAT, haId);
String hostnameKey = String.format(YARN_RESOURCE_MANAGER_HOSTNAME_FORMAT, haId);
if (!sparkConfigs.containsKey(addressKey) && !sparkConfigs.containsKey(hostnameKey)) {
throw new DdlException("Missing " + addressKey + " or " + hostnameKey + " in yarn master, "
+ "when " + SPARK_YARN_RESOURCE_MANAGER_HA_ENABLED + "=true.");
}
}
} else if (!sparkConfigs.containsKey(SPARK_YARN_RESOURCE_MANAGER_ADDRESS)) {
throw new DdlException("Missing (" + SPARK_YARN_RESOURCE_MANAGER_ADDRESS + ") in yarn master, "
+ "or not turned on ha.");
}
}
// check working dir and broker
workingDir = properties.get(WORKING_DIR);
broker = properties.get(BROKER);
if ((workingDir == null && broker != null) || (workingDir != null && broker == null)) {
throw new DdlException("working_dir and broker should be assigned at the same time");
}
// check broker exist
if (broker != null && !Env.getCurrentEnv().getBrokerMgr().containsBroker(broker)) {
throw new DdlException("Unknown broker name(" + broker + ")");
}
brokerProperties = getBrokerProperties(properties);
}
private Map<String, String> getSparkConfig(Map<String, String> properties) {
Map<String, String> sparkConfig = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(SPARK_CONFIG_PREFIX)) {
sparkConfig.put(entry.getKey(), entry.getValue());
}
}
return sparkConfig;
}
private Map<String, String> getEnvConfig(Map<String, String> properties) {
Map<String, String> envConfig = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(ENV_PREFIX)) {
envConfig.put(entry.getKey(), entry.getValue());
}
}
return envConfig;
}
private Map<String, String> getSparkHadoopConfig(Map<String, String> properties) {
Map<String, String> sparkConfig = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(SPARK_HADOOP_CONFIG_PREFIX)) {
sparkConfig.put(entry.getKey(), entry.getValue());
}
}
return sparkConfig;
}
private Map<String, String> getBrokerProperties(Map<String, String> properties) {
Map<String, String> brokerProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(BROKER_PROPERTY_PREFIX)) {
brokerProperties.put(entry.getKey(), entry.getValue());
}
}
return brokerProperties;
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
updateProperties(properties);
super.modifyProperties(properties);
}
@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
copiedProperties.keySet().removeAll(getSparkConfig(properties).keySet());
// env.* properties are accepted by setProperties, so they must not be reported as unknown here
copiedProperties.keySet().removeAll(getEnvConfig(properties).keySet());
copiedProperties.keySet().removeAll(getBrokerProperties(properties).keySet());
copiedProperties.remove(BROKER);
copiedProperties.remove(WORKING_DIR);
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown spark resource properties: " + copiedProperties);
}
}
@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : sparkConfigs.entrySet()) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
if (workingDir != null) {
result.addRow(Lists.newArrayList(name, lowerCaseType, SparkResource.WORKING_DIR, workingDir));
}
if (broker != null) {
result.addRow(Lists.newArrayList(name, lowerCaseType, SparkResource.BROKER, broker));
}
for (Map.Entry<String, String> entry : brokerProperties.entrySet()) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
if (envConfigs != null) {
for (Map.Entry<String, String> entry : envConfigs.entrySet()) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
}
}