InsertStmt.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import org.apache.commons.collections.MapUtils;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* This is the unified abstract stmt for all load kinds of load in {@link LoadType}
* All contents of native InsertStmt is moved to {@link NativeInsertStmt}
* Currently this abstract class keep the native insert methods for compatibility, and will eventually be moved
* to {@link NativeInsertStmt}
*/
@Deprecated
public abstract class InsertStmt extends DdlStmt implements NotFallbackInParser {
public static class Properties {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String TIMEZONE = "timezone";
public static final String STRICT_MODE = "strict_mode";
public static final String MAX_FILTER_RATIO = "max_filter_ratio";
public static final String SEND_BATCH_PARALLELISM = "send_batch_parallelism";
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String LOAD_TO_SINGLE_TABLET = "load_to_single_tablet";
public static final String TRIM_DOUBLE_QUOTES = "trim_double_quotes";
public static final String SKIP_LINES = "skip_lines";
public static final String LINE_DELIMITER = "line_delimiter";
public static final String COLUMN_SEPARATOR = "column_separator";
// ------------------------ just for routine load ------------------------
public static final String MAX_BATCH_SIZE = "max_batch_size";
public static final String MAX_BATCH_ROWS = "max_batch_rows";
public static final String MAX_BATCH_INTERVAL = "max_batch_interval";
// -----------------------------------------------------------------------
// TODO: to be discovered in developing
}
protected LabelName label;
protected Map<String, String> properties;
protected List<Table> targetTables;
protected String comments;
/**
* TODO: change Function to Util.getXXXPropertyOrDefault()
*/
public static final ImmutableMap<String, Function<String, ?>> PROPERTIES_MAP =
new Builder<String, Function<String, ?>>()
.put(Properties.EXEC_MEM_LIMIT, (Function<String, Long>) Long::valueOf)
.put(Properties.TIMEOUT_PROPERTY, (Function<String, Long>) Long::valueOf)
.put(Properties.TIMEZONE, (Function<String, String>) input -> input)
.put(Properties.LOAD_TO_SINGLE_TABLET,
(Function<String, Boolean>) Boolean::parseBoolean)
.put(Properties.MAX_FILTER_RATIO, (Function<String, Double>) Double::parseDouble)
.put(Properties.SEND_BATCH_PARALLELISM,
(Function<String, Integer>) Integer::parseInt)
.put(Properties.STRICT_MODE, (Function<String, Boolean>) Boolean::parseBoolean)
.put(Properties.TRIM_DOUBLE_QUOTES, (Function<String, Boolean>) Boolean::parseBoolean)
.put(Properties.SKIP_LINES, (Function<String, Integer>) Integer::valueOf)
.put(Properties.LINE_DELIMITER, (Function<String, String>) String::valueOf)
.put(Properties.COLUMN_SEPARATOR, (Function<String, String>) String::valueOf)
.put(Properties.MAX_BATCH_SIZE, (Function<String, Long>) Long::parseLong)
.put(Properties.MAX_BATCH_ROWS, (Function<String, Integer>) Integer::valueOf)
.put(Properties.MAX_BATCH_INTERVAL, (Function<String, Long>) Long::valueOf)
.build();
public InsertStmt(LabelName label, Map<String, String> properties, String comment) {
this.label = label;
this.properties = properties;
this.comments = comment != null ? comment : "";
}
// ---------------------------- for old insert stmt ----------------------------
public boolean isValuesOrConstantSelect() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public Table getTargetTable() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public void setTargetTable(Table targetTable) {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public long getTransactionId() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public Boolean isRepartition() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public String getDbName() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public String getTbl() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet)
throws AnalysisException {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public QueryStmt getQueryStmt() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public void setQueryStmt(QueryStmt queryStmt) {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public String getLabel() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public DataSink getDataSink() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public DatabaseIf getDbObj() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public boolean isTransactionBegin() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public void prepareExpressions() throws UserException {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public void complete() throws UserException {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
public DataPartition getDataPartition() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
// ---------------------------------------------------------------------------
// ------------------------- for unified insert stmt -------------------------
public boolean needLoadManager() {
return getLoadType() != LoadType.NATIVE_INSERT && getLoadType() != LoadType.UNKNOWN;
}
public LabelName getLoadLabel() {
return label;
}
/**
* for multi-tables load, we need have several target tbl
*
* @return all target table names
*/
public List<Table> getTargetTableList() {
return targetTables;
}
public abstract List<? extends DataDesc> getDataDescList();
public abstract ResourceDesc getResourceDesc();
public abstract LoadType getLoadType();
public Map<String, String> getProperties() {
return properties;
}
public String getComments() {
return comments;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
analyzeProperties();
}
protected void analyzeProperties() throws DdlException {
checkProperties();
}
/**
* TODO(tsy): find a shorter way to check props
*/
private void checkProperties() throws DdlException {
if (MapUtils.isEmpty(properties)) {
return;
}
for (Entry<String, String> entry : properties.entrySet()) {
if (!InsertStmt.PROPERTIES_MAP.containsKey(entry.getKey())) {
throw new DdlException(entry.getKey() + " is invalid property");
}
}
// exec mem
final String execMemProperty = properties.get(InsertStmt.Properties.EXEC_MEM_LIMIT);
if (execMemProperty != null) {
try {
final long execMem = Long.parseLong(execMemProperty);
if (execMem <= 0) {
throw new DdlException(InsertStmt.Properties.EXEC_MEM_LIMIT + " must be greater than 0");
}
} catch (NumberFormatException e) {
throw new DdlException(InsertStmt.Properties.EXEC_MEM_LIMIT + " is not a number.");
}
}
// timeout
final String timeoutLimitProperty = properties.get(InsertStmt.Properties.TIMEOUT_PROPERTY);
if (timeoutLimitProperty != null) {
try {
final int timeoutLimit = Integer.parseInt(timeoutLimitProperty);
if (timeoutLimit < 0) {
throw new DdlException(InsertStmt.Properties.TIMEOUT_PROPERTY + " must be greater than 0");
}
} catch (NumberFormatException e) {
throw new DdlException(InsertStmt.Properties.TIMEOUT_PROPERTY + " is not a number.");
}
}
// max filter ratio
final String maxFilterRadioProperty = properties.get(Properties.MAX_FILTER_RATIO);
if (maxFilterRadioProperty != null) {
try {
double maxFilterRatio = Double.parseDouble(maxFilterRadioProperty);
if (maxFilterRatio < 0.0 || maxFilterRatio > 1.0) {
throw new DdlException(Properties.MAX_FILTER_RATIO + " must between 0.0 and 1.0.");
}
} catch (NumberFormatException e) {
throw new DdlException(Properties.MAX_FILTER_RATIO + " is not a number.");
}
}
// strict mode
final String strictModeProperty = properties.get(Properties.STRICT_MODE);
if (strictModeProperty != null) {
if (!strictModeProperty.equalsIgnoreCase("true")
&& !strictModeProperty.equalsIgnoreCase("false")) {
throw new DdlException(Properties.STRICT_MODE + " is not a boolean");
}
}
// time zone
final String timezone = properties.get(Properties.TIMEZONE);
if (timezone != null) {
properties.put(Properties.TIMEZONE, TimeUtils.checkTimeZoneValidAndStandardize(
properties.getOrDefault(LoadStmt.TIMEZONE, TimeUtils.DEFAULT_TIME_ZONE)));
}
// send batch parallelism
final String sendBatchParallelism = properties.get(Properties.SEND_BATCH_PARALLELISM);
if (sendBatchParallelism != null) {
try {
final int sendBatchParallelismValue = Integer.parseInt(sendBatchParallelism);
if (sendBatchParallelismValue < 1) {
throw new DdlException(Properties.SEND_BATCH_PARALLELISM + " must be greater than 0");
}
} catch (NumberFormatException e) {
throw new DdlException(Properties.SEND_BATCH_PARALLELISM + " is not a number.");
}
}
}
public NativeInsertStmt getNativeInsertStmt() {
throw new UnsupportedOperationException("only invoked in NativeInsertStmt");
}
/**
* TODO: unify the data_desc
* the unique entrance for data_desc
*/
interface DataDesc {
String toSql();
}
@Override
public StmtType stmtType() {
return StmtType.INSERT;
}
}