NereidsRoutineLoadTaskInfo.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.load;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Map;
/**
 * {@link NereidsLoadTaskInfo} implementation for routine load tasks.
 *
 * <p>Most settings (memory limit, partitions, merge type, column descriptions, filters,
 * separators, ...) are passed explicitly to the constructor. Format, JSON options, strict
 * mode and time zone are read on demand from {@code jobProperties}. When a property is
 * absent, mapped to {@code null}, or the properties map itself is {@code null}, the
 * default documented on the corresponding getter is used.
 */
public class NereidsRoutineLoadTaskInfo implements NereidsLoadTaskInfo {
    private static final String PROPS_FORMAT = "format";
    private static final String PROPS_STRIP_OUTER_ARRAY = "strip_outer_array";
    private static final String PROPS_NUM_AS_STRING = "num_as_string";
    private static final String PROPS_JSONPATHS = "jsonpaths";
    private static final String PROPS_JSONROOT = "json_root";
    private static final String PROPS_FUZZY_PARSE = "fuzzy_parse";
    private static final boolean DEFAULT_STRICT_MODE = false;
    // Format used when the "format" property is not set.
    private static final String DEFAULT_FORMAT = "csv";
    // "format" value that selects JSON; any other value is planned as plain CSV.
    private static final String FORMAT_JSON = "json";

    protected long execMemLimit;
    protected Map<String, String> jobProperties;
    protected long maxBatchIntervalS;
    protected PartitionNames partitions;
    protected LoadTask.MergeType mergeType;
    protected Expression deleteCondition;
    protected String sequenceCol;
    protected double maxFilterRatio;
    protected NereidsImportColumnDescs columnDescs;
    protected Expression precedingFilter;
    protected Expression whereExpr;
    protected Separator columnSeparator;
    protected Separator lineDelimiter;
    protected byte enclose;
    protected byte escape;
    protected int sendBatchParallelism;
    protected boolean loadToSingleTablet;
    protected boolean isPartialUpdate;
    protected boolean memtableOnSinkNode;

    /**
     * Creates a routine load task description.
     *
     * <p>All arguments are stored as-is (no copies are made). {@code jobProperties} is consulted
     * lazily by the format/JSON/strict-mode/time-zone getters and may be {@code null}, in which
     * case those getters return their defaults. {@code maxBatchIntervalS} feeds
     * {@link #getTimeout()}.
     */
    public NereidsRoutineLoadTaskInfo(long execMemLimit, Map<String, String> jobProperties, long maxBatchIntervalS,
            PartitionNames partitions, LoadTask.MergeType mergeType, Expression deleteCondition,
            String sequenceCol, double maxFilterRatio, NereidsImportColumnDescs columnDescs,
            Expression precedingFilter, Expression whereExpr, Separator columnSeparator,
            Separator lineDelimiter, byte enclose, byte escape, int sendBatchParallelism,
            boolean loadToSingleTablet, boolean isPartialUpdate, boolean memtableOnSinkNode) {
        this.execMemLimit = execMemLimit;
        this.jobProperties = jobProperties;
        this.maxBatchIntervalS = maxBatchIntervalS;
        this.partitions = partitions;
        this.mergeType = mergeType;
        this.deleteCondition = deleteCondition;
        this.sequenceCol = sequenceCol;
        this.maxFilterRatio = maxFilterRatio;
        this.columnDescs = columnDescs;
        this.precedingFilter = precedingFilter;
        this.whereExpr = whereExpr;
        this.columnSeparator = columnSeparator;
        this.lineDelimiter = lineDelimiter;
        this.enclose = enclose;
        this.escape = escape;
        this.sendBatchParallelism = sendBatchParallelism;
        this.loadToSingleTablet = loadToSingleTablet;
        this.isPartialUpdate = isPartialUpdate;
        this.memtableOnSinkNode = memtableOnSinkNode;
    }

    /** Always {@code false}. */
    @Override
    public boolean getNegative() {
        return false;
    }

    /** Always {@code -1}; this object carries no transaction id. */
    @Override
    public long getTxnId() {
        return -1L;
    }

    /**
     * Returns the task timeout in seconds.
     *
     * <p>The timeout is {@code maxBatchIntervalS * Config.routine_load_task_timeout_multiplier},
     * raised to at least {@code Config.routine_load_task_min_timeout_sec}. The product is
     * computed in {@code long} and saturated at {@link Integer#MAX_VALUE}. This way a large
     * interval can neither be truncated by a narrowing cast nor overflow into a wrong (possibly
     * negative) {@code int} product.
     */
    @Override
    public int getTimeout() {
        long timeoutSec = maxBatchIntervalS * (long) Config.routine_load_task_timeout_multiplier;
        long realTimeoutSec = Math.max(timeoutSec, Config.routine_load_task_min_timeout_sec);
        return (int) Math.min(realTimeoutSec, Integer.MAX_VALUE);
    }

    /** Returns the execution memory limit passed to the constructor. */
    @Override
    public long getMemLimit() {
        return execMemLimit;
    }

    /**
     * Returns the {@code LoadStmt.TIMEZONE} job property, or {@code TimeUtils.DEFAULT_TIME_ZONE}
     * when it is not set.
     */
    @Override
    public String getTimezone() {
        return getJobProperty(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE);
    }

    @Override
    public PartitionNames getPartitions() {
        return partitions;
    }

    @Override
    public LoadTask.MergeType getMergeType() {
        return mergeType;
    }

    @Override
    public Expression getDeleteCondition() {
        return deleteCondition;
    }

    /** Returns {@code true} when a non-empty sequence column name was supplied. */
    @Override
    public boolean hasSequenceCol() {
        return !Strings.isNullOrEmpty(sequenceCol);
    }

    @Override
    public String getSequenceCol() {
        return sequenceCol;
    }

    /** Always {@code TFileType.FILE_STREAM}. */
    @Override
    public TFileType getFileType() {
        return TFileType.FILE_STREAM;
    }

    /**
     * Returns {@code FORMAT_JSON} when the "format" job property is {@code json} (compared
     * case-insensitively), otherwise {@code FORMAT_CSV_PLAIN}.
     */
    @Override
    public TFileFormatType getFormatType() {
        // Case-insensitive so that e.g. "JSON" is not silently planned as CSV.
        return FORMAT_JSON.equalsIgnoreCase(getFormat())
                ? TFileFormatType.FORMAT_JSON
                : TFileFormatType.FORMAT_CSV_PLAIN;
    }

    /** Always {@code TFileCompressType.PLAIN}. */
    @Override
    public TFileCompressType getCompressType() {
        return TFileCompressType.PLAIN;
    }

    /** Returns the "jsonpaths" job property, or {@code ""} when it is not set. */
    @Override
    public String getJsonPaths() {
        return getJobProperty(PROPS_JSONPATHS, "");
    }

    /** Returns the "json_root" job property, or {@code ""} when it is not set. */
    @Override
    public String getJsonRoot() {
        return getJobProperty(PROPS_JSONROOT, "");
    }

    /**
     * Returns the "strip_outer_array" job property parsed with {@link Boolean#parseBoolean};
     * {@code false} when it is not set.
     */
    @Override
    public boolean isStripOuterArray() {
        return Boolean.parseBoolean(getJobProperty(PROPS_STRIP_OUTER_ARRAY, null));
    }

    /**
     * Returns the "fuzzy_parse" job property parsed with {@link Boolean#parseBoolean};
     * {@code false} when it is not set.
     */
    @Override
    public boolean isFuzzyParse() {
        return Boolean.parseBoolean(getJobProperty(PROPS_FUZZY_PARSE, null));
    }

    /**
     * Returns the "num_as_string" job property parsed with {@link Boolean#parseBoolean};
     * {@code false} when it is not set.
     */
    @Override
    public boolean isNumAsString() {
        return Boolean.parseBoolean(getJobProperty(PROPS_NUM_AS_STRING, null));
    }

    /** Always {@code false}. */
    @Override
    public boolean isReadJsonByLine() {
        return false;
    }

    /** Always {@code null}; this task info carries no file path. */
    @Override
    public String getPath() {
        return null;
    }

    @Override
    public double getMaxFilterRatio() {
        return maxFilterRatio;
    }

    /**
     * Returns the column descriptions supplied at construction. When none were supplied, a
     * fresh empty {@link NereidsImportColumnDescs} is returned on each call (it is not stored).
     */
    @Override
    public NereidsImportColumnDescs getColumnExprDescs() {
        if (columnDescs == null) {
            return new NereidsImportColumnDescs();
        }
        return columnDescs;
    }

    /**
     * Returns the {@code LoadStmt.STRICT_MODE} job property parsed with
     * {@link Boolean#parseBoolean}, or {@code false} when it is not set.
     */
    @Override
    public boolean isStrictMode() {
        String value = getJobProperty(LoadStmt.STRICT_MODE, null);
        return value == null ? DEFAULT_STRICT_MODE : Boolean.parseBoolean(value);
    }

    @Override
    public Expression getPrecedingFilter() {
        return precedingFilter;
    }

    @Override
    public Expression getWhereExpr() {
        return whereExpr;
    }

    @Override
    public Separator getColumnSeparator() {
        return columnSeparator;
    }

    @Override
    public Separator getLineDelimiter() {
        return lineDelimiter;
    }

    @Override
    public byte getEnclose() {
        return enclose;
    }

    @Override
    public byte getEscape() {
        return escape;
    }

    @Override
    public int getSendBatchParallelism() {
        return sendBatchParallelism;
    }

    @Override
    public boolean isLoadToSingleTablet() {
        return loadToSingleTablet;
    }

    /** Always {@code ""}. */
    @Override
    public String getHeaderType() {
        return "";
    }

    /** Always {@code null}; no hidden columns are declared by this task info. */
    @Override
    public List<String> getHiddenColumns() {
        return null;
    }

    /** Returns the {@code isPartialUpdate} flag passed to the constructor. */
    @Override
    public boolean isFixedPartialUpdate() {
        return isPartialUpdate;
    }

    @Override
    public boolean isMemtableOnSinkNode() {
        return memtableOnSinkNode;
    }

    /** Returns the raw "format" job property, defaulting to {@code "csv"}. Never {@code null}. */
    private String getFormat() {
        return getJobProperty(PROPS_FORMAT, DEFAULT_FORMAT);
    }

    /**
     * Looks up a job property.
     *
     * @param key property name
     * @param defaultValue value returned when the properties map is {@code null}, the key is
     *         absent, or the key is mapped to {@code null}
     * @return the property value, or {@code defaultValue}
     */
    private String getJobProperty(String key, String defaultValue) {
        if (jobProperties == null) {
            return defaultValue;
        }
        // Not getOrDefault: a key explicitly mapped to null must also yield the default.
        String value = jobProperties.get(key);
        return value == null ? defaultValue : value;
    }
}