TVFTableSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.datasource.property.fileformat.OrcFileFormatProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TTVFTableSink;

import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* TVFTableSink is used for INSERT INTO tvf_name(properties) SELECT ...
 * It writes query results to files via a TVF (local/s3/hdfs).
*
* Property parsing reuses the same StorageProperties and FileFormatProperties
* infrastructure as the read-side TVF (SELECT * FROM s3/hdfs/local(...)).
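 *
 * Illustrative statement (the values are hypothetical; the sink reads
 * "file_path", "format", and connector keys such as "s3.endpoint"):
 *
 *   INSERT INTO s3(
 *       "file_path" = "s3://bucket/output/part_",
 *       "format" = "parquet",
 *       "s3.endpoint" = "http://127.0.0.1:9000")
 *   SELECT * FROM src_table;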
*/
public class TVFTableSink extends DataSink {
private final PlanNodeId exchNodeId;
private final String tvfName;
private final Map<String, String> properties;
private final List<Column> cols;
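    // Thrift sink description, built by bindDataSink() and returned from toThrift().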
private TDataSink tDataSink;

    public TVFTableSink(PlanNodeId exchNodeId, String tvfName, Map<String, String> properties, List<Column> cols) {
this.exchNodeId = exchNodeId;
this.tvfName = tvfName;
this.properties = properties;
this.cols = cols;
}
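
    /**
     * Builds the thrift TTVFTableSink from the TVF name and user-supplied
     * properties. Must be called before toThrift().
     */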
public void bindDataSink() throws AnalysisException {
TTVFTableSink tSink = new TTVFTableSink();
tSink.setTvfName(tvfName);
// --- 1. Parse file format properties (reuse read-side FileFormatProperties) ---
// Make a mutable copy; FileFormatProperties.analyzeFileFormatProperties removes consumed keys.
Map<String, String> propsCopy = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
propsCopy.putAll(properties);
String formatStr = propsCopy.getOrDefault("format", "csv").toLowerCase();
propsCopy.remove("format");
// Also consume "compression_type" as alias for "compress_type" (write-side convention)
if (propsCopy.containsKey("compression_type") && !propsCopy.containsKey("compress_type")) {
propsCopy.put("compress_type", propsCopy.remove("compression_type"));
}
FileFormatProperties fileFormatProps = FileFormatProperties.createFileFormatProperties(formatStr);
fileFormatProps.analyzeFileFormatProperties(propsCopy, true);
TFileFormatType formatType = fileFormatProps.getFileFormatType();
if (!Util.isCsvFormat(formatType) && formatType != TFileFormatType.FORMAT_PARQUET
&& formatType != TFileFormatType.FORMAT_ORC) {
throw new AnalysisException("Unsupported format: " + formatType.name());
}
tSink.setFileFormat(formatType);
// Set file type based on TVF name
TFileType fileType = getFileType(tvfName);
tSink.setFileType(fileType);
// --- 2. Parse storage/connection properties (reuse read-side StorageProperties) ---
Map<String, String> backendConnectProps;
        if (tvfName.equalsIgnoreCase("local")) {
// Local TVF: pass properties as-is (same as LocalProperties.getBackendConfigProperties)
            backendConnectProps = new HashMap<>(propsCopy);
} else {
// S3/HDFS: use StorageProperties to normalize connection property keys
// (e.g. "s3.endpoint" -> "AWS_ENDPOINT", "hadoop.username" -> hadoop config)
try {
StorageProperties storageProps = StorageProperties.createPrimary(propsCopy);
backendConnectProps = storageProps.getBackendConfigProperties();
} catch (Exception e) {
throw new AnalysisException("Failed to parse storage properties: " + e.getMessage(), e);
}
}
        String filePath = properties.get("file_path");
        if (filePath == null || filePath.isEmpty()) {
            throw new AnalysisException("Missing required property: file_path");
        }
        tSink.setFilePath(filePath);
// Set normalized properties for BE
tSink.setProperties(backendConnectProps);
// Set columns
List<TColumn> tColumns = new ArrayList<>();
for (Column col : cols) {
tColumns.add(col.toThrift());
}
tSink.setColumns(tColumns);
// --- 3. Set format-specific sink options ---
if (fileFormatProps instanceof CsvFileFormatProperties) {
CsvFileFormatProperties csvProps = (CsvFileFormatProperties) fileFormatProps;
csvProps.checkSupportedCompressionType(true);
tSink.setColumnSeparator(csvProps.getColumnSeparator());
tSink.setLineDelimiter(csvProps.getLineDelimiter());
tSink.setCompressionType(csvProps.getCompressionType());
} else if (fileFormatProps instanceof OrcFileFormatProperties) {
tSink.setCompressionType(((OrcFileFormatProperties) fileFormatProps).getOrcCompressionType());
}
// Parquet compression is handled by BE via parquet writer options
// --- 4. Set sink-specific options ---
// Max file size
        String maxFileSizeStr = properties.get("max_file_size");
        if (maxFileSizeStr != null) {
            try {
                tSink.setMaxFileSizeBytes(Long.parseLong(maxFileSizeStr));
            } catch (NumberFormatException e) {
                throw new AnalysisException("Invalid max_file_size: " + maxFileSizeStr);
            }
        }
// Delete existing files is handled by FE (InsertIntoTVFCommand), always tell BE not to delete
tSink.setDeleteExistingFiles(false);
// Backend id for local TVF
        String backendIdStr = properties.get("backend_id");
        if (backendIdStr != null) {
            try {
                tSink.setBackendId(Long.parseLong(backendIdStr));
            } catch (NumberFormatException e) {
                throw new AnalysisException("Invalid backend_id: " + backendIdStr);
            }
        }
// Set hadoop config for hdfs/s3 (BE uses this for file writer creation)
        if (!tvfName.equalsIgnoreCase("local")) {
tSink.setHadoopConfig(backendConnectProps);
}
tDataSink = new TDataSink(TDataSinkType.TVF_TABLE_SINK);
tDataSink.setTvfTableSink(tSink);
}
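
    /** Maps the TVF name (case-insensitive) to the thrift file type used by the BE writer. */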
private TFileType getFileType(String tvfName) throws AnalysisException {
switch (tvfName.toLowerCase()) {
case "local":
return TFileType.FILE_LOCAL;
case "s3":
return TFileType.FILE_S3;
case "hdfs":
return TFileType.FILE_HDFS;
default:
throw new AnalysisException("Unsupported TVF type: " + tvfName);
}
}

    @Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(prefix).append("TVF TABLE SINK\n");
strBuilder.append(prefix).append(" tvfName: ").append(tvfName).append("\n");
strBuilder.append(prefix).append(" filePath: ").append(properties.get("file_path")).append("\n");
strBuilder.append(prefix).append(" format: ").append(properties.getOrDefault("format", "csv")).append("\n");
return strBuilder.toString();
}

    @Override
protected TDataSink toThrift() {
return tDataSink;
}

    @Override
public DataPartition getOutputPartition() {
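        // Writing to files imposes no partition requirement on the sink's input.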
return null;
}

    @Override
public PlanNodeId getExchNodeId() {
return exchNodeId;
}
}