// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ArithmeticExpr;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.FileGroupInfo;
import org.apache.doris.datasource.FileScanNode;
import org.apache.doris.datasource.LoadScanProvider;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.nereids.load.NereidsFileGroupInfo;
import org.apache.doris.nereids.load.NereidsLoadPlanInfoCollector;
import org.apache.doris.nereids.load.NereidsParamCreateContext;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
* FileLoadScanNode for broker load and stream load.
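*
* A rough lifecycle sketch based on the methods below (the call sequence is
* illustrative, not a verbatim caller):
*
*   FileLoadScanNode node = new FileLoadScanNode(planNodeId, destTupleDesc);
*   node.setLoadInfo(...);    // one FileGroupInfo per DataDescription
*   node.init(analyzer);      // builds a LoadScanProvider and ParamCreateContext per group
*   node.finalize(analyzer);  // finalizes per-slot exprs and creates scan range locations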
*/
public class FileLoadScanNode extends FileScanNode {
private static final Logger LOG = LogManager.getLogger(FileLoadScanNode.class);
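/**
* Per-file-group planning state: the source/destination tuple descriptors, the
* column-mapping exprs, and the thrift scan range params derived from them.
*/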
public static class ParamCreateContext {
public BrokerFileGroup fileGroup;
public TupleDescriptor destTupleDescriptor;
// === Set when init ===
public TupleDescriptor srcTupleDescriptor;
public Map<String, SlotDescriptor> srcSlotDescByName;
public Map<String, Expr> exprMap;
public String timezone;
// === End: set when init ===
public TFileScanRangeParams params;
}
// Save all info about load attributes and files.
// Each DataDescription in a load stmt corresponds to one FileGroupInfo in this list.
private final List<FileGroupInfo> fileGroupInfos = Lists.newArrayList();
// For load, the number of providers equals the number of file group infos.
private final List<LoadScanProvider> scanProviders = Lists.newArrayList();
// For load, the number of ParamCreateContexts equals the number of file group infos.
private final List<ParamCreateContext> contexts = Lists.newArrayList();
/**
* External file scan node for loading data from files.
* These scan nodes have no corresponding catalog/database/table info, so no privilege check is needed.
*/
public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc) {
super(id, desc, "FILE_LOAD_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, false);
}
// Only for broker load job.
public void setLoadInfo(long loadJobId, long txnId, Table targetTable, BrokerDesc brokerDesc,
List<BrokerFileGroup> fileGroups, List<List<TBrokerFileStatus>> fileStatusesList,
int filesAdded, boolean strictMode, int loadParallelism, UserIdentity userIdentity) {
Preconditions.checkState(fileGroups.size() == fileStatusesList.size());
for (int i = 0; i < fileGroups.size(); ++i) {
FileGroupInfo fileGroupInfo = new FileGroupInfo(loadJobId, txnId, targetTable, brokerDesc,
fileGroups.get(i), fileStatusesList.get(i), filesAdded, strictMode, loadParallelism);
fileGroupInfos.add(fileGroupInfo);
}
}
// Only for stream load/routine load job.
public void setLoadInfo(TUniqueId loadId, long txnId, Table targetTable, BrokerDesc brokerDesc,
BrokerFileGroup fileGroup, TBrokerFileStatus fileStatus, boolean strictMode,
TFileType fileType, List<String> hiddenColumns, TUniqueKeyUpdateMode uniquekeyUpdateMode,
String sequenceMapCol) {
FileGroupInfo fileGroupInfo = new FileGroupInfo(loadId, txnId, targetTable, brokerDesc, fileGroup,
fileStatus, strictMode, fileType, hiddenColumns, uniquekeyUpdateMode, sequenceMapCol);
fileGroupInfos.add(fileGroupInfo);
}
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
for (FileGroupInfo fileGroupInfo : fileGroupInfos) {
this.scanProviders.add(new LoadScanProvider(fileGroupInfo, desc));
}
initParamCreateContexts(analyzer);
}
// For each scan provider, create a corresponding ParamCreateContext
private void initParamCreateContexts(Analyzer analyzer) throws UserException {
for (LoadScanProvider scanProvider : scanProviders) {
ParamCreateContext context = scanProvider.createContext(analyzer);
// set where and preceding filter.
// FIXME(cmy): we should support setting different exprs for different file groups.
initAndSetPrecedingFilter(context.fileGroup.getPrecedingFilterExpr(), context.srcTupleDescriptor, analyzer);
initAndSetWhereExpr(context.fileGroup.getWhereExpr(), context.destTupleDescriptor, analyzer);
setDefaultValueExprs(scanProvider.getTargetTable(), context.srcSlotDescByName,
context.exprMap, context.params, true);
this.contexts.add(context);
}
}
private void initAndSetPrecedingFilter(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer)
throws UserException {
Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer);
if (newWhereExpr != null) {
addPreFilterConjuncts(newWhereExpr.getConjuncts());
}
}
private void initAndSetWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer)
throws UserException {
Expr newWhereExpr = initWhereExpr(whereExpr, tupleDesc, analyzer);
if (newWhereExpr != null) {
addConjuncts(newWhereExpr.getConjuncts());
}
}
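/**
* Rewrites and re-analyzes a where expr against the given tuple: first run the expr
* rewriter (e.g. BetweenPredicate -> BinaryPredicate), then substitute each SlotRef
* with the matching slot in the tuple, and finally require a BOOLEAN result type.
*/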
private Expr initWhereExpr(Expr whereExpr, TupleDescriptor tupleDesc, Analyzer analyzer) throws UserException {
if (whereExpr == null) {
return null;
}
Map<String, SlotDescriptor> dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (SlotDescriptor slotDescriptor : tupleDesc.getSlots()) {
dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor);
}
// substitute SlotRef in filter expression
// the where expr must be rewritten first to transform some predicates (e.g. BetweenPredicate to BinaryPredicate)
Expr newWhereExpr = analyzer.getExprRewriter()
.rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE);
List<SlotRef> slots = Lists.newArrayList();
newWhereExpr.collect(SlotRef.class, slots);
ExprSubstitutionMap smap = new ExprSubstitutionMap();
for (SlotRef slot : slots) {
SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName());
if (slotDesc == null) {
throw new UserException(
"unknown column reference in where statement, reference=" + slot.getColumnName());
}
smap.getLhs().add(slot);
smap.getRhs().add(new SlotRef(slotDesc));
}
newWhereExpr = newWhereExpr.clone(smap);
newWhereExpr.analyze(analyzer);
if (!newWhereExpr.getType().equals(org.apache.doris.catalog.Type.BOOLEAN)) {
throw new UserException("where statement is not a valid statement return bool");
}
return newWhereExpr;
}
@Override
public void finalize(Analyzer analyzer) throws UserException {
Preconditions.checkState(contexts.size() == scanProviders.size(),
contexts.size() + " vs. " + scanProviders.size());
// ATTN: for the load scan node, do not use the backend policy in ExternalScanNode,
// because that policy may only contain compute backends. For a load job, backends
// should be selected from all backends, both compute and mix.
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.needQueryAvailable()
.needLoadAvailable()
.build();
FederationBackendPolicy localBackendPolicy = new FederationBackendPolicy();
localBackendPolicy.init(policy);
for (int i = 0; i < contexts.size(); ++i) {
FileLoadScanNode.ParamCreateContext context = contexts.get(i);
LoadScanProvider scanProvider = scanProviders.get(i);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider, localBackendPolicy);
this.selectedSplitNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
}
}
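// Finalize for the Nereids planner: the pre/post filter exprs and per-group scan
// range params are taken from the precomputed LoadPlanInfo instead of the analyzer.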
public void finalizeForNereids(TUniqueId loadId, List<NereidsFileGroupInfo> fileGroupInfos,
List<NereidsParamCreateContext> contexts, NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo)
throws UserException {
Preconditions.checkState(contexts.size() == fileGroupInfos.size(),
contexts.size() + " vs. " + fileGroupInfos.size());
List<Expr> preFilterList = loadPlanInfo.getPreFilterExprList();
if (preFilterList != null) {
addPreFilterConjuncts(preFilterList);
}
List<Expr> postFilterList = loadPlanInfo.getPostFilterExprList();
if (postFilterList != null) {
addConjuncts(postFilterList);
}
// ATTN: for the load scan node, do not use the backend policy in ExternalScanNode,
// because that policy may only contain compute backends. For a load job, backends
// should be selected from all backends, both compute and mix.
BeSelectionPolicy policy = new BeSelectionPolicy.Builder()
.needQueryAvailable()
.needLoadAvailable()
.build();
FederationBackendPolicy localBackendPolicy = new FederationBackendPolicy();
localBackendPolicy.init(policy);
for (int i = 0; i < contexts.size(); ++i) {
NereidsParamCreateContext context = contexts.get(i);
NereidsFileGroupInfo fileGroupInfo = fileGroupInfos.get(i);
context.params = loadPlanInfo.toFileScanRangeParams(loadId, fileGroupInfo);
createScanRangeLocations(context, fileGroupInfo, localBackendPolicy);
this.selectedSplitNum += fileGroupInfo.getFileStatuses().size();
for (TBrokerFileStatus fileStatus : fileGroupInfo.getFileStatuses()) {
this.totalFileSize += fileStatus.getSize();
}
}
}
private void createScanRangeLocations(NereidsParamCreateContext context,
NereidsFileGroupInfo fileGroupInfo, FederationBackendPolicy backendPolicy)
throws UserException {
fileGroupInfo.getFileStatusAndCalcInstance(backendPolicy);
fileGroupInfo.createScanRangeLocations(context, backendPolicy, scanRangeLocations);
}
// TODO: This API is for load jobs only. It will be removed later.
private void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
LoadScanProvider scanProvider, FederationBackendPolicy backendPolicy)
throws UserException {
scanProvider.createScanRangeLocations(context, backendPolicy, scanRangeLocations);
}
@Override
protected void createScanRangeLocations() throws UserException {
// do nothing, we have already created scan range locations in finalize
}
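/**
* Builds the dest-slot expr for every materialized slot, trying in order: an explicit
* expr from the column mapping, a pass-through of the same-named source slot, the
* column default value, and finally NULL (for nullable or auto-increment columns).
*/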
protected void finalizeParamsForLoad(ParamCreateContext context,
Analyzer analyzer) throws UserException {
Map<String, SlotDescriptor> slotDescByName = context.srcSlotDescByName;
Map<String, Expr> exprMap = context.exprMap;
TupleDescriptor srcTupleDesc = context.srcTupleDescriptor;
boolean negative = context.fileGroup.isNegative();
TFileScanRangeParams params = context.params;
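// Maps dest slot id -> src slot id for columns passed through without any transform.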
Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
for (SlotDescriptor destSlotDesc : desc.getSlots()) {
if (!destSlotDesc.isMaterialized()) {
continue;
}
Expr expr = null;
if (exprMap != null) {
expr = exprMap.get(destSlotDesc.getColumn().getName());
}
if (expr == null) {
SlotDescriptor srcSlotDesc = slotDescByName.get(destSlotDesc.getColumn().getName());
if (srcSlotDesc != null) {
destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
// If the dest column allows null, set the source slot to nullable as well
if (destSlotDesc.getColumn().isAllowNull()) {
srcSlotDesc.setIsNullable(true);
}
expr = new SlotRef(srcSlotDesc);
} else {
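// No mapping expr and no same-named source column: fall back to the column
// default value, NULL, or the auto-increment placeholder below.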
Column column = destSlotDesc.getColumn();
if (column.getDefaultValue() != null) {
if (column.getDefaultValueExprDef() != null) {
expr = column.getDefaultValueExpr();
expr.analyze(analyzer);
} else {
expr = new StringLiteral(destSlotDesc.getColumn().getDefaultValue());
}
} else {
if (column.isAllowNull()) {
expr = NullLiteral.create(column.getType());
} else if (column.isAutoInc()) {
// An auto-increment column should be non-nullable. However, we use `NullLiteral`
// here to indicate that the cell should be filled with a generated value in
// `VOlapTableSink::_fill_auto_inc_cols()`.
expr = NullLiteral.create(column.getType());
} else {
throw new AnalysisException("column has no source field, column=" + column.getName());
}
}
}
}
// check hll_hash
if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) {
if (!(expr instanceof FunctionCallExpr)) {
throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like "
+ destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx)");
}
FunctionCallExpr fn = (FunctionCallExpr) expr;
if (!fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_HASH) && !fn.getFnName()
.getFunction().equalsIgnoreCase("hll_empty")
&& !fn.getFnName().getFunction().equalsIgnoreCase(FunctionSet.HLL_FROM_BASE64)) {
throw new AnalysisException("HLL column must use " + FunctionSet.HLL_HASH + " function, like "
+ destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_HASH + "(xxx) or "
+ destSlotDesc.getColumn().getName() + "=" + FunctionSet.HLL_FROM_BASE64 + "(xxx) or "
+ destSlotDesc.getColumn().getName() + "=hll_empty()");
}
expr.setType(org.apache.doris.catalog.Type.HLL);
}
checkBitmapCompatibility(analyzer, destSlotDesc, expr);
checkQuantileStateCompatibility(analyzer, destSlotDesc, expr);
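// For a negative load, SUM aggregate columns are multiplied by -1 so that the
// loaded values subtract from previously loaded ones.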
if (negative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) {
expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1));
expr.analyze(analyzer);
}
// For the JSONB type, use a jsonb_parse_* function to parse the source string into
// JSONB; if the input string is not valid JSON, the result is null.
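// e.g. a nullable JSONB dest fed by a VARCHAR source becomes
// jsonb_parse_nullable_error_to_null(<src expr>), where <src expr> is illustrative.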
PrimitiveType dstType = destSlotDesc.getType().getPrimitiveType();
PrimitiveType srcType = expr.getType().getPrimitiveType();
if (dstType == PrimitiveType.JSONB
&& (srcType == PrimitiveType.VARCHAR || srcType == PrimitiveType.STRING)) {
List<Expr> args = Lists.newArrayList();
args.add(expr);
String nullable = "notnull";
if (destSlotDesc.getIsNullable() || expr.isNullable()) {
nullable = "nullable";
}
String name = "jsonb_parse_" + nullable + "_error_to_null";
expr = new FunctionCallExpr(name, args);
expr.analyze(analyzer);
} else {
expr = castToSlot(destSlotDesc, expr);
}
params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
}
params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
params.setDestTupleId(desc.getId().asInt());
params.setSrcTupleId(srcTupleDesc.getId().asInt());
// Need to recompute the memory layout after setting some slot descriptors to nullable
srcTupleDesc.computeStatAndMemLayout();
for (Expr conjunct : preFilterConjuncts) {
params.addToPreFilterExprsList(conjunct.treeToThrift());
}
}
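// Ensures any expr feeding a BITMAP_UNION aggregate column actually produces a BITMAP.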
protected void checkBitmapCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr)
throws AnalysisException {
if (slotDesc.getColumn().getAggregationType() == AggregateType.BITMAP_UNION) {
expr.analyze(analyzer);
if (!expr.getType().isBitmapType()) {
String errorMsg = String.format("bitmap column %s require the function return type is BITMAP",
slotDesc.getColumn().getName());
throw new AnalysisException(errorMsg);
}
}
}
protected void checkQuantileStateCompatibility(Analyzer analyzer, SlotDescriptor slotDesc, Expr expr)
throws AnalysisException {
if (slotDesc.getColumn().getAggregationType() == AggregateType.QUANTILE_UNION) {
expr.analyze(analyzer);
if (!expr.getType().isQuantileStateType()) {
String errorMsg = "quantile_state column %s require the function return type is QUANTILE_STATE";
throw new AnalysisException(errorMsg);
}
}
}
}