// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.Load;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlColumn;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlColumnMapping;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlFileGroup;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlIndex;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlJobProperty;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlPartition;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlPartitionInfo;
import org.apache.doris.sparkdpp.EtlJobConfig.EtlTable;
import org.apache.doris.sparkdpp.EtlJobConfig.FilePatternVersion;
import org.apache.doris.sparkdpp.EtlJobConfig.SourceType;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// 1. create etl job config and write it into jobconfig.json file
// 2. submit spark etl job
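// The generated EtlJobConfig is serialized to JSON (jobconfig.json). An illustrative,
// non-exhaustive sketch of what it contains, based on the fields built below:
//   tables            : tableId -> { indexes, partition info, file groups }
//   outputPath        : from EtlJobConfig.getOutputPath(workingDir, dbId, label, signature)
//   outputFilePattern : from EtlJobConfig.getOutputFilePattern(label, FilePatternVersion.V1)
//   label             : the load label
//   properties        : strictMode, timezone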
@Deprecated
public class SparkLoadPendingTask extends LoadTask {
private static final Logger LOG = LogManager.getLogger(SparkLoadPendingTask.class);
private final Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups;
private final SparkResource resource;
private final BrokerDesc brokerDesc;
private final long dbId;
private final String loadLabel;
private final long loadJobId;
private final long transactionId;
private EtlJobConfig etlJobConfig;
private SparkLoadAppHandle sparkLoadAppHandle;

    public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
SparkResource resource, BrokerDesc brokerDesc, Priority priority) {
super(loadTaskCallback, TaskType.PENDING, priority);
this.retryTime = 3;
this.attachment = new SparkPendingTaskAttachment(signature);
this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
this.resource = resource;
this.brokerDesc = brokerDesc;
this.dbId = loadTaskCallback.getDbId();
this.loadJobId = loadTaskCallback.getId();
this.loadLabel = loadTaskCallback.getLabel();
this.transactionId = loadTaskCallback.getTransactionId();
this.sparkLoadAppHandle = loadTaskCallback.getHandle();
this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL);
toLowCaseForFileGroups();
}
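
    // Normalize the column name of every file group column mapping to lower case,
    // e.g. an ImportColumnDesc for column `SRC_ID` (illustrative name) becomes `src_id`;
    // the mapped expression itself is left untouched.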
void toLowCaseForFileGroups() {
aggKeyToBrokerFileGroups.values().forEach(fgs -> {
fgs.forEach(fg -> {
fg.getColumnExprList()
.forEach(expr -> expr.setColumnName(expr.getColumnName().toLowerCase(Locale.ROOT)));
});
});
}

    @Override
void executeTask() throws UserException {
LOG.info("begin to execute spark pending task. load job id: {}", loadJobId);
submitEtlJob();
}

    private void submitEtlJob() throws LoadException {
SparkPendingTaskAttachment sparkAttachment = (SparkPendingTaskAttachment) attachment;
        // a retry writes to a different output path
etlJobConfig.outputPath = EtlJobConfig.getOutputPath(resource.getWorkingDir(), dbId, loadLabel, signature);
sparkAttachment.setOutputPath(etlJobConfig.outputPath);
        // submit the etl job through the handler
SparkEtlJobHandler handler = new SparkEtlJobHandler();
handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource,
brokerDesc, sparkLoadAppHandle, sparkAttachment);
LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment);
}

    @Override
public void init() throws LoadException {
createEtlJobConf();
}
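
    // Builds the EtlJobConfig for this load: for every aggregated file group, collect the
    // target partitions, build the per-table index list and partition info once, register the
    // table's indexes on the transaction state, then attach one EtlFileGroup per
    // BrokerFileGroup. Strict mode and timezone are copied from the parent LoadJob.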
private void createEtlJobConf() throws LoadException {
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
Map<Long, EtlTable> tables = Maps.newHashMap();
Map<Long, Set<Long>> tableIdToPartitionIds = Maps.newHashMap();
Set<Long> allPartitionsTableIds = Sets.newHashSet();
prepareTablePartitionInfos(db, tableIdToPartitionIds, allPartitionsTableIds);
List<Table> tableList;
try {
tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(allPartitionsTableIds));
} catch (MetaNotFoundException e) {
throw new LoadException(e.getMessage());
}
MetaLockUtils.readLockTables(tableList);
try {
for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : aggKeyToBrokerFileGroups.entrySet()) {
FileGroupAggKey aggKey = entry.getKey();
long tableId = aggKey.getTableId();
OlapTable table = (OlapTable) db.getTableOrException(
tableId, s -> new LoadException("table does not exist. id: " + s));
EtlTable etlTable = null;
if (tables.containsKey(tableId)) {
etlTable = tables.get(tableId);
} else {
// indexes
List<EtlIndex> etlIndexes = createEtlIndexes(table);
// partition info
EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(table,
tableIdToPartitionIds.get(tableId));
etlTable = new EtlTable(etlIndexes, etlPartitionInfo);
tables.put(tableId, etlTable);
// add table indexes to transaction state
TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, transactionId);
if (txnState == null) {
throw new LoadException("txn does not exist. id: " + transactionId);
}
txnState.addTableIndexes(table);
}
// file group
for (BrokerFileGroup fileGroup : entry.getValue()) {
etlTable.addFileGroup(createEtlFileGroup(fileGroup, tableIdToPartitionIds.get(tableId), db, table));
}
}
} finally {
MetaLockUtils.readUnlockTables(tableList);
}
String outputFilePattern = EtlJobConfig.getOutputFilePattern(loadLabel, FilePatternVersion.V1);
        // job properties: strict mode and timezone
EtlJobProperty properties = new EtlJobProperty();
properties.strictMode = ((LoadJob) callback).isStrictMode();
properties.timezone = ((LoadJob) callback).getTimeZone();
etlJobConfig = new EtlJobConfig(tables, outputFilePattern, loadLabel, properties);
}
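
    // Collects the target partition ids for every table referenced by the file groups. A file
    // group that does not name partitions implies all partitions of its table; such tables are
    // also recorded in allPartitionsTableIds so later groups for the same table can be skipped.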
private void prepareTablePartitionInfos(Database db, Map<Long, Set<Long>> tableIdToPartitionIds,
Set<Long> allPartitionsTableIds) throws LoadException {
for (FileGroupAggKey aggKey : aggKeyToBrokerFileGroups.keySet()) {
long tableId = aggKey.getTableId();
if (allPartitionsTableIds.contains(tableId)) {
continue;
}
OlapTable table = (OlapTable) db.getTableOrException(
tableId, s -> new LoadException("table does not exist. id: " + s));
table.readLock();
try {
Set<Long> partitionIds;
if (tableIdToPartitionIds.containsKey(tableId)) {
partitionIds = tableIdToPartitionIds.get(tableId);
} else {
partitionIds = Sets.newHashSet();
tableIdToPartitionIds.put(tableId, partitionIds);
}
Set<Long> groupPartitionIds = aggKey.getPartitionIds();
                // if no partitions are assigned, use all partitions of the table
if (groupPartitionIds == null || groupPartitionIds.isEmpty()) {
for (Partition partition : table.getPartitions()) {
partitionIds.add(partition.getId());
}
allPartitionsTableIds.add(tableId);
} else {
partitionIds.addAll(groupPartitionIds);
}
} finally {
table.readUnlock();
}
}
}
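
    // Builds one EtlIndex per materialized index of the table. Only HASH distribution is
    // supported. For merge-on-write UNIQUE tables, value columns are emitted with REPLACE
    // aggregation (see changeAggType passed to createEtlColumn).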
private List<EtlIndex> createEtlIndexes(OlapTable table) throws LoadException {
List<EtlIndex> etlIndexes = Lists.newArrayList();
for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
long indexId = entry.getKey();
MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(indexId);
int schemaHash = indexMeta.getSchemaHash();
int schemaVersion = indexMeta.getSchemaVersion();
boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
&& table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
// columns
List<EtlColumn> etlColumns = Lists.newArrayList();
for (Column column : entry.getValue()) {
etlColumns.add(createEtlColumn(column, changeAggType));
}
// check distribution type
DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
if (distributionInfo.getType() != DistributionInfoType.HASH) {
// RANDOM not supported
String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name();
LOG.warn(errMsg);
throw new LoadException(errMsg);
}
// index type
String indexType = null;
KeysType keysType = table.getKeysTypeByIndexId(indexId);
switch (keysType) {
case DUP_KEYS:
indexType = "DUPLICATE";
break;
case AGG_KEYS:
indexType = "AGGREGATE";
break;
case UNIQUE_KEYS:
indexType = "UNIQUE";
break;
default:
String errMsg = "unknown keys type. type: " + keysType.name();
LOG.warn(errMsg);
throw new LoadException(errMsg);
}
// is base index
            boolean isBaseIndex = indexId == table.getBaseIndexId();
etlIndexes.add(new EtlIndex(indexId, etlColumns, schemaHash, indexType, isBaseIndex, schemaVersion));
}
return etlIndexes;
}
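
    // Maps a catalog Column to an EtlColumn: a nullable column without a default value gets the
    // sentinel default "\N", string columns carry their length, and decimal columns carry
    // precision and scale.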
private EtlColumn createEtlColumn(Column column, boolean changeAggType) {
// column name
String name = column.getName().toLowerCase(Locale.ROOT);
// column type
PrimitiveType type = column.getDataType();
String columnType = column.getDataType().toString();
// is allow null
boolean isAllowNull = column.isAllowNull();
// is key
boolean isKey = column.isKey();
// aggregation type
String aggregationType = null;
if (column.getAggregationType() != null) {
if (changeAggType && !column.isKey()) {
aggregationType = AggregateType.REPLACE.toSql();
} else {
aggregationType = column.getAggregationType().toString();
}
}
// default value
String defaultValue = null;
if (column.getDefaultValue() != null) {
defaultValue = column.getDefaultValue();
}
if (column.isAllowNull() && column.getDefaultValue() == null) {
defaultValue = "\\N";
}
// string length
int stringLength = 0;
if (type.isStringType()) {
stringLength = column.getStrLen();
}
// decimal precision scale
int precision = 0;
int scale = 0;
if (type.isDecimalV2Type() || type.isDecimalV3Type()) {
precision = column.getPrecision();
scale = column.getScale();
}
return new EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
stringLength, precision, scale);
}
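
    // Builds the EtlPartitionInfo. For RANGE partitions, every selected partition is exported
    // with its lower/upper key values, e.g. a partition covering [("2023-01-01"), ("2023-02-01"))
    // (illustrative values) becomes roughly EtlPartition(id, ["2023-01-01"], ["2023-02-01"],
    // false, bucketNum); the MAXVALUE partition gets an empty end key list and isMaxPartition set.
    // UNPARTITIONED tables produce a single partition entry; LIST partitioning is not supported.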
private EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds) throws LoadException {
PartitionType type = table.getPartitionInfo().getType();
List<String> partitionColumnRefs = Lists.newArrayList();
List<EtlPartition> etlPartitions = Lists.newArrayList();
if (type == PartitionType.RANGE) {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
for (Column column : rangePartitionInfo.getPartitionColumns()) {
partitionColumnRefs.add(column.getName());
}
for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getAllPartitionItemEntryList(true)) {
long partitionId = entry.getKey();
if (!partitionIds.contains(partitionId)) {
continue;
}
Partition partition = table.getPartition(partitionId);
if (partition == null) {
throw new LoadException("partition does not exist. id: " + partitionId);
}
// bucket num
int bucketNum = partition.getDistributionInfo().getBucketNum();
// is max partition
Range<PartitionKey> range = entry.getValue().getItems();
boolean isMaxPartition = range.upperEndpoint().isMaxValue();
// start keys
List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys();
List<Object> startKeys = Lists.newArrayList();
                for (LiteralExpr rangeKeyExpr : rangeKeyExprs) {
                    startKeys.add(rangeKeyExpr.getRealValue());
                }
// end keys
// is empty list when max partition
List<Object> endKeys = Lists.newArrayList();
if (!isMaxPartition) {
rangeKeyExprs = range.upperEndpoint().getKeys();
                    for (LiteralExpr rangeKeyExpr : rangeKeyExprs) {
                        endKeys.add(rangeKeyExpr.getRealValue());
                    }
}
etlPartitions.add(new EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum));
}
} else if (type == PartitionType.UNPARTITIONED) {
Preconditions.checkState(partitionIds.size() == 1);
for (Long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
if (partition == null) {
throw new LoadException("partition does not exist. id: " + partitionId);
}
// bucket num
int bucketNum = partition.getDistributionInfo().getBucketNum();
etlPartitions.add(new EtlPartition(partitionId, Lists.newArrayList(), Lists.newArrayList(),
true, bucketNum));
}
} else {
throw new LoadException("Spark Load does not support list partition yet");
}
// distribution column refs
List<String> distributionColumnRefs = Lists.newArrayList();
DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
Preconditions.checkState(distributionInfo.getType() == DistributionInfoType.HASH);
for (Column column : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) {
distributionColumnRefs.add(column.getName());
}
return new EtlPartitionInfo(type.typeString, partitionColumnRefs, distributionColumnRefs, etlPartitions);
}
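
    // Builds an EtlFileGroup for one BrokerFileGroup: resolves column mappings (including shadow
    // columns of an ongoing schema change), validates negative loads and HLL/BITMAP mappings,
    // then emits either a HIVE-sourced group (load from a hive table) or a FILE-sourced group
    // whose csv separator and line delimiter come from the file format properties when present.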
private EtlFileGroup createEtlFileGroup(BrokerFileGroup fileGroup, Set<Long> tablePartitionIds,
Database db, OlapTable table) throws LoadException {
List<ImportColumnDesc> copiedColumnExprList = Lists.newArrayList(fileGroup.getColumnExprList());
Map<String, Expr> exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (ImportColumnDesc columnDesc : copiedColumnExprList) {
if (!columnDesc.isColumn()) {
exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr());
}
}
// check columns
try {
Load.initColumns(table, copiedColumnExprList, fileGroup.getColumnToHadoopFunction());
} catch (UserException e) {
throw new LoadException(e.getMessage());
}
// add shadow column mapping when schema change
for (ImportColumnDesc columnDesc : Load.getSchemaChangeShadowColumnDesc(table, exprByName)) {
copiedColumnExprList.add(columnDesc);
exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr());
}
// check negative for sum aggregate type
if (fileGroup.isNegative()) {
for (Column column : table.getBaseSchema()) {
if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) {
throw new LoadException("Column is not SUM AggreateType. column:" + column.getName());
}
}
}
// fill file field names if empty
List<String> fileFieldNames = fileGroup.getFileFieldNames();
if (fileFieldNames == null || fileFieldNames.isEmpty()) {
fileFieldNames = Lists.newArrayList();
for (Column column : table.getBaseSchema()) {
fileFieldNames.add(column.getName());
}
}
// column mappings
Map<String, Pair<String, List<String>>> columnToHadoopFunction = fileGroup.getColumnToHadoopFunction();
Map<String, EtlColumnMapping> columnMappings = Maps.newHashMap();
if (columnToHadoopFunction != null) {
for (Map.Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
columnMappings.put(entry.getKey(),
new EtlColumnMapping(entry.getValue().first, entry.getValue().second));
}
}
for (ImportColumnDesc columnDesc : copiedColumnExprList) {
if (columnDesc.isColumn() || columnMappings.containsKey(columnDesc.getColumnName())) {
continue;
}
            // the rest must be expression mappings (not plain columns, not already mapped by a hadoop function)
columnMappings.put(columnDesc.getColumnName(), new EtlColumnMapping(columnDesc.getExpr().toSql()));
}
// partition ids
List<Long> partitionIds = fileGroup.getPartitionIds();
if (partitionIds == null || partitionIds.isEmpty()) {
partitionIds = Lists.newArrayList(tablePartitionIds);
}
// where
// TODO: check
String where = "";
if (fileGroup.getWhereExpr() != null) {
where = fileGroup.getWhereExpr().toSql();
}
// load from table
String hiveDbTableName = "";
Map<String, String> hiveTableProperties = Maps.newHashMap();
if (fileGroup.isLoadFromTable()) {
long srcTableId = fileGroup.getSrcTableId();
HiveTable srcHiveTable = (HiveTable) db.getTableOrException(
srcTableId, s -> new LoadException("table does not exist. id: " + s));
hiveDbTableName = srcHiveTable.getHiveDbTable();
hiveTableProperties.putAll(srcHiveTable.getHiveProperties());
}
// check hll and bitmap func
// TODO: more check
for (Column column : table.getBaseSchema()) {
String columnName = column.getName();
PrimitiveType columnType = column.getDataType();
Expr expr = exprByName.get(columnName);
if (columnType == PrimitiveType.HLL) {
checkHllMapping(columnName, expr);
}
if (columnType == PrimitiveType.BITMAP) {
checkBitmapMapping(columnName, expr, fileGroup.isLoadFromTable());
}
}
EtlFileGroup etlFileGroup = null;
FileFormatProperties fileFormatProperties = fileGroup.getFileFormatProperties();
if (fileGroup.isLoadFromTable()) {
etlFileGroup = new EtlFileGroup(SourceType.HIVE, hiveDbTableName, hiveTableProperties,
fileGroup.isNegative(), columnMappings, where, partitionIds);
} else {
String columnSeparator = CsvFileFormatProperties.DEFAULT_COLUMN_SEPARATOR;
String lineDelimiter = CsvFileFormatProperties.DEFAULT_LINE_DELIMITER;
if (fileFormatProperties instanceof CsvFileFormatProperties) {
columnSeparator = ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator();
lineDelimiter = ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter();
}
etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames,
fileGroup.getColumnNamesFromPath(), columnSeparator, lineDelimiter,
fileGroup.isNegative(), fileFormatProperties.getFormatName(),
columnMappings, where, partitionIds);
}
return etlFileGroup;
}
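
    // HLL columns must be produced by hll_hash(...) or hll_empty(), e.g. `c1=hll_hash(id)`
    // (illustrative column names).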
private void checkHllMapping(String columnName, Expr expr) throws LoadException {
if (expr == null) {
throw new LoadException("HLL column func is not assigned. column:" + columnName);
}
String msg = "HLL column must use hll function, like " + columnName + "=hll_hash(xxx) or "
+ columnName + "=hll_empty()";
if (!(expr instanceof FunctionCallExpr)) {
throw new LoadException(msg);
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
String functionName = fn.getFnName().getFunction();
if (!functionName.equalsIgnoreCase("hll_hash")
&& !functionName.equalsIgnoreCase("hll_empty")) {
throw new LoadException(msg);
}
}
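
    // BITMAP columns must be produced by to_bitmap(...), bitmap_hash(...), bitmap_dict(...) or
    // binary_bitmap(...); bitmap_dict additionally requires loading from a hive table (global
    // dictionary build).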
private void checkBitmapMapping(String columnName, Expr expr, boolean isLoadFromTable) throws LoadException {
if (expr == null) {
throw new LoadException("BITMAP column func is not assigned. column:" + columnName);
}
String msg = "BITMAP column must use bitmap function, like " + columnName + "=to_bitmap(xxx) or "
+ columnName + "=bitmap_hash() or " + columnName + "=bitmap_dict()";
if (!(expr instanceof FunctionCallExpr)) {
throw new LoadException(msg);
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
String functionName = fn.getFnName().getFunction();
if (!functionName.equalsIgnoreCase("to_bitmap")
&& !functionName.equalsIgnoreCase("bitmap_hash")
&& !functionName.equalsIgnoreCase("bitmap_dict")
&& !functionName.equalsIgnoreCase("binary_bitmap")) {
throw new LoadException(msg);
}
if (functionName.equalsIgnoreCase("bitmap_dict") && !isLoadFromTable) {
throw new LoadException("Bitmap global dict should load data from hive table");
}
}
}