// PluginDrivenTableSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.connector.api.write.ConnectorWriteConfig;
import org.apache.doris.connector.api.write.ConnectorWriteType;
import org.apache.doris.datasource.PluginDrivenExternalTable;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THiveColumn;
import org.apache.doris.thrift.THiveColumnType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THiveTableSink;
import org.apache.doris.thrift.TJdbcTable;
import org.apache.doris.thrift.TJdbcTableSink;
import org.apache.doris.thrift.TOdbcTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Generic data sink for plugin-driven external tables.
*
* <p>Extends {@link BaseExternalTableDataSink} and constructs the appropriate
* Thrift {@link TDataSink} based on {@link ConnectorWriteConfig} obtained from
* the connector SPI. This allows different connector plugins to produce their
* write configuration without knowing Thrift types, while the engine handles
* the Thrift serialization.</p>
*
* <p>Supported write types and their Thrift mappings:</p>
* <ul>
 * <li>{@link ConnectorWriteType#FILE_WRITE} → {@link TDataSinkType#HIVE_TABLE_SINK}</li>
 * <li>{@link ConnectorWriteType#JDBC_WRITE} → {@link TDataSinkType#JDBC_TABLE_SINK}</li>
 * <li>Others → determined by per-connector migration</li>
* </ul>
*/
public class PluginDrivenTableSink extends BaseExternalTableDataSink {
    private static final Logger LOG = LogManager.getLogger(PluginDrivenTableSink.class);

    // Well-known property keys in ConnectorWriteConfig.properties
    public static final String PROP_DB_NAME = "db_name";
    public static final String PROP_TABLE_NAME = "table_name";
    public static final String PROP_OVERWRITE = "overwrite";
    public static final String PROP_WRITE_PATH = "write_path";
    public static final String PROP_TARGET_PATH = "target_path";
    public static final String PROP_ORIGINAL_WRITE_PATH = "original_write_path";

    // JDBC-specific property keys
    public static final String PROP_JDBC_URL = "jdbc_url";
    public static final String PROP_JDBC_USER = "jdbc_user";
    public static final String PROP_JDBC_PASSWORD = "jdbc_password";
    public static final String PROP_JDBC_DRIVER_URL = "jdbc_driver_url";
    public static final String PROP_JDBC_DRIVER_CLASS = "jdbc_driver_class";
    public static final String PROP_JDBC_DRIVER_CHECKSUM = "jdbc_driver_checksum";
    public static final String PROP_JDBC_TABLE_NAME = "jdbc_table_name";
    public static final String PROP_JDBC_RESOURCE_NAME = "jdbc_resource_name";
    public static final String PROP_JDBC_TABLE_TYPE = "jdbc_table_type";
    public static final String PROP_JDBC_INSERT_SQL = "jdbc_insert_sql";
    public static final String PROP_JDBC_USE_TRANSACTION = "jdbc_use_transaction";
    public static final String PROP_JDBC_CATALOG_ID = "jdbc_catalog_id";
    public static final String PROP_JDBC_POOL_MIN = "connection_pool_min_size";
    public static final String PROP_JDBC_POOL_MAX = "connection_pool_max_size";
    public static final String PROP_JDBC_POOL_MAX_WAIT = "connection_pool_max_wait_time";
    public static final String PROP_JDBC_POOL_MAX_LIFE = "connection_pool_max_life_time";
    public static final String PROP_JDBC_POOL_KEEP_ALIVE = "connection_pool_keep_alive";

    private final PluginDrivenExternalTable targetTable;
    private final ConnectorWriteConfig writeConfig;

    public PluginDrivenTableSink(PluginDrivenExternalTable targetTable,
            ConnectorWriteConfig writeConfig) {
        super();
        this.targetTable = targetTable;
        this.writeConfig = writeConfig;
    }

    @Override
    protected Set<TFileFormatType> supportedFileFormatTypes() {
        // The connector determines the format through its write config,
        // so this sink does not restrict the format set; accept all.
        return EnumSet.allOf(TFileFormatType.class);
    }

    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        StringBuilder sb = new StringBuilder();
        sb.append(prefix).append("PLUGIN-DRIVEN TABLE SINK\n");
        if (explainLevel == TExplainLevel.BRIEF) {
            return sb.toString();
        }
        sb.append(prefix).append("  WRITE TYPE: ").append(writeConfig.getWriteType()).append("\n");
        sb.append(prefix).append("  TABLE: ").append(targetTable.getName()).append("\n");
        if (writeConfig.getWriteType() == ConnectorWriteType.JDBC_WRITE) {
            Map<String, String> props = writeConfig.getProperties();
            sb.append(prefix).append("  TABLE TYPE: ")
                    .append(props.getOrDefault(PROP_JDBC_TABLE_TYPE, "")).append("\n");
            sb.append(prefix).append("  INSERT SQL: ")
                    .append(props.getOrDefault(PROP_JDBC_INSERT_SQL, "")).append("\n");
            sb.append(prefix).append("  USE TRANSACTION: ")
                    .append(props.getOrDefault(PROP_JDBC_USE_TRANSACTION, "false")).append("\n");
        } else {
            if (writeConfig.getFileFormat() != null) {
                sb.append(prefix).append("  FORMAT: ").append(writeConfig.getFileFormat()).append("\n");
            }
            if (writeConfig.getWriteLocation() != null) {
                sb.append(prefix).append("  LOCATION: ").append(writeConfig.getWriteLocation()).append("\n");
            }
        }
        return sb.toString();
    }

    @Override
    public void bindDataSink(Optional<InsertCommandContext> insertCtx)
            throws AnalysisException {
        ConnectorWriteType writeType = writeConfig.getWriteType();
        switch (writeType) {
            case FILE_WRITE:
                bindFileWriteSink(insertCtx);
                break;
            case JDBC_WRITE:
                bindJdbcWriteSink(insertCtx);
                break;
            default:
                throw new AnalysisException(
                        "Unsupported write type for plugin-driven sink: " + writeType);
        }
    }

    /**
     * Returns the write config associated with this sink.
     * Used by the insert executor to access connector write configuration.
     */
    public ConnectorWriteConfig getWriteConfig() {
        return writeConfig;
    }

    /**
     * Returns the target table.
     */
    public PluginDrivenExternalTable getTargetTable() {
        return targetTable;
    }

    /**
     * Builds a THiveTableSink for file-based writes.
     *
     * <p>BE's Hive table sink is the generic file writer that handles
     * Parquet/ORC/Text output. Connectors provide all necessary
     * configuration through {@link ConnectorWriteConfig}.</p>
     *
     * @param insertCtx optional insert command context (currently unused here)
     * @throws AnalysisException if the file format, compression, or location
     *         configuration cannot be translated to Thrift
     */
    private void bindFileWriteSink(Optional<InsertCommandContext> insertCtx)
            throws AnalysisException {
        Map<String, String> props = writeConfig.getProperties();
        THiveTableSink tSink = new THiveTableSink();

        // DB and table names: props override the target table's identity.
        tSink.setDbName(props.getOrDefault(PROP_DB_NAME, targetTable.getDbName()));
        tSink.setTableName(props.getOrDefault(PROP_TABLE_NAME, targetTable.getName()));

        // Columns: build from target table schema + partition info from write config.
        Set<String> partNames = new HashSet<>(writeConfig.getPartitionColumns());
        List<Column> allColumns = targetTable.getColumns();
        List<THiveColumn> targetColumns = new ArrayList<>(allColumns.size());
        for (Column col : allColumns) {
            THiveColumn tHiveColumn = new THiveColumn();
            tHiveColumn.setName(col.getName());
            tHiveColumn.setColumnType(
                    partNames.contains(col.getName())
                            ? THiveColumnType.PARTITION_KEY
                            : THiveColumnType.REGULAR);
            targetColumns.add(tHiveColumn);
        }
        tSink.setColumns(targetColumns);

        // File format
        if (writeConfig.getFileFormat() != null) {
            TFileFormatType formatType = getTFileFormatType(writeConfig.getFileFormat());
            tSink.setFileFormat(formatType);
        }
        // Compression
        if (writeConfig.getCompression() != null) {
            tSink.setCompressionType(getTFileCompressType(writeConfig.getCompression()));
        }

        // Location: explicit props win over the config's write location.
        String writePath = props.getOrDefault(PROP_WRITE_PATH, writeConfig.getWriteLocation());
        String targetPath = props.getOrDefault(PROP_TARGET_PATH, writeConfig.getWriteLocation());
        if (writePath != null) {
            if (targetPath == null) {
                // write_path was supplied via props but neither target_path nor a
                // write location exists; fall back to the write path so
                // LocationPath.of does not receive null.
                targetPath = writePath;
            }
            THiveLocationParams locationParams = new THiveLocationParams();
            locationParams.setWritePath(writePath);
            locationParams.setOriginalWritePath(
                    props.getOrDefault(PROP_ORIGINAL_WRITE_PATH, writePath));
            locationParams.setTargetPath(targetPath);
            LocationPath locationPath = LocationPath.of(targetPath,
                    targetTable.getCatalog().getCatalogProperty().getStoragePropertiesMap());
            TFileType fileType = locationPath.getTFileTypeForBE();
            locationParams.setFileType(fileType);
            tSink.setLocation(locationParams);
            // Enum identity compare is null-safe, unlike fileType.equals(...).
            if (fileType == TFileType.FILE_BROKER) {
                tSink.setBrokerAddresses(
                        getBrokerAddresses(targetTable.getCatalog().bindBrokerName()));
            }
        }

        // Overwrite flag
        if (props.containsKey(PROP_OVERWRITE)) {
            tSink.setOverwrite(Boolean.parseBoolean(props.get(PROP_OVERWRITE)));
        }

        // Hadoop/storage config for BE access
        Map<String, String> beStorageProps = targetTable.getCatalog()
                .getCatalogProperty().getBackendStorageProperties();
        tSink.setHadoopConfig(beStorageProps);
        // Any extra connector-specific properties: pass through via hadoop_config
        for (Map.Entry<String, String> entry : props.entrySet()) {
            String key = entry.getKey();
            if (!isWellKnownProperty(key)) {
                tSink.putToHadoopConfig(key, entry.getValue());
            }
        }

        tDataSink = new TDataSink(TDataSinkType.HIVE_TABLE_SINK);
        tDataSink.setHiveTableSink(tSink);
    }

    /**
     * Builds a TJdbcTableSink for JDBC-based writes.
     *
     * @param insertCtx optional insert command context (currently unused here)
     * @throws AnalysisException if a numeric property or the JDBC table type
     *         cannot be parsed
     */
    private void bindJdbcWriteSink(Optional<InsertCommandContext> insertCtx)
            throws AnalysisException {
        Map<String, String> props = writeConfig.getProperties();
        TJdbcTableSink jdbcSink = new TJdbcTableSink();
        TJdbcTable tJdbcTable = new TJdbcTable();
        tJdbcTable.setJdbcUrl(props.getOrDefault(PROP_JDBC_URL, ""));
        tJdbcTable.setJdbcUser(props.getOrDefault(PROP_JDBC_USER, ""));
        tJdbcTable.setJdbcPassword(props.getOrDefault(PROP_JDBC_PASSWORD, ""));
        tJdbcTable.setJdbcDriverUrl(props.getOrDefault(PROP_JDBC_DRIVER_URL, ""));
        tJdbcTable.setJdbcDriverClass(props.getOrDefault(PROP_JDBC_DRIVER_CLASS, ""));
        tJdbcTable.setJdbcDriverChecksum(props.getOrDefault(PROP_JDBC_DRIVER_CHECKSUM, ""));
        tJdbcTable.setJdbcTableName(props.getOrDefault(PROP_JDBC_TABLE_NAME, ""));
        tJdbcTable.setJdbcResourceName(props.getOrDefault(PROP_JDBC_RESOURCE_NAME, ""));
        // Numeric properties are connector-supplied strings; parse helpers turn a
        // malformed value into an AnalysisException instead of letting an
        // unchecked NumberFormatException escape.
        tJdbcTable.setCatalogId(parseLongProp(props, PROP_JDBC_CATALOG_ID, "0"));
        tJdbcTable.setConnectionPoolMinSize(parseIntProp(props, PROP_JDBC_POOL_MIN, "1"));
        tJdbcTable.setConnectionPoolMaxSize(parseIntProp(props, PROP_JDBC_POOL_MAX, "10"));
        tJdbcTable.setConnectionPoolMaxWaitTime(
                parseIntProp(props, PROP_JDBC_POOL_MAX_WAIT, "5000"));
        tJdbcTable.setConnectionPoolMaxLifeTime(
                parseIntProp(props, PROP_JDBC_POOL_MAX_LIFE, "1800000"));
        tJdbcTable.setConnectionPoolKeepAlive(
                Boolean.parseBoolean(props.getOrDefault(PROP_JDBC_POOL_KEEP_ALIVE, "false")));
        jdbcSink.setJdbcTable(tJdbcTable);

        String insertSql = props.getOrDefault(PROP_JDBC_INSERT_SQL, "");
        jdbcSink.setInsertSql(insertSql);
        boolean useTxn = Boolean.parseBoolean(
                props.getOrDefault(PROP_JDBC_USE_TRANSACTION, "false"));
        jdbcSink.setUseTransaction(useTxn);

        String tableType = props.getOrDefault(PROP_JDBC_TABLE_TYPE, "");
        if (!tableType.isEmpty()) {
            try {
                jdbcSink.setTableType(TOdbcTableType.valueOf(tableType));
            } catch (IllegalArgumentException e) {
                // Enum.valueOf throws unchecked on unknown names; surface it as
                // an analysis error with the offending value.
                throw new AnalysisException(
                        "Unknown JDBC table type '" + tableType + "' in property '"
                                + PROP_JDBC_TABLE_TYPE + "'", e);
            }
        }

        tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK);
        tDataSink.setJdbcTableSink(jdbcSink);
    }

    /**
     * Parses an int property, reporting a malformed value as an AnalysisException.
     */
    private static int parseIntProp(Map<String, String> props, String key, String defaultValue)
            throws AnalysisException {
        String value = props.getOrDefault(key, defaultValue);
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new AnalysisException(
                    "Invalid integer value '" + value + "' for property '" + key + "'", e);
        }
    }

    /**
     * Parses a long property, reporting a malformed value as an AnalysisException.
     */
    private static long parseLongProp(Map<String, String> props, String key, String defaultValue)
            throws AnalysisException {
        String value = props.getOrDefault(key, defaultValue);
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new AnalysisException(
                    "Invalid long value '" + value + "' for property '" + key + "'", e);
        }
    }

    /**
     * Returns true if the key is one of the well-known sink properties (or any
     * jdbc_* key) that are consumed directly rather than passed through to the
     * BE via hadoop_config.
     */
    private boolean isWellKnownProperty(String key) {
        return key.equals(PROP_DB_NAME) || key.equals(PROP_TABLE_NAME)
                || key.equals(PROP_OVERWRITE)
                || key.equals(PROP_WRITE_PATH) || key.equals(PROP_TARGET_PATH)
                || key.equals(PROP_ORIGINAL_WRITE_PATH)
                || key.startsWith("jdbc_");
    }
}