Load.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.FunctionParams;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.GeneratedColumnInfo;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ExprUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.LoadJob.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Load {
private static final Logger LOG = LogManager.getLogger(Load.class);
public static final String VERSION = "v1";
// valid state change map
private static final Map<JobState, Set<JobState>> STATE_CHANGE_MAP = Maps.newHashMap();
// system dpp config
public static DppConfig dppDefaultConfig = null;
public static Map<String, DppConfig> clusterToDppConfig = Maps.newHashMap();
// load job meta
private Map<Long, LoadJob> idToLoadJob; // loadJobId to loadJob
private Map<Long, List<LoadJob>> dbToLoadJobs; // db to loadJob list
private Map<Long, Map<String, List<LoadJob>>> dbLabelToLoadJobs; // db label to loadJob list
private Map<Long, LoadJob> idToPendingLoadJob; // loadJobId to pending loadJob
private Map<Long, LoadJob> idToEtlLoadJob; // loadJobId to etl loadJob
private Map<Long, LoadJob> idToLoadingLoadJob; // loadJobId to loading loadJob
private Map<Long, LoadJob> idToQuorumFinishedLoadJob; // loadJobId to quorum finished loadJob
private Set<Long> loadingPartitionIds; // loading partition id set
    // dbId -> (label -> timestamp)
    private Map<Long, Map<String, Long>> dbToMiniLabels; // db to uncommitted mini load labels
    // lock for load jobs
    // the lock is private and must be acquired after the db lock
private ReentrantReadWriteLock lock;
static {
Set<JobState> pendingDestStates = Sets.newHashSet();
pendingDestStates.add(JobState.ETL);
pendingDestStates.add(JobState.CANCELLED);
STATE_CHANGE_MAP.put(JobState.PENDING, pendingDestStates);
Set<JobState> etlDestStates = Sets.newHashSet();
etlDestStates.add(JobState.LOADING);
etlDestStates.add(JobState.CANCELLED);
STATE_CHANGE_MAP.put(JobState.ETL, etlDestStates);
Set<JobState> loadingDestStates = Sets.newHashSet();
loadingDestStates.add(JobState.FINISHED);
loadingDestStates.add(JobState.QUORUM_FINISHED);
loadingDestStates.add(JobState.CANCELLED);
STATE_CHANGE_MAP.put(JobState.LOADING, loadingDestStates);
Set<JobState> quorumFinishedDestStates = Sets.newHashSet();
quorumFinishedDestStates.add(JobState.FINISHED);
STATE_CHANGE_MAP.put(JobState.QUORUM_FINISHED, quorumFinishedDestStates);
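        // The resulting transition graph, derived from the map built above:
        //   PENDING -> ETL | CANCELLED
        //   ETL -> LOADING | CANCELLED
        //   LOADING -> FINISHED | QUORUM_FINISHED | CANCELLED
        //   QUORUM_FINISHED -> FINISHED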
// system dpp config
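        // Both config strings are JSON maps: dpp_default_config_str holds dpp properties directly,
        // and dpp_config_str maps a cluster name to such a property map. An illustrative (not
        // authoritative) shape, assuming a "hadoop_configs" property is accepted by DppConfig:
        //   dpp_default_config_str = {"hadoop_configs": "mapred.job.priority=NORMAL"}
        //   dpp_config_str = {"cluster1": {"hadoop_configs": "mapred.job.map.capacity=50"}}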
Gson gson = new Gson();
try {
Map<String, String> defaultConfig =
(HashMap<String, String>) gson.fromJson(Config.dpp_default_config_str, HashMap.class);
dppDefaultConfig = DppConfig.create(defaultConfig);
Map<String, Map<String, String>> clusterToConfig =
(HashMap<String, Map<String, String>>) gson.fromJson(Config.dpp_config_str, HashMap.class);
for (Entry<String, Map<String, String>> entry : clusterToConfig.entrySet()) {
String cluster = entry.getKey();
DppConfig dppConfig = dppDefaultConfig.getCopiedDppConfig();
dppConfig.update(DppConfig.create(entry.getValue()));
dppConfig.check();
clusterToDppConfig.put(cluster, dppConfig);
}
if (!clusterToDppConfig.containsKey(Config.dpp_default_cluster)) {
                throw new LoadException("Default cluster does not exist");
}
} catch (Throwable e) {
LOG.error("dpp default config ill-formed", e);
System.exit(-1);
}
}
public Load() {
idToLoadJob = Maps.newHashMap();
dbToLoadJobs = Maps.newHashMap();
dbLabelToLoadJobs = Maps.newHashMap();
idToPendingLoadJob = Maps.newLinkedHashMap();
idToEtlLoadJob = Maps.newLinkedHashMap();
idToLoadingLoadJob = Maps.newLinkedHashMap();
idToQuorumFinishedLoadJob = Maps.newLinkedHashMap();
loadingPartitionIds = Sets.newHashSet();
dbToMiniLabels = Maps.newHashMap();
lock = new ReentrantReadWriteLock(true);
}
public void readLock() {
lock.readLock().lock();
}
public void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
/**
     * When doing schema change, there may be some 'shadow' columns whose names have the
     * '__doris_shadow_' prefix. These columns are invisible to the user, but we still need
     * to generate data for them, so we add column mappings for these columns.
     * eg1:
     * base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B'
     * The final column mapping should look like: (A, B, C, __doris_shadow_B = substitute(B));
*/
public static List<ImportColumnDesc> getSchemaChangeShadowColumnDesc(Table tbl, Map<String, Expr> columnExprMap) {
List<ImportColumnDesc> shadowColumnDescs = Lists.newArrayList();
for (Column column : tbl.getFullSchema()) {
if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
continue;
}
String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX);
if (columnExprMap.containsKey(originCol)) {
Expr mappingExpr = columnExprMap.get(originCol);
if (mappingExpr != null) {
/*
* eg:
* (A, C) SET (B = func(xx))
* ->
* (A, C) SET (B = func(xx), __doris_shadow_B = func(xx))
*/
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
shadowColumnDescs.add(importColumnDesc);
} else {
/*
* eg:
* (A, B, C)
* ->
* (A, B, C) SET (__doris_shadow_B = B)
*/
SlotRef slot = new SlotRef(null, originCol);
slot.setType(column.getType());
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), slot);
shadowColumnDescs.add(importColumnDesc);
}
} else {
/*
                 * There is a case where the user does not specify the related origin column, eg:
                 * COLUMNS (A, C), where B is not specified but is being modified,
                 * so there is a shadow column '__doris_shadow_B'.
                 * We can not simply add a mapping function "__doris_shadow_B = substitute(B)",
                 * because Doris can not find column B.
                 * In this case, __doris_shadow_B can use its default value, so there is no need
                 * to add it to the column mapping.
*/
// do nothing
}
}
return shadowColumnDescs;
}
    /*
     * Used for spark load jobs.
     * Delegates to the full initColumns() below without initializing slot descs or analyzing exprs.
     */
public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction) throws UserException {
initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, null, null, false,
TUniqueKeyUpdateMode.UPSERT);
}
    /*
     * This function should be used for broker load v2 and stream load.
     * It must be called under the same db lock when planning.
     */
public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction, Map<String, Expr> exprsByName,
Analyzer analyzer, TupleDescriptor srcTupleDesc, Map<String, SlotDescriptor> slotDescByName,
List<Integer> srcSlotIds, TFileFormatType formatType, List<String> hiddenColumns,
TUniqueKeyUpdateMode uniquekeyUpdateMode)
throws UserException {
rewriteColumns(columnDescs);
initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName,
srcSlotIds, formatType, hiddenColumns, true, uniquekeyUpdateMode);
}
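    // A minimal sketch of a stream load call site (hypothetical variable names, assuming the
    // usual LoadTaskInfo accessors; the real planner wiring may differ):
    //   Load.initColumns(tbl, taskInfo.getColumnExprDescs(), null /* no hadoop functions */,
    //           exprsByName, analyzer, srcTupleDesc, slotDescByName, srcSlotIds,
    //           taskInfo.getFormatType(), taskInfo.getHiddenColumns(),
    //           taskInfo.getUniqueKeyUpdateMode());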
    /*
     * This function does the following:
     * 1. fill the column exprs if the user does not specify any column or column mapping.
     * 2. for unspecified columns, check whether they have a default value or are auto-increment columns.
     * 3. add shadow columns, if any.
     * 4. validate hadoop functions.
     * 5. init slot descs and the expr map for the load plan.
     */
private static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction, Map<String, Expr> exprsByName,
Analyzer analyzer, TupleDescriptor srcTupleDesc, Map<String, SlotDescriptor> slotDescByName,
List<Integer> srcSlotIds, TFileFormatType formatType, List<String> hiddenColumns,
boolean needInitSlotAndAnalyzeExprs, TUniqueKeyUpdateMode uniquekeyUpdateMode) throws UserException {
        // We make a copy of columnExprs so that our subsequent changes
        // to it will not affect the original columnExprs.
        // Skip mapping columns that do not exist in the schema.
        // eg: the origin column list is:
        // (k1, k2, tmpk3 = k1 + k2, k3 = tmpk3)
        // after calling rewriteColumns(), it becomes
        // (k1, k2, tmpk3 = k1 + k2, k3 = k1 + k2)
        // so "tmpk3 = k1 + k2" is not needed anymore and can be skipped.
List<ImportColumnDesc> copiedColumnExprs = new ArrayList<>();
for (ImportColumnDesc importColumnDesc : columnExprs) {
String mappingColumnName = importColumnDesc.getColumnName();
if (importColumnDesc.isColumn() || tbl.getColumn(mappingColumnName) != null) {
copiedColumnExprs.add(importColumnDesc);
}
}
// check whether the OlapTable has sequenceCol and skipBitmapCol
boolean hasSequenceCol = false;
boolean hasSequenceMapCol = false;
boolean hasSkipBitmapColumn = false;
if (tbl instanceof OlapTable) {
OlapTable olapTable = (OlapTable) tbl;
hasSequenceCol = olapTable.hasSequenceCol();
hasSequenceMapCol = (olapTable.getSequenceMapCol() != null);
hasSkipBitmapColumn = olapTable.hasSkipBitmapColumn();
}
        // If the user does not specify the file field names, generate them from the table's base schema
        // so that the following process can be unified.
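        // e.g. (illustrative) for a table with base schema (k1, k2, v1) and no user-specified
        // columns, the copied column list simply becomes (k1, k2, v1).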
boolean specifyFileFieldNames = copiedColumnExprs.stream().anyMatch(p -> p.isColumn());
if (!specifyFileFieldNames) {
List<Column> columns = tbl.getBaseSchema(false);
for (Column column : columns) {
                // if columnExprs already contains the sequence column, no need to generate it
if (hasSequenceCol && column.isSequenceColumn()) {
continue;
}
ImportColumnDesc columnDesc = null;
if (formatType == TFileFormatType.FORMAT_JSON) {
columnDesc = new ImportColumnDesc(column.getName());
} else {
columnDesc = new ImportColumnDesc(column.getName().toLowerCase());
}
if (LOG.isDebugEnabled()) {
LOG.debug("add base column {} to stream load task", column.getName());
}
copiedColumnExprs.add(columnDesc);
}
if (hasSkipBitmapColumn && uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS) {
Preconditions.checkArgument(!specifyFileFieldNames);
Preconditions.checkArgument(hiddenColumns == null);
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", Column.DELETE_SIGN);
}
copiedColumnExprs.add(new ImportColumnDesc(Column.DELETE_SIGN));
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", Column.SKIP_BITMAP_COL);
}
                // allow specifying __DORIS_SEQUENCE_COL__ if the table has a sequence-type column
if (hasSequenceCol && !hasSequenceMapCol) {
copiedColumnExprs.add(new ImportColumnDesc(Column.SEQUENCE_COL));
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", Column.SEQUENCE_COL);
}
}
copiedColumnExprs.add(new ImportColumnDesc(Column.SKIP_BITMAP_COL));
}
if (hiddenColumns != null) {
for (String columnName : hiddenColumns) {
Column column = tbl.getColumn(columnName);
if (column != null && !column.isVisible()) {
ImportColumnDesc columnDesc = new ImportColumnDesc(column.getName());
if (LOG.isDebugEnabled()) {
LOG.debug("add hidden column {} to stream load task", column.getName());
}
copiedColumnExprs.add(columnDesc);
}
}
}
}
        // generate a map for easy lookup
Map<String, Expr> columnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
columnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
}
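        // e.g. (illustrative) for (k1, k2, v1 = k1 + k2) the map is
        // {k1 -> null, k2 -> null, v1 -> (k1 + k2)}; plain columns map to a null expr.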
HashMap<String, Type> colToType = new HashMap<>();
// check default value and auto-increment column
for (Column column : tbl.getBaseSchema()) {
String columnName = column.getName();
colToType.put(columnName, column.getType());
if (column.getGeneratedColumnInfo() != null) {
GeneratedColumnInfo info = column.getGeneratedColumnInfo();
exprsByName.put(column.getName(), info.getExpandExprForLoad());
continue;
}
if (columnExprMap.containsKey(columnName)) {
continue;
}
if (column.getDefaultValue() != null) {
exprsByName.put(column.getName(), column.getDefaultValueExpr());
continue;
}
if (column.isAllowNull()) {
exprsByName.put(column.getName(), NullLiteral.create(column.getType()));
continue;
}
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
continue;
}
if (column.isAutoInc()) {
continue;
}
throw new DdlException("Column has no default value. column: " + columnName);
}
// get shadow column desc when table schema change
copiedColumnExprs.addAll(getSchemaChangeShadowColumnDesc(tbl, columnExprMap));
// validate hadoop functions
if (columnToHadoopFunction != null) {
Map<String, String> columnNameMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
if (importColumnDesc.isColumn()) {
columnNameMap.put(importColumnDesc.getColumnName(), importColumnDesc.getColumnName());
}
}
for (Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
String mappingColumnName = entry.getKey();
Column mappingColumn = tbl.getColumn(mappingColumnName);
if (mappingColumn == null) {
throw new DdlException("Mapping column is not in table. column: " + mappingColumnName);
}
Pair<String, List<String>> function = entry.getValue();
try {
DataDescription.validateMappingFunction(function.first, function.second, columnNameMap,
mappingColumn, false);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
}
if (!needInitSlotAndAnalyzeExprs) {
return;
}
Set<String> exprSrcSlotName = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
if (importColumnDesc.isColumn()) {
continue;
}
List<SlotRef> slots = Lists.newArrayList();
importColumnDesc.getExpr().collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
String slotColumnName = slot.getColumnName();
exprSrcSlotName.add(slotColumnName);
}
}
        // init slot descs and the expr map, and transform hadoop functions
for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
            // make the column name's case match the real column name
String columnName = importColumnDesc.getColumnName();
Column tblColumn = tbl.getColumn(columnName);
String realColName;
if (tblColumn == null || tblColumn.getName() == null || importColumnDesc.getExpr() == null) {
realColName = columnName;
} else {
realColName = tblColumn.getName();
}
if (importColumnDesc.getExpr() != null) {
Expr expr = transformHadoopFunctionExpr(tbl, realColName, importColumnDesc.getExpr());
exprsByName.put(realColName, expr);
} else {
SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc);
if (formatType == TFileFormatType.FORMAT_ARROW) {
slotDesc.setColumn(new Column(realColName, colToType.get(realColName)));
} else {
if (uniquekeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && hasSkipBitmapColumn) {
                        // in flexible partial update, we store the unique ids of missing columns
                        // in the skip bitmap column
int colUniqueId = tblColumn.getUniqueId();
Column slotColumn = null;
if (realColName.equals(Column.SKIP_BITMAP_COL)) {
                            // don't change the skip_bitmap_col's type to varchar because we will fill
                            // this column in NewJsonReader manually rather than reading it from files
                            // as varchar and then converting it to its real type
slotColumn = new Column(realColName, PrimitiveType.BITMAP);
} else {
                            // columns default to varchar type
slotColumn = new Column(realColName, PrimitiveType.VARCHAR);
}
                        // In flexible partial update, every row can update different columns, so we
                        // should check key column integrity for every row in XXXReader on BE, rather
                        // than checking it on FE directly for all rows as in fixed columns partial
                        // update. So we should record whether a slot is a key column here.
slotColumn.setIsKey(tblColumn.isKey());
slotColumn.setUniqueId(colUniqueId);
slotDesc.setAutoInc(tblColumn.isAutoInc());
slotDesc.setColumn(slotColumn);
} else {
                        // columns default to varchar type
slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
slotDesc.setColumn(new Column(realColName, PrimitiveType.VARCHAR));
}
}
            // ISSUE A: the src slot should be nullable even if the column is not nullable,
            // because the src slot holds what we read from the file, not the real column value.
            // If the column is not nullable, an error will be thrown when filling the dest slot,
            // which is not nullable.
slotDesc.setIsNullable(true);
slotDesc.setIsMaterialized(true);
srcSlotIds.add(slotDesc.getId().asInt());
slotDescByName.put(realColName, slotDesc);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("plan srcTupleDesc {}", srcTupleDesc.toString());
}
        /*
         * The extension columns of materialized views are added to the load's expression evaluation.
         * To avoid nested expressions, eg: column(a, tmp_c, c = expr(tmp_c)),
         * __doris_materialized_view_bitmap_union_c needs to be analyzed after exprsByName,
         * so the columns of the materialized views are stored separately here.
         */
Map<String, Expr> mvDefineExpr = Maps.newHashMap();
for (Column column : tbl.getFullSchema()) {
if (column.getDefineExpr() != null) {
if (column.getDefineExpr().getType().isInvalid()) {
column.getDefineExpr().setType(column.getType());
}
mvDefineExpr.put(column.getName(), column.getDefineExpr());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("slotDescByName: {}, exprsByName: {}, mvDefineExpr: {}",
slotDescByName, exprsByName, mvDefineExpr);
}
        // In vectorized load, re-analyze exprs with the cast expr type;
        // otherwise, analyze exprs with varchar type.
analyzeAllExprs(tbl, analyzer, exprsByName, mvDefineExpr, slotDescByName);
if (LOG.isDebugEnabled()) {
LOG.debug("after init column, exprMap: {}", exprsByName);
}
}
private static SlotRef getSlotFromDesc(SlotDescriptor slotDesc) {
SlotRef slot = new SlotRef(slotDesc);
slot.setType(slotDesc.getType());
return slot;
}
public static Expr getExprFromDesc(Analyzer analyzer, Expr rhs, SlotRef slot) throws AnalysisException {
Type rhsType = rhs.getType();
rhs = rhs.castTo(slot.getType());
if (slot.getDesc() == null) {
// shadow column
return rhs;
}
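        // Align the rhs's nullability with that of the dest slot by wrapping it with
        // nullable()/non_nullable(), keeping the original return type.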
if (rhs.isNullable() && !slot.isNullable()) {
rhs = new FunctionCallExpr("non_nullable", Lists.newArrayList(rhs));
rhs.setType(rhsType);
rhs.analyze(analyzer);
} else if (!rhs.isNullable() && slot.isNullable()) {
rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs));
rhs.setType(rhsType);
rhs.analyze(analyzer);
}
return rhs;
}
private static void analyzeAllExprs(Table tbl, Analyzer analyzer, Map<String, Expr> exprsByName,
Map<String, Expr> mvDefineExpr, Map<String, SlotDescriptor> slotDescByName) throws UserException {
// analyze all exprs
for (Map.Entry<String, Expr> entry : exprsByName.entrySet()) {
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
SlotDescriptor slotDesc = slotDescByName.get(slot.getColumnName());
if (slotDesc == null) {
if (entry.getKey() != null) {
if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) {
throw new UserException("unknown reference column in DELETE ON clause:"
+ slot.getColumnName());
} else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) {
throw new UserException("unknown reference column in ORDER BY clause:"
+ slot.getColumnName());
}
}
throw new UserException("unknown reference column, column=" + entry.getKey()
+ ", reference=" + slot.getColumnName());
}
smap.getLhs().add(slot);
smap.getRhs().add(new SlotRef(slotDesc));
}
Expr expr = entry.getValue().clone(smap);
expr.analyze(analyzer);
            // check whether the expr contains an aggregate function
List<FunctionCallExpr> funcs = Lists.newArrayList();
expr.collect(FunctionCallExpr.class, funcs);
for (FunctionCallExpr fn : funcs) {
if (fn.isAggregateFunction()) {
throw new AnalysisException("Don't support aggregation function in load expression");
}
}
            // Array/Map/Struct types do not support cast for now
Type exprReturnType = expr.getType();
if (exprReturnType.isComplexType()) {
Type schemaType = tbl.getColumn(entry.getKey()).getType();
if (!exprReturnType.matchesType(schemaType)) {
throw new AnalysisException("Don't support load from type:" + exprReturnType + " to type:"
+ schemaType + " for column:" + entry.getKey());
}
}
exprsByName.put(entry.getKey(), expr);
}
for (Map.Entry<String, Expr> entry : mvDefineExpr.entrySet()) {
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
if (exprsByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
if (tbl.getColumn(slot.getColumnName()).getType()
.equals(exprsByName.get(slot.getColumnName()).getType())) {
smap.getRhs().add(exprsByName.get(slot.getColumnName()));
} else {
smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(),
exprsByName.get(slot.getColumnName())));
}
} else if (slotDescByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(
getExprFromDesc(analyzer, getSlotFromDesc(slotDescByName.get(slot.getColumnName())), slot));
} else {
if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) {
throw new UserException("unknown reference column in DELETE ON clause:" + slot.getColumnName());
} else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) {
throw new UserException("unknown reference column in ORDER BY clause:" + slot.getColumnName());
} else {
throw new UserException("unknown reference column, column=" + entry.getKey()
+ ", reference=" + slot.getColumnName());
}
}
}
Expr expr = entry.getValue().clone(smap);
expr.analyze(analyzer);
exprsByName.put(entry.getKey(), expr);
}
}
public static void rewriteColumns(LoadTaskInfo.ImportColumnDescs columnDescs) {
if (columnDescs.isColumnDescsRewrited) {
return;
}
Map<String, Expr> derivativeColumns = new HashMap<>();
        // Find and rewrite the derivative columns,
        // e.g. (v1, v2 = v1 + 1, v3 = v2 + 1) --> (v1, v2 = v1 + 1, v3 = v1 + 1 + 1)
for (ImportColumnDesc importColumnDesc : columnDescs.descs) {
if (!importColumnDesc.isColumn()) {
if (importColumnDesc.getExpr() instanceof SlotRef) {
String columnName = ((SlotRef) importColumnDesc.getExpr()).getColumnName();
if (derivativeColumns.containsKey(columnName)) {
importColumnDesc.setExpr(derivativeColumns.get(columnName));
}
} else {
ExprUtil.recursiveRewrite(importColumnDesc.getExpr(), derivativeColumns);
}
derivativeColumns.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
}
}
columnDescs.isColumnDescsRewrited = true;
}
/**
     * This method is used to transform hadoop functions.
     * The hadoop functions include: replace_value, strftime, time_format, alignment_timestamp, default_value, now.
     * It rewrites those functions with the real function name and params.
     * For any other function, the expr simply passes through and the origin expr is returned.
*
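     * e.g. (illustrative, derived from the rewrites below):
     *   strftime(fmt, k1)                --> FROM_UNIXTIME(k1)
     *   time_format(out_fmt, in_fmt, k1) --> DATE_FORMAT(STR_TO_DATE(k1, in_fmt), out_fmt)
     *   alignment_timestamp("year", k1)  --> UNIX_TIMESTAMP(DATE_FORMAT(FROM_UNIXTIME(k1), "%Y-01-01 00:00:00"))
     *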
* @param columnName
* @param originExpr
* @return
* @throws UserException
*/
private static Expr transformHadoopFunctionExpr(Table tbl, String columnName, Expr originExpr)
throws UserException {
Column column = tbl.getColumn(columnName);
if (column == null) {
// the unknown column will be checked later.
return originExpr;
}
        // For compatibility with older load versions
if (originExpr instanceof FunctionCallExpr) {
FunctionCallExpr funcExpr = (FunctionCallExpr) originExpr;
String funcName = funcExpr.getFnName().getFunction();
if (funcName.equalsIgnoreCase("replace_value")) {
List<Expr> exprs = Lists.newArrayList();
SlotRef slotRef = new SlotRef(null, columnName);
                // We will convert this to IF(`col` != child0, `col`, child1);
                // we use NE because we need the IF's return type to equal `col`'s type.
/*
* We will convert this based on different cases:
* case 1: k1 = replace_value(null, anyval);
* to: k1 = if (k1 is not null, k1, anyval);
*
* case 2: k1 = replace_value(anyval1, anyval2);
* to: k1 = if (k1 is not null, if(k1 != anyval1, k1, anyval2), null);
*/
if (funcExpr.getChild(0) instanceof NullLiteral) {
// case 1
exprs.add(new IsNullPredicate(slotRef, true));
exprs.add(slotRef);
if (funcExpr.hasChild(1)) {
exprs.add(funcExpr.getChild(1));
} else {
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
exprs.add(column.getDefaultValueExpr());
} else {
exprs.add(new StringLiteral(column.getDefaultValue()));
}
} else {
if (column.isAllowNull()) {
exprs.add(NullLiteral.create(Type.VARCHAR));
} else {
throw new UserException("Column(" + columnName + ") has no default value.");
}
}
}
} else {
// case 2
exprs.add(new IsNullPredicate(slotRef, true));
List<Expr> innerIfExprs = Lists.newArrayList();
innerIfExprs.add(new BinaryPredicate(BinaryPredicate.Operator.NE, slotRef, funcExpr.getChild(0)));
innerIfExprs.add(slotRef);
if (funcExpr.hasChild(1)) {
innerIfExprs.add(funcExpr.getChild(1));
} else {
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
innerIfExprs.add(column.getDefaultValueExpr());
} else {
innerIfExprs.add(new StringLiteral(column.getDefaultValue()));
}
} else {
if (column.isAllowNull()) {
innerIfExprs.add(NullLiteral.create(Type.VARCHAR));
} else {
throw new UserException("Column(" + columnName + ") has no default value.");
}
}
}
FunctionCallExpr innerIfFn = new FunctionCallExpr("if", innerIfExprs);
exprs.add(innerIfFn);
exprs.add(NullLiteral.create(Type.VARCHAR));
}
if (LOG.isDebugEnabled()) {
LOG.debug("replace_value expr: {}", exprs);
}
FunctionCallExpr newFn = new FunctionCallExpr("if", exprs);
return newFn;
} else if (funcName.equalsIgnoreCase("strftime")) {
// FROM_UNIXTIME(val)
FunctionName fromUnixName = new FunctionName("FROM_UNIXTIME");
List<Expr> fromUnixArgs = Lists.newArrayList(funcExpr.getChild(1));
FunctionCallExpr fromUnixFunc = new FunctionCallExpr(
fromUnixName, new FunctionParams(false, fromUnixArgs));
return fromUnixFunc;
} else if (funcName.equalsIgnoreCase("time_format")) {
// DATE_FORMAT(STR_TO_DATE(dt_str, dt_fmt))
FunctionName strToDateName = new FunctionName("STR_TO_DATE");
List<Expr> strToDateExprs = Lists.newArrayList(funcExpr.getChild(2), funcExpr.getChild(1));
FunctionCallExpr strToDateFuncExpr = new FunctionCallExpr(
strToDateName, new FunctionParams(false, strToDateExprs));
FunctionName dateFormatName = new FunctionName("DATE_FORMAT");
List<Expr> dateFormatArgs = Lists.newArrayList(strToDateFuncExpr, funcExpr.getChild(0));
FunctionCallExpr dateFormatFunc = new FunctionCallExpr(
dateFormatName, new FunctionParams(false, dateFormatArgs));
return dateFormatFunc;
} else if (funcName.equalsIgnoreCase("alignment_timestamp")) {
                /*
                 * change to:
                 * UNIX_TIMESTAMP(DATE_FORMAT(FROM_UNIXTIME(ts), "%Y-01-01 00:00:00"))
                 */
// FROM_UNIXTIME
FunctionName fromUnixName = new FunctionName("FROM_UNIXTIME");
List<Expr> fromUnixArgs = Lists.newArrayList(funcExpr.getChild(1));
FunctionCallExpr fromUnixFunc = new FunctionCallExpr(
fromUnixName, new FunctionParams(false, fromUnixArgs));
// DATE_FORMAT
StringLiteral precision = (StringLiteral) funcExpr.getChild(0);
StringLiteral format;
if (precision.getStringValue().equalsIgnoreCase("year")) {
format = new StringLiteral("%Y-01-01 00:00:00");
} else if (precision.getStringValue().equalsIgnoreCase("month")) {
format = new StringLiteral("%Y-%m-01 00:00:00");
} else if (precision.getStringValue().equalsIgnoreCase("day")) {
format = new StringLiteral("%Y-%m-%d 00:00:00");
} else if (precision.getStringValue().equalsIgnoreCase("hour")) {
format = new StringLiteral("%Y-%m-%d %H:00:00");
} else {
throw new UserException("Unknown precision(" + precision.getStringValue() + ")");
}
FunctionName dateFormatName = new FunctionName("DATE_FORMAT");
List<Expr> dateFormatArgs = Lists.newArrayList(fromUnixFunc, format);
FunctionCallExpr dateFormatFunc = new FunctionCallExpr(
dateFormatName, new FunctionParams(false, dateFormatArgs));
// UNIX_TIMESTAMP
FunctionName unixTimeName = new FunctionName("UNIX_TIMESTAMP");
List<Expr> unixTimeArgs = Lists.newArrayList();
unixTimeArgs.add(dateFormatFunc);
FunctionCallExpr unixTimeFunc = new FunctionCallExpr(
unixTimeName, new FunctionParams(false, unixTimeArgs));
return unixTimeFunc;
} else if (funcName.equalsIgnoreCase("default_value")) {
return funcExpr.getChild(0);
} else if (funcName.equalsIgnoreCase("now")) {
FunctionName nowFunctionName = new FunctionName("NOW");
FunctionCallExpr newFunc = new FunctionCallExpr(nowFunctionName, new FunctionParams(null));
return newFunc;
} else if (funcName.equalsIgnoreCase("substitute")) {
return funcExpr.getChild(0);
}
}
return originExpr;
}
    // Return true if we truly registered a mini load label,
    // false otherwise (eg: a retry request).
public boolean registerMiniLabel(String fullDbName, String label, long timestamp) throws DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(fullDbName);
long dbId = db.getId();
writeLock();
try {
if (unprotectIsLabelUsed(dbId, label, timestamp, true)) {
// label is used and this is a retry request.
// no need to do further operation, just return.
return false;
}
Map<String, Long> miniLabels = null;
if (dbToMiniLabels.containsKey(dbId)) {
miniLabels = dbToMiniLabels.get(dbId);
} else {
miniLabels = Maps.newHashMap();
dbToMiniLabels.put(dbId, miniLabels);
}
miniLabels.put(label, timestamp);
return true;
} finally {
writeUnlock();
}
}
public void deregisterMiniLabel(String fullDbName, String label) throws DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(fullDbName);
long dbId = db.getId();
writeLock();
try {
if (!dbToMiniLabels.containsKey(dbId)) {
return;
}
Map<String, Long> miniLabels = dbToMiniLabels.get(dbId);
miniLabels.remove(label);
if (miniLabels.isEmpty()) {
dbToMiniLabels.remove(dbId);
}
} finally {
writeUnlock();
}
}
public boolean isUncommittedLabel(long dbId, String label) throws DdlException {
readLock();
try {
if (dbToMiniLabels.containsKey(dbId)) {
Map<String, Long> uncommittedLabels = dbToMiniLabels.get(dbId);
return uncommittedLabels.containsKey(label);
}
} finally {
readUnlock();
}
return false;
}
public boolean isLabelUsed(long dbId, String label) throws DdlException {
readLock();
try {
return unprotectIsLabelUsed(dbId, label, -1, true);
} finally {
readUnlock();
}
}
    /*
     * 1. if the label is already used and this is not a retry request,
     *    throw an exception ("Label already used")
     * 2. if the label is already used but this is a retry request,
     *    return true
     * 3. if the label is not used, return false
     * 4. throw an exception if an error is encountered.
     */
private boolean unprotectIsLabelUsed(long dbId, String label, long timestamp, boolean checkMini)
throws DdlException {
// check dbLabelToLoadJobs
if (dbLabelToLoadJobs.containsKey(dbId)) {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(dbId);
if (labelToLoadJobs.containsKey(label)) {
List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
for (LoadJob oldJob : labelLoadJobs) {
JobState oldJobState = oldJob.getState();
if (oldJobState != JobState.CANCELLED) {
if (timestamp == -1) {
// timestamp == -1 is for compatibility
throw new LabelAlreadyUsedException(label);
} else {
if (timestamp == oldJob.getTimestamp()) {
                                // this timestamp is used to verify whether this label check is a retry
                                // request from the backend. If the timestamp in the request is the same
                                // as the one in the existing load job, the load job has already been
                                // submitted.
LOG.info("get a retry request with label: {}, timestamp: {}. return ok",
label, timestamp);
return true;
} else {
throw new LabelAlreadyUsedException(label);
}
}
}
}
}
}
// check dbToMiniLabel
if (checkMini) {
return checkMultiLabelUsed(dbId, label, timestamp);
}
return false;
}
private boolean checkMultiLabelUsed(long dbId, String label, long timestamp) throws DdlException {
if (dbToMiniLabels.containsKey(dbId)) {
Map<String, Long> uncommittedLabels = dbToMiniLabels.get(dbId);
if (uncommittedLabels.containsKey(label)) {
if (timestamp == -1) {
throw new LabelAlreadyUsedException(label);
} else {
if (timestamp == uncommittedLabels.get(label)) {
                        // this timestamp is used to verify whether this label check is a retry
                        // request from the backend. If the timestamp in the request is the same
                        // as the one in the existing load job, the load job has already been
                        // submitted.
LOG.info("get a retry mini load request with label: {}, timestamp: {}. return ok",
label, timestamp);
return true;
} else {
throw new LabelAlreadyUsedException(label);
}
}
}
}
return false;
}
public Map<Long, LoadJob> getIdToLoadJob() {
return idToLoadJob;
}
public Map<Long, List<LoadJob>> getDbToLoadJobs() {
return dbToLoadJobs;
}
public List<LoadJob> getLoadJobs(JobState jobState) {
List<LoadJob> jobs = new ArrayList<LoadJob>();
Collection<LoadJob> stateJobs = null;
readLock();
try {
switch (jobState) {
case PENDING:
stateJobs = idToPendingLoadJob.values();
break;
case ETL:
stateJobs = idToEtlLoadJob.values();
break;
case LOADING:
stateJobs = idToLoadingLoadJob.values();
break;
case QUORUM_FINISHED:
stateJobs = idToQuorumFinishedLoadJob.values();
break;
default:
break;
}
if (stateJobs != null) {
jobs.addAll(stateJobs);
}
} finally {
readUnlock();
}
return jobs;
}
public long getLoadJobNum(JobState jobState, long dbId) {
readLock();
try {
List<LoadJob> loadJobs = this.dbToLoadJobs.get(dbId);
if (loadJobs == null) {
return 0;
}
int jobNum = 0;
for (LoadJob job : loadJobs) {
if (job.getState() == jobState) {
++jobNum;
}
}
return jobNum;
} finally {
readUnlock();
}
}
public long getLoadJobNum(JobState jobState) {
readLock();
try {
List<LoadJob> loadJobs = new ArrayList<>();
for (Long dbId : dbToLoadJobs.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
InternalCatalog.INTERNAL_CATALOG_NAME,
Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
loadJobs.addAll(this.dbToLoadJobs.get(dbId));
}
int jobNum = 0;
for (LoadJob job : loadJobs) {
if (job.getState() == jobState) {
++jobNum;
}
}
return jobNum;
} finally {
readUnlock();
}
}
public LoadJob getLoadJob(long jobId) {
readLock();
try {
return idToLoadJob.get(jobId);
} finally {
readUnlock();
}
}
public LinkedList<List<Comparable>> getAllLoadJobInfos() {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
readLock();
try {
List<LoadJob> loadJobs = new ArrayList<>();
for (Long dbId : dbToLoadJobs.keySet()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
InternalCatalog.INTERNAL_CATALOG_NAME,
Env.getCurrentEnv().getCatalogMgr().getDbNullable(dbId).getFullName(),
PrivPredicate.LOAD)) {
continue;
}
loadJobs.addAll(this.dbToLoadJobs.get(dbId));
}
if (loadJobs.size() == 0) {
return loadJobInfos;
}
long start = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get load job info, size: {}", loadJobs.size());
}
for (LoadJob loadJob : loadJobs) {
// filter first
String dbName = Env.getCurrentEnv().getCatalogMgr().getDbNullable(loadJob.getDbId()).getFullName();
// check auth
Set<String> tableNames = loadJob.getTableNames();
boolean auth = true;
for (String tblName : tableNames) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tblName, PrivPredicate.LOAD)) {
auth = false;
break;
}
}
if (!auth) {
continue;
}
loadJobInfos.add(composeJobInfoByLoadJob(loadJob));
} // end for loadJobs
if (LOG.isDebugEnabled()) {
LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start));
}
} finally {
readUnlock();
}
return loadJobInfos;
}
private List<Comparable> composeJobInfoByLoadJob(LoadJob loadJob) {
List<Comparable> jobInfo = new ArrayList<Comparable>();
// jobId
jobInfo.add(loadJob.getId());
// label
jobInfo.add(loadJob.getLabel());
// state
jobInfo.add(loadJob.getState().name());
// progress
switch (loadJob.getState()) {
case PENDING:
jobInfo.add("ETL:0%; LOAD:0%");
break;
case ETL:
jobInfo.add("ETL:" + loadJob.getProgress() + "%; LOAD:0%");
break;
case LOADING:
jobInfo.add("ETL:100%; LOAD:" + loadJob.getProgress() + "%");
break;
case QUORUM_FINISHED:
case FINISHED:
jobInfo.add("ETL:100%; LOAD:100%");
break;
case CANCELLED:
default:
jobInfo.add("ETL:N/A; LOAD:N/A");
break;
}
// type
jobInfo.add(loadJob.getEtlJobType().name());
// etl info
EtlStatus status = loadJob.getEtlJobStatus();
if (status == null || status.getState() == TEtlState.CANCELLED) {
jobInfo.add(FeConstants.null_string);
} else {
Map<String, String> counters = status.getCounters();
List<String> info = Lists.newArrayList();
for (String key : counters.keySet()) {
                // XXX: internal etl jobs return all counters
if (key.equalsIgnoreCase("HDFS bytes read")
|| key.equalsIgnoreCase("Map input records")
|| key.startsWith("dpp.")
|| loadJob.getEtlJobType() == EtlJobType.MINI) {
info.add(key + "=" + counters.get(key));
}
} // end for counters
if (info.isEmpty()) {
jobInfo.add(FeConstants.null_string);
} else {
jobInfo.add(StringUtils.join(info, "; "));
}
}
// task info
jobInfo.add("cluster:" + loadJob.getHadoopCluster()
+ "; timeout(s):" + loadJob.getTimeoutSecond()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
// error msg
if (loadJob.getState() == JobState.CANCELLED) {
FailMsg failMsg = loadJob.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
jobInfo.add(FeConstants.null_string);
}
// create time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getCreateTimeMs()));
// etl start time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlStartTimeMs()));
// etl end time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getEtlFinishTimeMs()));
// load start time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadStartTimeMs()));
// load end time
jobInfo.add(TimeUtils.longToTimeString(loadJob.getLoadFinishTimeMs()));
        // tracking url (the etl status may be null, as checked in the etl info section above)
        jobInfo.add(status == null ? FeConstants.null_string : status.getTrackingUrl());
        // job detail (not used for hadoop load, just return an empty string)
jobInfo.add("");
// transaction id
jobInfo.add(loadJob.getTransactionId());
        // error tablets (not used for hadoop load, just return an empty string)
jobInfo.add("");
// user
jobInfo.add(loadJob.getUser());
// comment
jobInfo.add(loadJob.getComment());
return jobInfo;
}
public LinkedList<List<Comparable>> getLoadJobInfosByDb(long dbId, String dbName, String labelValue,
boolean accurateMatch, Set<JobState> states) throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
readLock();
try {
List<LoadJob> loadJobs = this.dbToLoadJobs.get(dbId);
if (loadJobs == null) {
return loadJobInfos;
}
long start = System.currentTimeMillis();
if (LOG.isDebugEnabled()) {
LOG.debug("begin to get load job info, size: {}", loadJobs.size());
}
PatternMatcher matcher = null;
if (labelValue != null && !accurateMatch) {
matcher = PatternMatcherWrapper.createMysqlPattern(labelValue,
CaseSensibility.LABEL.getCaseSensibility());
}
for (LoadJob loadJob : loadJobs) {
// filter first
String label = loadJob.getLabel();
JobState state = loadJob.getState();
if (labelValue != null) {
if (accurateMatch) {
if (!label.equals(labelValue)) {
continue;
}
} else {
if (!matcher.match(label)) {
continue;
}
}
}
if (states != null) {
if (!states.contains(state)) {
continue;
}
}
// check auth
Set<String> tableNames = loadJob.getTableNames();
if (tableNames.isEmpty()) {
// forward compatibility
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
PrivPredicate.LOAD)) {
continue;
}
} else {
boolean auth = true;
for (String tblName : tableNames) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tblName, PrivPredicate.LOAD)) {
auth = false;
break;
}
}
if (!auth) {
continue;
}
}
loadJobInfos.add(composeJobInfoByLoadJob(loadJob));
} // end for loadJobs
if (LOG.isDebugEnabled()) {
LOG.debug("finished to get load job info, cost: {}", (System.currentTimeMillis() - start));
}
} finally {
readUnlock();
}
return loadJobInfos;
}
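    // Return the largest (i.e. latest) job id among this db's load jobs matching labelValue
    // (all of the db's jobs if labelValue is null), or 0 if there is no such job.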
public long getLatestJobIdByLabel(long dbId, String labelValue) {
long jobId = 0;
readLock();
try {
List<LoadJob> loadJobs = this.dbToLoadJobs.get(dbId);
if (loadJobs == null) {
return 0;
}
for (LoadJob loadJob : loadJobs) {
String label = loadJob.getLabel();
if (labelValue != null) {
if (!label.equals(labelValue)) {
continue;
}
}
long currJobId = loadJob.getId();
if (currJobId > jobId) {
jobId = currJobId;
}
}
} finally {
readUnlock();
}
return jobId;
}
public static class JobInfo {
public String dbName;
public Set<String> tblNames = Sets.newHashSet();
public String label;
public JobState state;
public String failMsg;
public String trackingUrl;
public JobInfo(String dbName, String label) {
this.dbName = dbName;
this.label = label;
}
}
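    // Typical usage (illustrative): fill dbName and label, then let getJobInfo() populate the rest:
    //   JobInfo info = new JobInfo(fullDbName, label);
    //   load.getJobInfo(info);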
    // Get the job state; the result is saved in info.
public void getJobInfo(JobInfo info) throws DdlException, MetaNotFoundException {
        String fullDbName = info.dbName;
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(fullDbName);
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("No jobs belong to database(" + info.dbName + ")");
}
List<LoadJob> loadJobs = labelToLoadJobs.get(info.label);
if (loadJobs == null) {
throw new DdlException("Unknown job(" + info.label + ")");
}
// only the last one should be running
LoadJob job = loadJobs.get(loadJobs.size() - 1);
if (!job.getTableNames().isEmpty()) {
info.tblNames.addAll(job.getTableNames());
}
info.state = job.getState();
if (info.state == JobState.QUORUM_FINISHED) {
info.state = JobState.FINISHED;
}
info.failMsg = job.getFailMsg().getMsg();
info.trackingUrl = job.getEtlJobStatus().getTrackingUrl();
} finally {
readUnlock();
}
}
public void replayClearRollupInfo(ReplicaPersistInfo info, Env env) throws MetaNotFoundException {
Database db = env.getInternalCatalog().getDbOrMetaException(info.getDbId());
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
olapTable.writeLock();
try {
Partition partition = olapTable.getPartition(info.getPartitionId());
MaterializedIndex index = partition.getIndex(info.getIndexId());
index.clearRollupIndexInfo();
} finally {
olapTable.writeUnlock();
}
}
    // Remove all db jobs from dbToLoadJobs and dbLabelToLoadJobs;
    // only finished or cancelled jobs are removed from idToLoadJob.
    // LoadChecker will move jobs in other states to cancelled or finished,
    // and they will then be removed by removeOldLoadJobs periodically.
public void removeDbLoadJob(long dbId) {
writeLock();
try {
if (dbToLoadJobs.containsKey(dbId)) {
List<LoadJob> dbLoadJobs = dbToLoadJobs.remove(dbId);
for (LoadJob job : dbLoadJobs) {
JobState state = job.getState();
if (state == JobState.CANCELLED || state == JobState.FINISHED) {
idToLoadJob.remove(job.getId());
}
}
}
if (dbLabelToLoadJobs.containsKey(dbId)) {
dbLabelToLoadJobs.remove(dbId);
}
} finally {
writeUnlock();
}
}
    // Remove old load jobs from idToLoadJob, dbToLoadJobs and dbLabelToLoadJobs.
    // This function is called periodically, every Config.label_clean_interval_second seconds.
public void removeOldLoadJobs() {
long currentTimeMs = System.currentTimeMillis();
writeLock();
try {
Iterator<Map.Entry<Long, LoadJob>> iter = idToLoadJob.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Long, LoadJob> entry = iter.next();
LoadJob job = entry.getValue();
if (job.isExpired(currentTimeMs)) {
long dbId = job.getDbId();
String label = job.getLabel();
// Remove job from idToLoadJob
iter.remove();
// Remove job from dbToLoadJobs
List<LoadJob> loadJobs = dbToLoadJobs.get(dbId);
if (loadJobs != null) {
loadJobs.remove(job);
if (loadJobs.size() == 0) {
dbToLoadJobs.remove(dbId);
}
}
// Remove job from dbLabelToLoadJobs
Map<String, List<LoadJob>> mapLabelToJobs = dbLabelToLoadJobs.get(dbId);
if (mapLabelToJobs != null) {
loadJobs = mapLabelToJobs.get(label);
if (loadJobs != null) {
loadJobs.remove(job);
if (loadJobs.isEmpty()) {
mapLabelToJobs.remove(label);
if (mapLabelToJobs.isEmpty()) {
dbLabelToLoadJobs.remove(dbId);
}
}
}
}
}
}
} finally {
writeUnlock();
}
}
// clear dpp output and kill etl job
public void clearJob(LoadJob job, JobState srcState) {
JobState state = job.getState();
if (state != JobState.CANCELLED && state != JobState.FINISHED) {
LOG.warn("job state error. state: {}", state);
return;
}
EtlJobType etlJobType = job.getEtlJobType();
switch (etlJobType) {
case HADOOP:
DppScheduler dppScheduler = new DppScheduler(job.getHadoopDppConfig());
// kill etl job
if (state == JobState.CANCELLED && srcState == JobState.ETL) {
try {
dppScheduler.killEtlJob(job.getHadoopEtlJobId());
} catch (Exception e) {
LOG.warn("kill etl job error", e);
}
}
// delete all dirs related to job label, use "" instead of job.getEtlOutputDir()
// hdfs://host:port/outputPath/dbId/loadLabel/
DppConfig dppConfig = job.getHadoopDppConfig();
String outputPath = DppScheduler.getEtlOutputPath(dppConfig.getFsDefaultName(),
dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
try {
dppScheduler.deleteEtlOutputPath(outputPath);
} catch (Exception e) {
LOG.warn("delete etl output path error", e);
}
break;
case MINI:
break;
case INSERT:
break;
case BROKER:
break;
case DELETE:
break;
default:
LOG.warn("unknown etl job type. type: {}, job id: {}", etlJobType.name(), job.getId());
break;
}
}
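    // Atomically mark the given partitions as loading. Returns false, marking nothing,
    // if any of the partitions is already loading.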
public boolean addLoadingPartitions(Set<Long> partitionIds) {
writeLock();
try {
for (long partitionId : partitionIds) {
if (loadingPartitionIds.contains(partitionId)) {
LOG.info("partition {} is loading", partitionId);
return false;
}
}
loadingPartitionIds.addAll(partitionIds);
return true;
} finally {
writeUnlock();
}
}
public void removeLoadingPartitions(Set<Long> partitionIds) {
writeLock();
try {
loadingPartitionIds.removeAll(partitionIds);
} finally {
writeUnlock();
}
}
public boolean checkPartitionLoadFinished(long partitionId, List<LoadJob> quorumFinishedLoadJobs) {
readLock();
try {
for (JobState state : JobState.values()) {
if (state == JobState.FINISHED || state == JobState.CANCELLED) {
continue;
}
            // we check PENDING / ETL / LOADING / QUORUM_FINISHED jobs
List<LoadJob> loadJobs = this.getLoadJobs(state);
for (LoadJob loadJob : loadJobs) {
Preconditions.checkNotNull(loadJob.getIdToTableLoadInfo());
for (TableLoadInfo tableLoadInfo : loadJob.getIdToTableLoadInfo().values()) {
if (tableLoadInfo.getIdToPartitionLoadInfo().containsKey(partitionId)) {
if (state == JobState.QUORUM_FINISHED) {
if (quorumFinishedLoadJobs != null) {
quorumFinishedLoadJobs.add(loadJob);
} else {
return false;
}
} else {
return false;
}
}
}
}
}
return true;
} finally {
readUnlock();
}
}
}