MysqlLoadCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.load;
import org.apache.doris.analysis.StmtType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.NoForward;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
// LOAD command, load files into tables.
//
// syntax:
// LOAD mysqlDataDesc
// [PROPERTIES (key1=value1, )]
// [commentSpec]
//
// mysqlDataDesc:
// DATA [ LOCAL ]
// INFILE "<file_name>"
// INTO TABLE "<tbl_name>"
// [ PARTITION (<partition_name> [, ... ]) ]
// [ COLUMNS TERMINATED BY "<column_separator>" ]
// [ LINES TERMINATED BY "<line_delimiter>" ]
// [ IGNORE <number> {LINES | ROWS} ]
// [ (<col_name_or_user_var> [, ... ] ) ]
// [ SET (col_name={<expr> | DEFAULT} [, col_name={<expr> | DEFAULT}] ...) ]
// [ PROPERTIES ("<key>" = "<value>" [ , ... ]) ]
// commentSpec:
// COMMENT ...
/**
* MysqlLoadCommand
*/
public class MysqlLoadCommand extends Command implements NoForward {
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String EXEC_MEM_LIMIT_PROPERTY = "exec_mem_limit";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
public static final String STRICT_MODE_PROPERTY = "strict_mode";
public static final String TIMEZONE_PROPERTY = "timezone";
public static final String ENCLOSE_PROPERTY = "enclose";
public static final String ESCAPE_PROPERTY = "escape";
public static final String TRIM_DOUBLE_QUOTES_PROPERTY = "trim_double_quotes";
public static final String KEY_SKIP_LINES = "skip_lines";
public static final String KEY_IN_PARAM_COLUMN_SEPARATOR = "column_separator";
public static final String KEY_IN_PARAM_LINE_DELIMITER = "line_delimiter";
public static final String KEY_IN_PARAM_COLUMNS = "columns";
public static final String KEY_IN_PARAM_TEMP_PARTITIONS = "temporary_partitions";
public static final String KEY_IN_PARAM_PARTITIONS = "partitions";
public static final String KEY_CLOUD_CLUSTER = "cloud_cluster";
private static final Logger LOG = LogManager.getLogger(MysqlLoadCommand.class);
private static final ImmutableMap<String, Function> PROPERTIES_MAP = new ImmutableMap.Builder<String, Function>()
.put(TIMEOUT_PROPERTY, new Function<String, Long>() {
@Override
public @Nullable Long apply(@Nullable String s) {
return Long.valueOf(s);
}
})
.put(MAX_FILTER_RATIO_PROPERTY, new Function<String, Double>() {
@Override
public @Nullable Double apply(@Nullable String s) {
return Double.valueOf(s);
}
})
.put(EXEC_MEM_LIMIT_PROPERTY, new Function<String, Long>() {
@Override
public @Nullable Long apply(@Nullable String s) {
return Long.valueOf(s);
}
})
.put(STRICT_MODE_PROPERTY, new Function<String, Boolean>() {
@Override
public @Nullable Boolean apply(@Nullable String s) {
return Boolean.valueOf(s);
}
})
.put(TIMEZONE_PROPERTY, new Function<String, String>() {
@Override
public @Nullable String apply(@Nullable String s) {
return s;
}
})
.put(TRIM_DOUBLE_QUOTES_PROPERTY, new Function<String, Boolean>() {
@Override
public @Nullable Boolean apply(@Nullable String s) {
return Boolean.valueOf(s);
}
})
.put(ENCLOSE_PROPERTY, new Function<String, String>() {
@Override
public @Nullable String apply(@Nullable String s) {
return s;
}
})
.put(ESCAPE_PROPERTY, new Function<String, String>() {
@Override
public @Nullable String apply(@Nullable String s) {
return s;
}
})
.build();
private final MysqlDataDescription mysqlDataDescription;
private final Map<String, String> properties;
private final EtlJobType etlJobType = EtlJobType.LOCAL_FILE;
private final String comment;
/**
* MysqlLoadCommand
*/
public MysqlLoadCommand(MysqlDataDescription mysqlDataDescription, Map<String, String> properties, String comment) {
super(PlanType.MYSQL_LOAD_COMMAND);
Objects.requireNonNull(mysqlDataDescription, "mysqlDataDescription is null");
Objects.requireNonNull(properties, "properties is null");
Objects.requireNonNull(comment, "comment is null");
this.mysqlDataDescription = mysqlDataDescription;
this.properties = properties;
this.comment = comment;
}
public MysqlDataDescription getMysqlDataDescription() {
return mysqlDataDescription;
}
public Map<String, String> getProperties() {
return properties;
}
public EtlJobType getEtlJobType() {
return etlJobType;
}
public String getComment() {
return comment;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
handleMysqlLoadComand(ctx);
}
/**
* validate
*/
public void validate(ConnectContext ctx) throws UserException, IOException {
if (mysqlDataDescription == null) {
throw new AnalysisException("No data file in mysql load command.");
}
// check data descriptions, only support one file path:
String fullDbName = mysqlDataDescription.analyzeFullDbName(ctx);
mysqlDataDescription.analyze(fullDbName);
if (!mysqlDataDescription.isClientLocal()) {
for (String path : mysqlDataDescription.getFilePaths()) {
if (Config.mysql_load_server_secure_path.isEmpty()) {
throw new AnalysisException("Load local data from fe local is not enabled. If you want to use it,"
+ " please set the `mysql_load_server_secure_path` for FE to be a right path.");
} else {
File file = new File(path);
if (!file.getCanonicalPath().startsWith(Config.mysql_load_server_secure_path)) {
throw new AnalysisException("Local file should be under the secure path of FE.");
}
if (!file.exists()) {
throw new AnalysisException("File: " + path + " is not exists.");
}
}
}
}
try {
checkProperties(properties);
} catch (DdlException e) {
throw new AnalysisException(e.getMessage());
}
}
private void handleMysqlLoadComand(ConnectContext ctx) {
try {
LoadManager loadManager = ctx.getEnv().getLoadManager();
if (!ctx.getCapability().supportClientLocalFile()) {
ctx.getState().setError(ErrorCode.ERR_NOT_ALLOWED_COMMAND, "This client is not support"
+ " to load client local file.");
return;
}
String loadId = UUID.randomUUID().toString();
LoadJobRowResult submitResult = loadManager.getMysqlLoadManager()
.executeMySqlLoadJob(ctx, mysqlDataDescription, loadId);
ctx.getState().setOk(submitResult.getRecords(), submitResult.getWarnings(),
submitResult.toString());
} catch (UserException e) {
// Return message to info client what happened.
if (LOG.isDebugEnabled()) {
LOG.debug("DDL statement({}) process failed.", toSql(), e);
}
ctx.getState().setError(e.getMysqlErrorCode(), e.getMessage());
} catch (Exception e) {
// Maybe our bug
LOG.warn("DDL statement(" + toSql() + ") process failed.", e);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
}
}
/**
* checkProperties
*/
public static void checkProperties(Map<String, String> properties) throws DdlException {
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!PROPERTIES_MAP.containsKey(entry.getKey())) {
throw new DdlException(entry.getKey() + " is invalid property");
}
}
// exec mem
final String execMemProperty = properties.get(EXEC_MEM_LIMIT_PROPERTY);
if (execMemProperty != null) {
try {
final long execMem = Long.valueOf(execMemProperty);
if (execMem <= 0) {
throw new DdlException(EXEC_MEM_LIMIT_PROPERTY + " must be greater than 0");
}
} catch (NumberFormatException e) {
throw new DdlException(EXEC_MEM_LIMIT_PROPERTY + " is not a number.");
}
}
// timeout
final String timeoutLimitProperty = properties.get(TIMEOUT_PROPERTY);
if (timeoutLimitProperty != null) {
try {
final int timeoutLimit = Integer.valueOf(timeoutLimitProperty);
if (timeoutLimit < 0) {
throw new DdlException(TIMEOUT_PROPERTY + " must be greater than 0");
}
} catch (NumberFormatException e) {
throw new DdlException(TIMEOUT_PROPERTY + " is not a number.");
}
}
// max filter ratio
final String maxFilterRadioProperty = properties.get(MAX_FILTER_RATIO_PROPERTY);
if (maxFilterRadioProperty != null) {
try {
double maxFilterRatio = Double.valueOf(maxFilterRadioProperty);
if (maxFilterRatio < 0.0 || maxFilterRatio > 1.0) {
throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " must between 0.0 and 1.0.");
}
} catch (NumberFormatException e) {
throw new DdlException(MAX_FILTER_RATIO_PROPERTY + " is not a number.");
}
}
// strict mode
final String strictModeProperty = properties.get(STRICT_MODE_PROPERTY);
if (strictModeProperty != null) {
if (!strictModeProperty.equalsIgnoreCase("true")
&& !strictModeProperty.equalsIgnoreCase("false")) {
throw new DdlException(STRICT_MODE_PROPERTY + " is not a boolean");
}
}
// time zone
final String timezone = properties.get(TIMEZONE_PROPERTY);
if (timezone != null) {
properties.put(TIMEZONE_PROPERTY, TimeUtils.checkTimeZoneValidAndStandardize(
properties.getOrDefault(TIMEZONE_PROPERTY, TimeUtils.DEFAULT_TIME_ZONE)));
}
}
/**
* toSql
*/
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("LOAD").append("\n");
sb.append("(");
sb.append(mysqlDataDescription.toSql()).append(")");
if (!properties.isEmpty()) {
sb.append("\nPROPERTIES (");
sb.append(new PrintableMap<>(properties, "=", true, false, true));
sb.append(")");
}
return sb.toString();
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitMysqlLoadCommand(this, context);
}
@Override
public StmtType stmtType() {
return StmtType.LOAD;
}
}