CreateTableInfoToConnectorRequestConverter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.connector.ddl;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.connector.api.ConnectorColumn;
import org.apache.doris.connector.api.ConnectorType;
import org.apache.doris.connector.api.ddl.ConnectorBucketSpec;
import org.apache.doris.connector.api.ddl.ConnectorCreateTableRequest;
import org.apache.doris.connector.api.ddl.ConnectorPartitionField;
import org.apache.doris.connector.api.ddl.ConnectorPartitionSpec;
import org.apache.doris.datasource.ConnectorColumnConverter;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLikeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
import org.apache.doris.nereids.trees.plans.commands.info.DistributionDescriptor;
import org.apache.doris.nereids.trees.plans.commands.info.PartitionTableInfo;
import org.apache.doris.nereids.types.DataType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Converts a nereids {@link CreateTableInfo} into a connector-SPI
* {@link ConnectorCreateTableRequest}.
*
* <p>Covers Hive-style {@code IDENTITY}, Iceberg-style {@code TRANSFORM}, and
* Doris {@code LIST} / {@code RANGE} partitioning, plus hash / random
* distribution.</p>
*/
public final class CreateTableInfoToConnectorRequestConverter {
private CreateTableInfoToConnectorRequestConverter() {
}
/**
* @param info the nereids CREATE TABLE info (must be analyzed)
* @param dbName target database name (caller may normalize case)
*/
public static ConnectorCreateTableRequest convert(CreateTableInfo info,
String dbName) {
return ConnectorCreateTableRequest.builder()
.dbName(dbName)
.tableName(info.getTableName())
.columns(convertColumns(info.getColumnDefinitions()))
.partitionSpec(convertPartition(info.getPartitionTableInfo()))
.bucketSpec(convertBucket(info.getDistribution()))
.comment(info.getComment())
.properties(info.getProperties())
.ifNotExists(info.isIfNotExists())
.external(info.isExternal())
.build();
}
// -------- columns --------
private static List<ConnectorColumn> convertColumns(
List<ColumnDefinition> defs) {
if (defs == null || defs.isEmpty()) {
return Collections.emptyList();
}
List<ConnectorColumn> out = new ArrayList<>(defs.size());
for (ColumnDefinition d : defs) {
DataType nereidsType = d.getType();
ConnectorType type = ConnectorColumnConverter.toConnectorType(
nereidsType.toCatalogDataType());
// Default value is not exposed via a public getter on ColumnDefinition
// (private Optional<DefaultValue>); pass null until the SPI gains a
// typed default-value carrier. See HANDOFF open issues.
out.add(new ConnectorColumn(
d.getName(), type, d.getComment(),
d.isNullable(), null, d.isKey()));
}
return out;
}
// -------- partition --------
private static ConnectorPartitionSpec convertPartition(
PartitionTableInfo info) {
if (info == null) {
return null;
}
String pType = info.getPartitionType();
List<Expression> exprs = info.getPartitionList();
boolean isList = PartitionType.LIST.name().equalsIgnoreCase(pType);
boolean isRange = PartitionType.RANGE.name().equalsIgnoreCase(pType);
boolean hasExprs = exprs != null && !exprs.isEmpty();
if (!isList && !isRange && !hasExprs) {
return null;
}
ConnectorPartitionSpec.Style style;
if (isList) {
style = ConnectorPartitionSpec.Style.LIST;
} else if (isRange) {
style = ConnectorPartitionSpec.Style.RANGE;
} else if (hasAnyTransform(exprs)) {
style = ConnectorPartitionSpec.Style.TRANSFORM;
} else {
style = ConnectorPartitionSpec.Style.IDENTITY;
}
List<ConnectorPartitionField> fields = hasExprs
? convertFields(exprs)
: Collections.emptyList();
// LIST/RANGE PartitionDefinition values are not lowered here: each
// PartitionDefinition is a sealed family (InPartition/LessThanPartition/
// FixedRangePartition/StepPartition) carrying nereids Expressions that
// require full analysis to flatten into List<List<String>>. Connectors
// that need the initial values today read the Doris PartitionDesc
// directly; this converter passes an empty list and leaves richer
// lowering for a follow-up.
return new ConnectorPartitionSpec(style, fields, Collections.emptyList());
}
private static boolean hasAnyTransform(List<Expression> exprs) {
for (Expression e : exprs) {
if (e instanceof UnboundFunction) {
return true;
}
}
return false;
}
private static List<ConnectorPartitionField> convertFields(
List<Expression> exprs) {
List<ConnectorPartitionField> out = new ArrayList<>(exprs.size());
for (Expression e : exprs) {
if (e instanceof UnboundSlot) {
out.add(new ConnectorPartitionField(
((UnboundSlot) e).getName(), "identity",
Collections.emptyList()));
} else if (e instanceof UnboundFunction) {
out.add(convertTransformField((UnboundFunction) e));
}
// Unknown expression shapes are dropped; the connector can still
// honor the spec via its own analysis if richer info is required.
}
return out;
}
private static ConnectorPartitionField convertTransformField(
UnboundFunction fn) {
String transform = fn.getName().toLowerCase();
String columnName = null;
List<Integer> args = new ArrayList<>();
for (Expression child : fn.children()) {
if (child instanceof UnboundSlot && columnName == null) {
columnName = ((UnboundSlot) child).getName();
} else if (child instanceof IntegerLikeLiteral) {
args.add(((IntegerLikeLiteral) child).getIntValue());
} else if (child instanceof Literal) {
Object v = ((Literal) child).getValue();
if (v instanceof Number) {
args.add(((Number) v).intValue());
}
}
}
if (columnName == null) {
columnName = fn.toString();
}
return new ConnectorPartitionField(columnName, transform, args);
}
// -------- bucket --------
private static ConnectorBucketSpec convertBucket(DistributionDescriptor d) {
if (d == null) {
return null;
}
List<String> cols = d.getCols() == null
? Collections.emptyList()
: d.getCols();
// bucketNum is private; read it off the translated catalog desc so we
// do not depend on private internals.
int numBuckets = readBucketNum(d);
String algorithm = d.isHash() ? "doris_default" : "doris_random";
return new ConnectorBucketSpec(cols, numBuckets, algorithm);
}
private static int readBucketNum(DistributionDescriptor d) {
try {
return d.translateToCatalogStyle().getBuckets();
} catch (Exception ignored) {
return 0;
}
}
}