EtlJobConfig.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.sparkdpp;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
/** jobconfig.json file format
{
"tables": {
10014: {
"indexes": [{
"indexId": 10014,
"columns": [{
"columnName": "k1",
"columnType": "SMALLINT",
"isKey": true,
"isAllowNull": true,
"aggregationType": "NONE"
}, {
"columnName": "k2",
"columnType": "VARCHAR",
"stringLength": 20,
"isKey": true,
"isAllowNull": true,
"aggregationType": "NONE"
}, {
"columnName": "v",
"columnType": "BIGINT",
"isKey": false,
"isAllowNull": false,
"aggregationType": "NONE"
}],
"schemaHash": 1294206574,
"indexType": "DUPLICATE",
"isBaseIndex": true
}, {
"indexId": 10017,
"columns": [{
"columnName": "k1",
"columnType": "SMALLINT",
"isKey": true,
"isAllowNull": true,
"aggregationType": "NONE"
}, {
"columnName": "v",
"columnType": "BIGINT",
"isKey": false,
"isAllowNull": false,
"aggregationType": "BITMAP_UNION",
"defineExpr": "to_bitmap(v)"
}],
"schemaHash": 1294206575,
"indexType": "AGGREGATE",
"isBaseIndex": false
}],
"partitionInfo": {
"partitionType": "RANGE",
"partitionColumnRefs": ["k1"],
"distributionColumnRefs": ["k2"],
"partitions": [{
"partitionId": 10020,
"startKeys": [-100],
"endKeys": [10],
"isMaxPartition": false,
"bucketNum": 3
}, {
"partitionId": 10021,
"startKeys": [10],
"endKeys": [100],
"isMaxPartition": false,
"bucketNum": 3
}]
},
"fileGroups": [{
"partitions": [10020],
"filePaths": ["hdfs://hdfs_host:port/user/palo/test/file"],
"fileFieldNames": ["tmp_k1", "k2"],
"columnSeparator": ",",
"lineDelimiter": "\n",
"columnMappings": {
"k1": {
"functionName": "strftime",
"args": ["%Y-%m-%d %H:%M:%S", "tmp_k1"]
}
},
"where": "k2 > 10",
"isNegative": false,
"hiveDbTableName": "hive_db.table",
"hiveTableProperties": {
"hive.metastore.uris": "thrift://host:port"
}
}]
}
},
"outputPath": "hdfs://hdfs_host:port/user/output/10003/label1/1582599203397",
"outputFilePattern": "V1.label1.%d.%d.%d.%d.%d.parquet",
"label": "label0",
"properties": {
"strictMode": false,
"timezone": "Asia/Shanghai"
},
"configVersion": "V1"
}
*/
public class EtlJobConfig implements Serializable {
// global dict
// The three constants below are String.format templates for table names.
// NOTE(review): what the %d / %s slots stand for is not visible in this file -- confirm with callers.
public static final String GLOBAL_DICT_TABLE_NAME = "doris_global_dict_table_%d";
public static final String DISTINCT_KEY_TABLE_NAME = "doris_distinct_key_table_%d_%s";
public static final String DORIS_INTERMEDIATE_HIVE_TABLE_NAME = "doris_intermediate_hive_table_%d_%s";
// hdfsEtlPath/jobs/dbId/loadLabel/PendingTaskSignature
// Template consumed by the static getOutputPath(hdfsEtlPath, dbId, loadLabel, taskSignature).
private static final String ETL_OUTPUT_PATH_FORMAT = "%s/jobs/%d/%s/%d";
// Human-readable layout of a V1 output file name. getTabletMetaStr counts its dot-separated
// parts to validate a file name and echoes it in the error message.
private static final String ETL_OUTPUT_FILE_NAME_DESC_V1
= "version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet";
// tableId.partitionId.indexId.bucket.schemaHash
// Used both inside the output file pattern and as the format of the tablet meta string.
public static final String TABLET_META_FORMAT = "%d.%d.%d.%d.%d";
// Last segment (extension) of the output file pattern built by getOutputFilePattern.
public static final String ETL_OUTPUT_FILE_FORMAT = "parquet";
// dpp result
// File name appended to the output path by getDppResultFilePath.
public static final String DPP_RESULT_NAME = "dpp_result.json";
// ---- JSON-mapped fields; the JSON key of each one is its @SerializedName value ----
// Table configs keyed by a Long. NOTE(review): presumably the table id -- confirm with callers.
@SerializedName(value = "tables")
public Map<Long, EtlTable> tables;
// Left null by the constructor; expected to be set when the etl job is submitted.
@SerializedName(value = "outputPath")
public String outputPath;
// Pattern for output file names; see getOutputFilePattern for how such a pattern is built.
@SerializedName(value = "outputFilePattern")
public String outputFilePattern;
// Load label supplied to the constructor.
@SerializedName(value = "label")
public String label;
// Job level properties (strictMode, timezone).
@SerializedName(value = "properties")
public EtlJobProperty properties;
// Always ConfigVersion.V1 for instances created through the constructor.
@SerializedName(value = "configVersion")
public ConfigVersion configVersion;
/**
 * Builds a job config from its table definitions and job-level settings.
 *
 * <p>{@code outputPath} is deliberately left {@code null}: it is set when the etl job is submitted.
 * {@code configVersion} is fixed to {@link ConfigVersion#V1}.
 *
 * @param tables            table configs, stored as given (no copy is made)
 * @param outputFilePattern pattern for the names of the produced output files
 * @param label             load label
 * @param properties        job-level properties
 */
public EtlJobConfig(Map<Long, EtlTable> tables, String outputFilePattern, String label, EtlJobProperty properties) {
    this.configVersion = ConfigVersion.V1;
    this.properties = properties;
    this.label = label;
    this.outputFilePattern = outputFilePattern;
    // set outputPath when submit etl job
    this.outputPath = null;
    this.tables = tables;
}
/**
 * Renders all serialized fields for logging/debugging. Note that {@code configVersion} is printed
 * under the key {@code version}.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("EtlJobConfig{");
    text.append("tables=").append(tables);
    text.append(", outputPath='").append(outputPath).append('\'');
    text.append(", outputFilePattern='").append(outputFilePattern).append('\'');
    text.append(", label='").append(label).append('\'');
    text.append(", properties=").append(properties);
    text.append(", version=").append(configVersion);
    text.append('}');
    return text.toString();
}
/**
 * @return the output path of this job config, or {@code null} if it has not been set yet
 */
public String getOutputPath() {
    return this.outputPath;
}
/**
 * Builds the etl output path {@code hdfsEtlPath/jobs/dbId/loadLabel/taskSignature}.
 *
 * @param hdfsEtlPath   root etl path
 * @param dbId          database id
 * @param loadLabel     load label
 * @param taskSignature signature of the pending task
 * @return the four parts formatted with the output path template
 */
public static String getOutputPath(String hdfsEtlPath, long dbId, String loadLabel, long taskSignature) {
    final Object[] pathParts = {hdfsEtlPath, dbId, loadLabel, taskSignature};
    return String.format(ETL_OUTPUT_PATH_FORMAT, pathParts);
}
/**
 * Builds the output file name pattern {@code <version>.<label>.<TABLET_META_FORMAT>.<ETL_OUTPUT_FILE_FORMAT>}.
 *
 * <p>The tablet meta part is inserted as an unexpanded format template, so the result is itself a
 * format string.
 *
 * @param loadLabel          load label placed after the version
 * @param filePatternVersion version whose enum name becomes the first segment; must not be null
 * @return the dot-joined pattern
 */
public static String getOutputFilePattern(String loadLabel, FilePatternVersion filePatternVersion) {
    StringBuilder pattern = new StringBuilder();
    pattern.append(filePatternVersion.name()).append('.');
    pattern.append(loadLabel).append('.');
    pattern.append(TABLET_META_FORMAT).append('.');
    pattern.append(ETL_OUTPUT_FILE_FORMAT);
    return pattern.toString();
}
/**
 * Returns the location of the dpp result file, i.e. {@code outputPath + "/" + DPP_RESULT_NAME}.
 *
 * @param outputPath etl output directory
 * @return path of the dpp result file below {@code outputPath}
 */
public static String getDppResultFilePath(String outputPath) {
    StringBuilder resultPath = new StringBuilder();
    resultPath.append(outputPath);
    resultPath.append("/");
    resultPath.append(DPP_RESULT_NAME);
    return resultPath.toString();
}
/**
 * Extracts the tablet meta string from the path of an etl output file.
 *
 * <p>Only the file name (the text after the last {@code '/'}) is inspected. For version {@code V1}
 * it must have the layout {@code version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet};
 * the returned value is {@code tableId.partitionId.indexId.bucket.schemaHash}.
 *
 * @param filePath path (or bare name) of an etl output file
 * @return the tablet meta string, formatted with {@link #TABLET_META_FORMAT}
 * @throws Exception if the version prefix is not a known {@link FilePatternVersion}, the name does not
 *                   have the expected number of dot-separated parts, or an id part is not a valid number
 */
public static String getTabletMetaStr(String filePath) throws Exception {
    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
    String[] fileNameArr = fileName.split("\\.");
    // split() drops trailing empty strings, so a name consisting only of dots yields an empty array
    String versionStr = fileNameArr.length > 0 ? fileNameArr[0] : "";

    // check file version; valueOf() throws IllegalArgumentException for an unknown name, which would
    // otherwise bypass the descriptive error below
    FilePatternVersion version;
    try {
        version = FilePatternVersion.valueOf(versionStr);
    } catch (IllegalArgumentException e) {
        throw new Exception("etl output file version error. version: " + versionStr, e);
    }

    switch (version) {
        case V1:
            // version.label.tableId.partitionId.indexId.bucket.schemaHash.parquet
            if (fileNameArr.length != ETL_OUTPUT_FILE_NAME_DESC_V1.split("\\.").length) {
                throw new Exception("etl output file name error, format: " + ETL_OUTPUT_FILE_NAME_DESC_V1
                        + ", name: " + fileName);
            }
            try {
                long tableId = Long.parseLong(fileNameArr[2]);
                long partitionId = Long.parseLong(fileNameArr[3]);
                long indexId = Long.parseLong(fileNameArr[4]);
                int bucket = Integer.parseInt(fileNameArr[5]);
                int schemaHash = Integer.parseInt(fileNameArr[6]);
                // tableId.partitionId.indexId.bucket.schemaHash
                return String.format(TABLET_META_FORMAT, tableId, partitionId, indexId, bucket, schemaHash);
            } catch (NumberFormatException e) {
                // a non-numeric id part is a malformed name; report it like the length mismatch above
                throw new Exception("etl output file name error, format: " + ETL_OUTPUT_FILE_NAME_DESC_V1
                        + ", name: " + fileName, e);
            }
        default:
            throw new Exception("etl output file version error. version: " + versionStr);
    }
}
/**
 * Serializes this config to JSON.
 *
 * <p>Only fields annotated with {@code @SerializedName} are written: fields without the annotation
 * are filtered out by {@link HiddenAnnotationExclusionStrategy}. The strategy is registered for
 * serialization, because this Gson instance is used for {@code toJson} only and a
 * deserialization-side strategy would never be consulted here.
 *
 * @return the JSON form of this config
 */
public String configToJson() {
    GsonBuilder gsonBuilder = new GsonBuilder();
    gsonBuilder.addSerializationExclusionStrategy(new HiddenAnnotationExclusionStrategy());
    Gson gson = gsonBuilder.create();
    return gson.toJson(this);
}
/**
 * Parses a job config from its JSON form using a default-configured Gson.
 *
 * @param jsonConfig JSON text of a job config
 * @return the result of Gson's {@code fromJson} for {@code EtlJobConfig.class}
 */
public static EtlJobConfig configFromJson(String jsonConfig) {
    final Gson parser = new GsonBuilder().create();
    return parser.fromJson(jsonConfig, EtlJobConfig.class);
}
/** Job-level properties of an etl job: the strict mode flag and the timezone name. */
public static class EtlJobProperty implements Serializable {
    @SerializedName("strictMode")
    public boolean strictMode;

    @SerializedName("timezone")
    public String timezone;

    /** Renders both properties for logging/debugging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlJobProperty{");
        text.append("strictMode=").append(strictMode);
        text.append(", timezone='").append(timezone).append('\'');
        text.append('}');
        return text.toString();
    }
}
/** Version tag of the job config; the {@code EtlJobConfig} constructor always assigns {@code V1}. */
public enum ConfigVersion {
V1
}
/**
 * Version of the etl output file name layout. The enum name is the first dot-separated segment of
 * an output file name: it is written by {@code getOutputFilePattern} and parsed back by
 * {@code getTabletMetaStr}.
 */
public enum FilePatternVersion {
V1
}
/**
 * Kind of data source of an {@code EtlFileGroup}; {@code FILE} is the field's default value there.
 * NOTE(review): HIVE presumably pairs with the "data from table" constructor of EtlFileGroup -- confirm.
 */
public enum SourceType {
FILE,
HIVE
}
/**
 * Config of one table in an etl job: its indexes, its partition info and the file groups to load.
 * File groups start out empty and are added through {@link #addFileGroup(EtlFileGroup)}.
 */
public static class EtlTable implements Serializable {
    @SerializedName("indexes")
    public List<EtlIndex> indexes;

    @SerializedName("partitionInfo")
    public EtlPartitionInfo partitionInfo;

    @SerializedName("fileGroups")
    public List<EtlFileGroup> fileGroups;

    /**
     * @param etlIndexes       indexes of the table, stored as given
     * @param etlPartitionInfo partition info of the table
     */
    public EtlTable(List<EtlIndex> etlIndexes, EtlPartitionInfo etlPartitionInfo) {
        this.fileGroups = Lists.newArrayList();
        this.partitionInfo = etlPartitionInfo;
        this.indexes = etlIndexes;
    }

    /** Appends one file group to this table. */
    public void addFileGroup(EtlFileGroup etlFileGroup) {
        this.fileGroups.add(etlFileGroup);
    }

    /** Renders the three fields for logging/debugging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlTable{");
        text.append("indexes=").append(indexes);
        text.append(", partitionInfo=").append(partitionInfo);
        text.append(", fileGroups=").append(fileGroups);
        text.append('}');
        return text.toString();
    }
}
/**
 * Description of one column of an index. Type and aggregation are carried as plain strings;
 * {@code defineExpr} is not a constructor argument and stays {@code null} unless assigned afterwards.
 */
public static class EtlColumn implements Serializable {
    @SerializedName("columnName")
    public String columnName;

    @SerializedName("columnType")
    public String columnType;

    @SerializedName("isAllowNull")
    public boolean isAllowNull;

    @SerializedName("isKey")
    public boolean isKey;

    @SerializedName("aggregationType")
    public String aggregationType;

    @SerializedName("defaultValue")
    public String defaultValue;

    @SerializedName("stringLength")
    public int stringLength;

    @SerializedName("precision")
    public int precision;

    @SerializedName("scale")
    public int scale;

    @SerializedName("defineExpr")
    public String defineExpr;

    // for unit test
    public EtlColumn() {
    }

    /** Assigns every field from the matching argument and resets {@code defineExpr} to null. */
    public EtlColumn(String columnName, String columnType, boolean isAllowNull, boolean isKey,
            String aggregationType, String defaultValue, int stringLength, int precision, int scale) {
        this.defineExpr = null;
        this.columnName = columnName;
        this.columnType = columnType;
        this.isKey = isKey;
        this.isAllowNull = isAllowNull;
        this.aggregationType = aggregationType;
        this.defaultValue = defaultValue;
        this.stringLength = stringLength;
        this.precision = precision;
        this.scale = scale;
    }

    /** Renders all fields for logging/debugging; string-typed fields are single-quoted. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlColumn{");
        text.append("columnName='").append(columnName).append('\'');
        text.append(", columnType='").append(columnType).append('\'');
        text.append(", isAllowNull=").append(isAllowNull);
        text.append(", isKey=").append(isKey);
        text.append(", aggregationType='").append(aggregationType).append('\'');
        text.append(", defaultValue='").append(defaultValue).append('\'');
        text.append(", stringLength=").append(stringLength);
        text.append(", precision=").append(precision);
        text.append(", scale=").append(scale);
        text.append(", defineExpr='").append(defineExpr).append('\'');
        text.append('}');
        return text.toString();
    }
}
/**
 * Orders indexes by ascending number of columns; two indexes with the same column count compare
 * as equal.
 */
public static class EtlIndexComparator implements Comparator<EtlIndex> {
    /**
     * @return -1, 0 or 1 when {@code a} has fewer, as many, or more columns than {@code b}
     */
    @Override
    public int compare(EtlIndex a, EtlIndex b) {
        return Integer.compare(a.columns.size(), b.columns.size());
    }
}
/** Description of one index of a table: its id, columns, schema hash/version and index type. */
public static class EtlIndex implements Serializable {
    @SerializedName("indexId")
    public long indexId;

    @SerializedName("columns")
    public List<EtlColumn> columns;

    @SerializedName("schemaHash")
    public int schemaHash;

    @SerializedName("indexType")
    public String indexType;

    @SerializedName("isBaseIndex")
    public boolean isBaseIndex;

    @SerializedName("schemaVersion")
    public int schemaVersion;

    /** Assigns every field from the matching argument; the column list is stored as given. */
    public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
            String indexType, boolean isBaseIndex, int schemaVersion) {
        this.schemaVersion = schemaVersion;
        this.isBaseIndex = isBaseIndex;
        this.indexType = indexType;
        this.schemaHash = schemaHash;
        this.columns = etlColumns;
        this.indexId = indexId;
    }

    /**
     * Looks up a column by exact (case-sensitive) name.
     *
     * @param name column name to search for
     * @return the first column whose {@code columnName} equals {@code name}, or {@code null} if none does
     */
    public EtlColumn getColumn(String name) {
        EtlColumn match = null;
        for (EtlColumn candidate : columns) {
            if (candidate.columnName.equals(name)) {
                match = candidate;
                break;
            }
        }
        return match;
    }

    /** Renders all fields for logging/debugging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlIndex{");
        text.append("indexId=").append(indexId);
        text.append(", columns=").append(columns);
        text.append(", schemaHash=").append(schemaHash);
        text.append(", indexType='").append(indexType).append('\'');
        text.append(", isBaseIndex=").append(isBaseIndex);
        text.append(", schemaVersion=").append(schemaVersion);
        text.append('}');
        return text.toString();
    }
}
/**
 * Partitioning of a table: the partition type, the names of the partition and distribution
 * columns, and the list of partitions.
 */
public static class EtlPartitionInfo implements Serializable {
    @SerializedName("partitionType")
    public String partitionType;

    @SerializedName("partitionColumnRefs")
    public List<String> partitionColumnRefs;

    @SerializedName("distributionColumnRefs")
    public List<String> distributionColumnRefs;

    @SerializedName("partitions")
    public List<EtlPartition> partitions;

    /** Assigns every field from the matching argument; lists are stored as given. */
    public EtlPartitionInfo(String partitionType, List<String> partitionColumnRefs,
            List<String> distributionColumnRefs, List<EtlPartition> etlPartitions) {
        this.partitions = etlPartitions;
        this.distributionColumnRefs = distributionColumnRefs;
        this.partitionColumnRefs = partitionColumnRefs;
        this.partitionType = partitionType;
    }

    /** Renders all fields for logging/debugging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlPartitionInfo{");
        text.append("partitionType='").append(partitionType).append('\'');
        text.append(", partitionColumnRefs=").append(partitionColumnRefs);
        text.append(", distributionColumnRefs=").append(distributionColumnRefs);
        text.append(", partitions=").append(partitions);
        text.append('}');
        return text.toString();
    }
}
/**
 * One partition of a table: its id, start and end keys, whether it is the max partition, and its
 * bucket count.
 */
public static class EtlPartition implements Serializable {
    @SerializedName("partitionId")
    public long partitionId;

    @SerializedName("startKeys")
    public List<Object> startKeys;

    @SerializedName("endKeys")
    public List<Object> endKeys;

    @SerializedName("isMaxPartition")
    public boolean isMaxPartition;

    @SerializedName("bucketNum")
    public int bucketNum;

    /** Assigns every field from the matching argument; key lists are stored as given. */
    public EtlPartition(long partitionId, List<Object> startKeys, List<Object> endKeys,
            boolean isMaxPartition, int bucketNum) {
        this.bucketNum = bucketNum;
        this.isMaxPartition = isMaxPartition;
        this.endKeys = endKeys;
        this.startKeys = startKeys;
        this.partitionId = partitionId;
    }

    /** Renders all fields for logging/debugging. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlPartition{");
        text.append("partitionId=").append(partitionId);
        text.append(", startKeys=").append(startKeys);
        text.append(", endKeys=").append(endKeys);
        text.append(", isMaxPartition=").append(isMaxPartition);
        text.append(", bucketNum=").append(bucketNum);
        text.append('}');
        return text.toString();
    }
}
/**
 * One group of source data to load into a table. A group is built either from file paths
 * (first constructor) or from a hive table (second constructor); fields that the chosen
 * constructor does not receive keep their default values.
 */
public static class EtlFileGroup implements Serializable {
    @SerializedName("sourceType")
    public SourceType sourceType = SourceType.FILE;

    @SerializedName("filePaths")
    public List<String> filePaths;

    @SerializedName("fileFieldNames")
    public List<String> fileFieldNames;

    @SerializedName("columnsFromPath")
    public List<String> columnsFromPath;

    @SerializedName("columnSeparator")
    public String columnSeparator;

    @SerializedName("lineDelimiter")
    public String lineDelimiter;

    @SerializedName("isNegative")
    public boolean isNegative;

    @SerializedName("fileFormat")
    public String fileFormat;

    @SerializedName("columnMappings")
    public Map<String, EtlColumnMapping> columnMappings;

    @SerializedName("where")
    public String where;

    @SerializedName("partitions")
    public List<Long> partitions;

    @SerializedName("hiveDbTableName")
    public String hiveDbTableName;

    @SerializedName("hiveTableProperties")
    public Map<String, String> hiveTableProperties;

    // Carries no @SerializedName: hive db table used in dpp, meant to stay out of the JSON form.
    // Set with hiveDbTableName (no bitmap column) or IntermediateHiveTable (created by global dict
    // builder) in spark etl job.
    public String dppHiveDbTableName;

    /**
     * for data infile path
     *
     * <p>A null or empty {@code columnSeparator} falls back to a tab character.
     */
    public EtlFileGroup(SourceType sourceType, List<String> filePaths, List<String> fileFieldNames,
            List<String> columnsFromPath, String columnSeparator, String lineDelimiter,
            boolean isNegative, String fileFormat, Map<String, EtlColumnMapping> columnMappings,
            String where, List<Long> partitions) {
        this.sourceType = sourceType;
        this.filePaths = filePaths;
        this.fileFieldNames = fileFieldNames;
        this.columnsFromPath = columnsFromPath;
        if (Strings.isNullOrEmpty(columnSeparator)) {
            this.columnSeparator = "\t";
        } else {
            this.columnSeparator = columnSeparator;
        }
        this.lineDelimiter = lineDelimiter;
        this.isNegative = isNegative;
        this.fileFormat = fileFormat;
        this.columnMappings = columnMappings;
        this.where = where;
        this.partitions = partitions;
    }

    /** for data from table */
    public EtlFileGroup(SourceType sourceType, String hiveDbTableName, Map<String, String> hiveTableProperties,
            boolean isNegative, Map<String, EtlColumnMapping> columnMappings,
            String where, List<Long> partitions) {
        this.sourceType = sourceType;
        this.hiveTableProperties = hiveTableProperties;
        this.hiveDbTableName = hiveDbTableName;
        this.partitions = partitions;
        this.where = where;
        this.columnMappings = columnMappings;
        this.isNegative = isNegative;
    }

    /** Renders the annotated fields for logging/debugging; {@code dppHiveDbTableName} is not included. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("EtlFileGroup{");
        text.append("sourceType=").append(sourceType);
        text.append(", filePaths=").append(filePaths);
        text.append(", fileFieldNames=").append(fileFieldNames);
        text.append(", columnsFromPath=").append(columnsFromPath);
        text.append(", columnSeparator='").append(columnSeparator).append('\'');
        text.append(", lineDelimiter='").append(lineDelimiter).append('\'');
        text.append(", isNegative=").append(isNegative);
        text.append(", fileFormat='").append(fileFormat).append('\'');
        text.append(", columnMappings=").append(columnMappings);
        text.append(", where='").append(where).append('\'');
        text.append(", partitions=").append(partitions);
        text.append(", hiveDbTableName='").append(hiveDbTableName).append('\'');
        text.append(", hiveTableProperties=").append(hiveTableProperties);
        text.append('}');
        return text.toString();
    }
}
/**
 * Mapping that produces the value of a column, given in one of two forms.
 *
 * <p>FunctionCallExpr = functionName(args):
 * kept for compatibility with the old designed functions used in Hadoop MapReduce etl.
 *
 * <p>expr: more general, like k1 + 1, not just a FunctionCall.
 *
 * <p>Each constructor fills exactly one form; {@link #toDescription()} uses the function form
 * whenever {@code functionName} is non-null and the expression form otherwise.
 */
public static class EtlColumnMapping implements Serializable {
    @SerializedName(value = "functionName")
    public String functionName;
    @SerializedName(value = "args")
    public List<String> args;
    @SerializedName(value = "expr")
    public String expr;

    // Function names that are replaced by another name when building the description.
    private static final Map<String, String> functionMap = new ImmutableMap.Builder<String, String>()
            .put("md5sum", "md5").build();

    /**
     * Creates a function-call mapping.
     *
     * @param functionName name of the function
     * @param args         argument list, may be null or empty
     */
    public EtlColumnMapping(String functionName, List<String> args) {
        this.functionName = functionName;
        this.args = args;
    }

    /**
     * Creates an expression mapping.
     *
     * @param expr the expression text
     */
    public EtlColumnMapping(String expr) {
        this.expr = expr;
    }

    /**
     * Renders this mapping as text.
     *
     * @return {@code expr} when no function name is set; otherwise {@code name(arg1,arg2,...)},
     *         where the name is translated through the function map when it has an entry.
     *         A null or empty argument list yields {@code name()}.
     */
    public String toDescription() {
        StringBuilder sb = new StringBuilder();
        if (functionName == null) {
            sb.append(expr);
            return sb.toString();
        }

        if (functionMap.containsKey(functionName)) {
            sb.append(functionMap.get(functionName));
        } else {
            sb.append(functionName);
        }
        sb.append("(");
        if (args != null) {
            // Emit the separator before every argument but the first, so that nothing has to be
            // trimmed afterwards (trimming unconditionally would eat the "(" when there are no args).
            boolean first = true;
            for (String arg : args) {
                if (!first) {
                    sb.append(",");
                }
                sb.append(arg);
                first = false;
            }
        }
        sb.append(")");
        return sb.toString();
    }

    /** Renders the three fields for logging/debugging. */
    @Override
    public String toString() {
        return "EtlColumnMapping{"
                + "functionName='" + functionName + '\''
                + ", args=" + args
                + ", expr=" + expr
                + '}';
    }
}
/**
 * Gson exclusion strategy that skips every field lacking a {@code @SerializedName} annotation.
 * No class is excluded as a whole.
 */
public static class HiddenAnnotationExclusionStrategy implements ExclusionStrategy {
    /**
     * @return {@code true} (skip) when the field has no {@code @SerializedName} annotation
     */
    @Override
    public boolean shouldSkipField(FieldAttributes f) {
        final boolean annotated = f.getAnnotation(SerializedName.class) != null;
        return !annotated;
    }

    /**
     * @return always {@code false}; classes are never skipped
     */
    @Override
    public boolean shouldSkipClass(Class<?> clazz) {
        return false;
    }
}
}