// PaimonTableSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TPaimonTableSink;
import org.apache.doris.thrift.TPaimonWriteShuffleMode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.utils.InstantiationUtil;
import java.net.URI;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* Paimon table sink
*
* This class materializes the TPaimonTableSink payload consumed by BE,
* including table location, write options, partition keys, bucket keys,
* shuffle mode and sink column names.
*/
public class PaimonTableSink extends BaseExternalTableDataSink {
    private static final Logger LOG = LogManager.getLogger(PaimonTableSink.class);

    // URL-safe, unpadded base64; output must match what BE expects when
    // deserializing the paimon table object.
    private static final Base64.Encoder BASE64_ENCODER = Base64.getUrlEncoder().withoutPadding();

    // File formats this sink can write. Initialized in a static block instead of
    // double-brace initialization, which would create an anonymous HashSet
    // subclass holding an enclosing-class reference.
    private static final Set<TFileFormatType> SUPPORTED_TYPES = new HashSet<>();

    static {
        SUPPORTED_TYPES.add(TFileFormatType.FORMAT_ORC);
        SUPPORTED_TYPES.add(TFileFormatType.FORMAT_PARQUET);
    }

    private final PaimonExternalTable targetTable;

    // Columns to be written to the sink, used to populate columnNames in Thrift.
    // Must be set via setCols() before bindDataSink() is called.
    private List<Column> cols;

    public PaimonTableSink(PaimonExternalTable targetTable) {
        super();
        this.targetTable = targetTable;
    }

    public void setCols(List<Column> cols) {
        this.cols = cols;
    }

    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        // Currently the sink prints only its header at every explain level.
        return prefix + "PAIMON TABLE SINK\n";
    }

    @Override
    protected Set<TFileFormatType> supportedFileFormatTypes() {
        return SUPPORTED_TYPES;
    }

    /**
     * Materializes the TPaimonTableSink payload consumed by BE: identifiers,
     * write options, table location, serialized table, partition/bucket keys,
     * shuffle mode and sink column names.
     *
     * @param insertCtx optional insert command context carrying txn id / commit user
     * @throws AnalysisException if partition or bucket metadata cannot be read,
     *         or if the sink columns were not set via setCols()
     */
    @Override
    public void bindDataSink(Optional<InsertCommandContext> insertCtx) throws AnalysisException {
        TPaimonTableSink tSink = new TPaimonTableSink();
        // basic identifiers
        tSink.setCatalogName(targetTable.getCatalog().getName());
        tSink.setDbName(targetTable.getDbName());
        tSink.setTbName(targetTable.getName());

        String warehouse = ((PaimonExternalCatalog) targetTable.getCatalog()).getPaimonOptionsMap()
                .get(CatalogOptions.WAREHOUSE.key());
        String defaultFsName = resolveDefaultFsName(warehouse);

        Map<String, String> options = buildSinkOptions(insertCtx, defaultFsName);

        org.apache.paimon.table.Table paimonTable =
                targetTable.getPaimonTable(MvccUtil.getSnapshotFromContext(targetTable));

        String tableLocation = resolveTableLocation(paimonTable, warehouse, defaultFsName);
        if (tableLocation != null && !tableLocation.isEmpty()) {
            tSink.setTableLocation(tableLocation);
        }
        if (paimonTable != null) {
            tSink.setSerializedTable(encodeObjectToString(paimonTable));
        }

        ArrayList<String> partitionKeys = collectPartitionKeys();
        tSink.setPartitionKeys(partitionKeys);

        applyBucketAndShuffle(tSink, paimonTable, partitionKeys);

        tSink.setOptions(options);

        // Pass column names to BE because PipelineX may strip them from Block.
        // Fail with a clear message instead of an NPE when setCols() was skipped.
        if (cols == null) {
            throw new AnalysisException("Paimon sink columns are not set; call setCols() first");
        }
        ArrayList<String> columnNames = new ArrayList<>();
        for (Column col : cols) {
            columnNames.add(col.getName());
        }
        tSink.setColumnNames(columnNames);

        tDataSink = new TDataSink(TDataSinkType.PAIMON_TABLE_SINK);
        tDataSink.setPaimonTableSink(tSink);
    }

    /**
     * Builds the option map sent to BE: hadoop catalog properties, default fs
     * alignment, commit identity from the insert context, session-variable
     * tuning knobs and the JNI writer/compact flags.
     */
    private Map<String, String> buildSinkOptions(Optional<InsertCommandContext> insertCtx, String defaultFsName) {
        Map<String, String> options = new HashMap<>(
                targetTable.getCatalog().getCatalogProperty().getHadoopProperties());

        if (defaultFsName != null && !defaultFsName.isEmpty()) {
            String currentDefaultFs = options.get("fs.defaultFS");
            // Only override a missing or local (file:/) default fs with the one
            // derived from the warehouse uri.
            if (currentDefaultFs == null || currentDefaultFs.isEmpty() || currentDefaultFs.startsWith("file:/")) {
                options.put("fs.defaultFS", defaultFsName);
            }
        }

        if (insertCtx.isPresent() && insertCtx.get() instanceof BaseExternalTableInsertCommandContext) {
            BaseExternalTableInsertCommandContext cmdCtx =
                    (BaseExternalTableInsertCommandContext) insertCtx.get();
            if (cmdCtx.getTxnId() > 0) {
                options.put("doris.commit_identifier", String.valueOf(cmdCtx.getTxnId()));
            }
            if (cmdCtx.getCommitUser() != null && !cmdCtx.getCommitUser().isEmpty()) {
                options.put("doris.commit_user", cmdCtx.getCommitUser());
            }
        }

        ConnectContext connectContext = ConnectContext.get();
        if (connectContext != null) {
            options.put("target-file-size",
                    String.valueOf(connectContext.getSessionVariable().paimonTargetFileSize));
            options.put("write-buffer-size",
                    String.valueOf(connectContext.getSessionVariable().paimonWriteBufferSize));
            // Normalize the hadoop user under both legacy and current keys,
            // falling back to "hadoop" when neither is configured.
            String hadoopUser = options.get("hadoop.username");
            if (hadoopUser == null || hadoopUser.isEmpty()) {
                hadoopUser = options.get("hadoop.user.name");
            }
            if (hadoopUser == null || hadoopUser.isEmpty()) {
                hadoopUser = "hadoop";
            }
            options.put("hadoop.user.name", hadoopUser);
            options.put("hadoop.username", hadoopUser);
        }

        // JNI writer/compact flags; both default to false without a session.
        boolean enableJni = false;
        boolean enableJniCompact = false;
        if (connectContext != null) {
            enableJni = connectContext.getSessionVariable().enablePaimonJniWriter;
            enableJniCompact = connectContext.getSessionVariable().enablePaimonJniCompact;
        }
        options.put("paimon_use_jni", String.valueOf(enableJni));
        options.put("paimon_use_jni_compact", String.valueOf(enableJniCompact));
        return options;
    }

    /**
     * Determines the physical table location: prefer the FileStoreTable's own
     * location, fall back to {@code warehouse/db.db/table/}, and align hdfs or
     * scheme-less paths with the resolved default fs.
     */
    private String resolveTableLocation(org.apache.paimon.table.Table paimonTable,
            String warehouse, String defaultFsName) {
        String tableLocation = null;
        if (paimonTable instanceof org.apache.paimon.table.FileStoreTable) {
            tableLocation = ((org.apache.paimon.table.FileStoreTable) paimonTable).location().toString();
        }
        tableLocation = normalizeTableLocation(tableLocation);
        if (tableLocation == null || tableLocation.isEmpty()) {
            if (warehouse != null && !warehouse.isEmpty()) {
                String base = warehouse.endsWith("/") ? warehouse : warehouse + "/";
                tableLocation = base + targetTable.getDbName() + ".db/" + targetTable.getName() + "/";
            }
        } else if (defaultFsName != null && !defaultFsName.isEmpty() && tableLocation.startsWith("hdfs://")) {
            try {
                URI tableLocationUri = URI.create(tableLocation);
                String path = tableLocationUri.getPath();
                if (path != null && !path.isEmpty()) {
                    tableLocation = defaultFsName + path;
                }
            } catch (Exception e) {
                // Keep the original location on failure; log with stack trace.
                LOG.warn("paimon: failed to align table location {} with default fs {}",
                        tableLocation, defaultFsName, e);
            }
        } else if (!tableLocation.contains("://") && defaultFsName != null && !defaultFsName.isEmpty()) {
            String tablePath = tableLocation.startsWith("/") ? tableLocation : "/" + tableLocation;
            tableLocation = defaultFsName + tablePath;
        }
        return tableLocation;
    }

    /**
     * Collects the partition column names of the target table.
     *
     * @throws AnalysisException when partition metadata cannot be read
     */
    private ArrayList<String> collectPartitionKeys() throws AnalysisException {
        ArrayList<String> partitionKeys = new ArrayList<>();
        try {
            targetTable.getPartitionColumns(MvccUtil.getSnapshotFromContext(targetTable))
                    .forEach(c -> partitionKeys.add(c.getName()));
        } catch (Exception e) {
            LOG.warn("paimon: failed to get partition keys for table={}.{}: {}",
                    targetTable.getDbName(), targetTable.getName(), e.getMessage(), e);
            throw new AnalysisException("Failed to get partition keys for paimon table", e);
        }
        return partitionKeys;
    }

    /**
     * Fills bucket keys / bucket num on the sink and picks the shuffle mode:
     * BUCKET when the table has fixed buckets, PARTITION when only partitioned,
     * RANDOM otherwise.
     *
     * @throws AnalysisException when the table schema cannot be read
     */
    private void applyBucketAndShuffle(TPaimonTableSink tSink,
            org.apache.paimon.table.Table paimonTable,
            ArrayList<String> partitionKeys) throws AnalysisException {
        ArrayList<String> bucketKeys = new ArrayList<>();
        int bucketNum = 0;
        try {
            if (paimonTable instanceof org.apache.paimon.table.FileStoreTable) {
                org.apache.paimon.schema.TableSchema schema =
                        ((org.apache.paimon.table.FileStoreTable) paimonTable).schema();
                bucketNum = schema.numBuckets();
                bucketKeys.addAll(schema.bucketKeys());
                // Fixed-bucket table without explicit bucket keys: hash on all fields.
                if (bucketNum > 0 && bucketKeys.isEmpty()) {
                    bucketKeys.addAll(schema.fieldNames());
                }
            }
        } catch (Exception e) {
            LOG.error("paimon: failed to get bucket info for table={}.{}: {}",
                    targetTable.getDbName(), targetTable.getName(), e.getMessage(), e);
            throw new AnalysisException("Failed to get bucket info for paimon table", e);
        }
        tSink.setBucketKeys(bucketKeys);
        if (bucketNum > 0) {
            tSink.setBucketNum(bucketNum);
            tSink.setShuffleMode(TPaimonWriteShuffleMode.PAIMON_SHUFFLE_BUCKET);
        } else if (!partitionKeys.isEmpty()) {
            tSink.setShuffleMode(TPaimonWriteShuffleMode.PAIMON_SHUFFLE_PARTITION);
        } else {
            tSink.setShuffleMode(TPaimonWriteShuffleMode.PAIMON_SHUFFLE_RANDOM);
        }
    }

    /**
     * Java-serializes {@code t} and encodes the bytes as URL-safe, unpadded
     * base64 so the object can travel inside a Thrift string field.
     */
    public static <T> String encodeObjectToString(T t) {
        try {
            byte[] bytes = InstantiationUtil.serializeObject(t);
            return BASE64_ENCODER.encodeToString(bytes);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize paimon object to base64 string", e);
        }
    }

    /**
     * Extracts {@code scheme://authority} from the warehouse uri, or returns
     * null when the warehouse is empty or has no usable scheme/authority.
     */
    private String resolveDefaultFsName(String warehouse) {
        if (warehouse == null || warehouse.isEmpty()) {
            return null;
        }
        try {
            URI uri = URI.create(warehouse);
            String scheme = uri.getScheme();
            String authority = uri.getAuthority();
            if (scheme == null || scheme.isEmpty() || authority == null || authority.isEmpty()) {
                return null;
            }
            return scheme + "://" + authority;
        } catch (Exception e) {
            LOG.warn("paimon: invalid warehouse uri {}, skip default fs resolve", warehouse, e);
            return null;
        }
    }

    /**
     * Normalizes an hdfs location: keeps it when it carries an authority,
     * strips it down to the bare path otherwise (so a default fs can be
     * prepended later). Malformed "hdfs:/path" strings are repaired to "/path".
     */
    private String normalizeTableLocation(String tableLocation) {
        if (tableLocation == null || tableLocation.isEmpty()) {
            return tableLocation;
        }
        try {
            URI uri = URI.create(tableLocation);
            String scheme = uri.getScheme();
            if ("hdfs".equalsIgnoreCase(scheme)) {
                String authority = uri.getAuthority();
                if (authority != null && !authority.isEmpty()) {
                    return tableLocation;
                }
                String path = uri.getPath();
                if (path != null && !path.isEmpty()) {
                    return path;
                }
            }
        } catch (Exception e) {
            // URI.create rejected it; recover the path from a single-slash hdfs uri.
            if (tableLocation.startsWith("hdfs:/") && !tableLocation.startsWith("hdfs://")) {
                return "/" + tableLocation.substring("hdfs:/".length());
            }
        }
        return tableLocation;
    }
}