InsertIntoTVFCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* Command for INSERT INTO tvf_name(properties) SELECT ...
* This command is independent from InsertIntoTableCommand since TVF sink
* has no real table, no transaction, and no table lock.
*/
public class InsertIntoTVFCommand extends Command implements ForwardWithSync, Explainable {
private static final Logger LOG = LogManager.getLogger(InsertIntoTVFCommand.class);
private final LogicalPlan logicalQuery;
private final Optional<String> labelName;
private final Optional<LogicalPlan> cte;
public InsertIntoTVFCommand(LogicalPlan logicalQuery, Optional<String> labelName,
Optional<LogicalPlan> cte) {
super(PlanType.INSERT_INTO_TVF_COMMAND);
this.logicalQuery = logicalQuery;
this.labelName = labelName;
this.cte = cte;
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// 1. Check privilege
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ctx, PrivPredicate.ADMIN)
&& !Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ctx, PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"INSERT INTO TVF requires ADMIN or LOAD privilege");
}
// 2. Prepare the plan
LogicalPlan plan = logicalQuery;
if (cte.isPresent()) {
plan = (LogicalPlan) cte.get().withChildren(plan);
}
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(plan, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
executor.setPlanner(planner);
executor.checkBlockRules();
// FE-side deletion of existing files (before BE execution)
PhysicalPlan physicalPlan = planner.getPhysicalPlan();
if (physicalPlan instanceof PhysicalTVFTableSink) {
PhysicalTVFTableSink<?> tvfSink = (PhysicalTVFTableSink<?>) physicalPlan;
String sinkTvfName = tvfSink.getTvfName();
Map<String, String> sinkProps = tvfSink.getProperties();
boolean deleteExisting = Boolean.parseBoolean(
sinkProps.getOrDefault("delete_existing_files", "false"));
if (deleteExisting && !"local".equals(sinkTvfName)) {
deleteExistingFilesInFE(sinkTvfName, sinkProps);
}
}
if (ctx.getConnectType() == ConnectType.MYSQL && ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
// 3. Create coordinator
Coordinator coordinator = EnvFactory.getInstance().createCoordinator(
ctx, planner, ctx.getStatsErrorEstimator());
TUniqueId queryId = ctx.queryId();
QeProcessorImpl.INSTANCE.registerQuery(queryId,
new QueryInfo(ctx, "INSERT INTO TVF", coordinator));
try {
coordinator.exec();
// Wait for completion
int timeoutS = ctx.getExecTimeoutS();
if (coordinator.join(timeoutS)) {
if (!coordinator.isDone()) {
coordinator.cancel(new Status(TStatusCode.INTERNAL_ERROR, "Insert into TVF timeout"));
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Insert into TVF timeout");
return;
}
}
if (coordinator.getExecStatus().ok()) {
String label = labelName.orElse(
String.format("tvf_insert_%x_%x", ctx.queryId().hi, ctx.queryId().lo));
long loadedRows = 0;
String loadedRowsStr = coordinator.getLoadCounters()
.get(LoadEtlTask.DPP_NORMAL_ALL);
if (loadedRowsStr != null) {
loadedRows = Long.parseLong(loadedRowsStr);
}
ctx.getState().setOk(loadedRows, 0, "Insert into TVF succeeded. label: " + label);
} else {
String errMsg = coordinator.getExecStatus().getErrorMsg();
LOG.warn("insert into TVF failed, error: {}", errMsg);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, errMsg);
}
} catch (Exception e) {
LOG.warn("insert into TVF failed", e);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getMessage() == null ? "unknown error" : e.getMessage());
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
coordinator.close();
}
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCommand(this, context);
}
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return this.logicalQuery;
}
private void deleteExistingFilesInFE(String tvfName, Map<String, String> props)
throws Exception {
String filePath = props.get("file_path");
// Extract parent directory from prefix path: s3://bucket/path/to/prefix_ -> s3://bucket/path/to/
String parentDir = extractParentDirectory(filePath);
LOG.info("TVF sink: deleting existing files in directory: {}", parentDir);
// Copy props for building StorageProperties (exclude write-specific params)
Map<String, String> fsCopyProps = new HashMap<>(props);
fsCopyProps.remove("file_path");
fsCopyProps.remove("format");
fsCopyProps.remove("delete_existing_files");
fsCopyProps.remove("max_file_size");
fsCopyProps.remove("column_separator");
fsCopyProps.remove("line_delimiter");
fsCopyProps.remove("compression_type");
fsCopyProps.remove("compress_type");
StorageProperties storageProps = StorageProperties.createPrimary(fsCopyProps);
RemoteFileSystem fs = FileSystemFactory.get(storageProps);
org.apache.doris.backup.Status deleteStatus = fs.deleteDirectory(parentDir);
if (!deleteStatus.ok()) {
throw new UserException("Failed to delete existing files in "
+ parentDir + ": " + deleteStatus.getErrMsg());
}
}
private static String extractParentDirectory(String prefixPath) {
int lastSlash = prefixPath.lastIndexOf('/');
if (lastSlash >= 0) {
return prefixPath.substring(0, lastSlash + 1);
}
return prefixPath;
}
}