// ConnectorColumnConverter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.connector.api.ConnectorColumn;
import org.apache.doris.connector.api.ConnectorType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
/**
* Converts between the connector SPI type system ({@link ConnectorColumn}/{@link ConnectorType})
* and the Doris internal type system ({@link Column}/{@link Type}).
*
* <p>This converter lives in fe-core because it depends on both the SPI API types
* (from fe-connector-api) and the internal Doris catalog types (from fe-type/fe-core).</p>
*/
public final class ConnectorColumnConverter {
    private static final Logger LOG = LogManager.getLogger(ConnectorColumnConverter.class);

    /** Static utility holder; not meant to be instantiated. */
    private ConnectorColumnConverter() {
    }

    /**
     * Maps every {@link ConnectorColumn} in the given list to a Doris {@link Column}.
     *
     * @param connectorColumns the connector-side columns to convert
     * @return a new list holding one converted column per input element, in the same order
     */
    public static List<Column> convertColumns(List<ConnectorColumn> connectorColumns) {
        return connectorColumns.stream().map(ConnectorColumnConverter::convertColumn).collect(Collectors.toList());
    }

    /**
     * Builds a Doris {@link Column} from one {@link ConnectorColumn}.
     *
     * <p>Besides name, type, key flag, nullability, default value and comment (a {@code null} comment
     * becomes the empty string), the markers a connector carries across the SPI boundary are re-applied:</p>
     * <ul>
     *   <li>the WITH_TIMEZONE "Extra" marker ({@code ConnectorColumn.withTimeZone()}), matching legacy
     *       PaimonExternalTable/IcebergUtils which set it via {@code setWithTZExtraInfo()}; it does not
     *       depend on the mapped Doris type, so it shows even for a column mapped to plain DATETIME;</li>
     *   <li>the hidden marker ({@code ConnectorColumn.invisible()}), so synthetic write columns declared
     *       through the schema SPI (iceberg __DORIS_ICEBERG_ROWID_COL__ / v3 row-lineage) stay hidden;</li>
     *   <li>the reserved field id ({@code ConnectorColumn.withUniqueId()}), e.g. iceberg v3 row-lineage
     *       _row_id=2147483540 / _last_updated_sequence_number=2147483539, matched by field id BE-side;</li>
     *   <li>the per-field ids of nested STRUCT/ARRAY/MAP children, see
     *       {@link #applyNestedFieldIds(Column, ConnectorType)}.</li>
     * </ul>
     *
     * @param cc the connector-side column description
     * @return the equivalent Doris column
     */
    public static Column convertColumn(ConnectorColumn cc) {
        ConnectorType sourceType = cc.getType();
        Type mappedType = convertType(sourceType);
        String rawComment = cc.getComment();
        Column result = new Column(cc.getName(), mappedType, cc.isKey(), null, cc.isNullable(),
                cc.getDefaultValue(), rawComment == null ? "" : rawComment);

        if (cc.isWithTimeZone()) {
            result.setWithTZExtraInfo();
        }

        // This code only ever pushes the "hidden" state onto the new Column; a visible connector
        // column leaves the Column's own default untouched.
        if (!cc.isVisible()) {
            result.setIsVisible(false);
        }

        // Negative ids are treated as "not set" and are not forwarded.
        int reservedId = cc.getUniqueId();
        if (reservedId >= 0) {
            result.setUniqueId(reservedId);
        }

        // Mirrors legacy IcebergUtils.updateIcebergColumnUniqueId's recursive set. The BE field-id scan
        // path matches a pruned nested leaf by id; a -1 leaf is skipped and returns NULL.
        applyNestedFieldIds(result, sourceType);
        return result;
    }

    /**
     * Walks {@code column}'s children and {@code type}'s children in parallel and copies each child field
     * id reported by {@link ConnectorType#getChildFieldId(int)} onto the matching child column, recursing
     * into every visited child.
     *
     * <p>The walk relies on the child column order produced by {@code Column.createChildrenColumn}
     * matching the {@link ConnectorType} children order (array element / map key,value / struct fields
     * in order). Only ids {@code >= 0} are written; other children keep whatever id they already have.
     * When the two child lists differ in length, only the common prefix is visited.</p>
     *
     * @param column the Doris column whose child tree is stamped
     * @param type the connector type that carries the child field ids
     */
    private static void applyNestedFieldIds(Column column, ConnectorType type) {
        List<Column> columnChildren = column.getChildren();
        if (columnChildren == null || columnChildren.isEmpty()) {
            // Nothing nested below this column.
            return;
        }
        List<ConnectorType> typeChildren = type.getChildren();
        int pairCount = Math.min(columnChildren.size(), typeChildren.size());
        for (int idx = 0; idx < pairCount; idx++) {
            Column child = columnChildren.get(idx);
            int fieldId = type.getChildFieldId(idx);
            if (fieldId >= 0) {
                child.setUniqueId(fieldId);
            }
            applyNestedFieldIds(child, typeChildren.get(idx));
        }
    }

    /**
     * Maps every Doris {@link Column} in the given list to a {@link ConnectorColumn}.
     *
     * @param columns the Doris columns to convert
     * @return a new list holding one converted column per input element, in the same order
     */
    public static List<ConnectorColumn> toConnectorColumns(List<Column> columns) {
        return columns.stream().map(ConnectorColumnConverter::toConnectorColumn).collect(Collectors.toList());
    }

    /**
     * Builds a {@link ConnectorColumn} from a Doris {@link Column}; the reverse direction of
     * {@link #convertColumn(ConnectorColumn)}.
     *
     * <p>The {@code isKey}/{@code isAutoInc}/{@code isAggregated} flags are forwarded so a connector can
     * re-enforce its column-validation parity (e.g. iceberg rejects aggregated / auto-increment columns
     * in {@code ALTER TABLE ADD/MODIFY COLUMN}); without them the flags would default to {@code false}
     * and an aggregated/auto-inc column would be indistinguishable from a plain one.</p>
     *
     * @param col the Doris column
     * @return the connector-side description of {@code col}
     */
    public static ConnectorColumn toConnectorColumn(Column col) {
        ConnectorType mappedType = toConnectorType(col.getType());
        return new ConnectorColumn(col.getName(), mappedType, col.getComment(), col.isAllowNull(),
                col.getDefaultValue(), col.isKey(), col.isAutoInc(), col.isAggregated());
    }

    /**
     * Translates a Doris {@link Type} into a {@link ConnectorType}, recursing through ARRAY, MAP and
     * STRUCT; the reverse direction of {@link #convertType(ConnectorType)}.
     *
     * <p>A type that is none of ARRAY/MAP/STRUCT/scalar is emitted under its {@code toString()} name
     * with precision and scale both {@code -1}.</p>
     *
     * @param dorisType the Doris type to translate
     * @return the connector-side type
     */
    public static ConnectorType toConnectorType(Type dorisType) {
        if (dorisType instanceof ArrayType) {
            ArrayType arrayType = (ArrayType) dorisType;
            // Forward the element nullability so a connector can keep a NOT NULL ARRAY element
            // (e.g. iceberg CREATE TABLE / complex MODIFY COLUMN); legacy lost it (defaulted optional).
            ConnectorType element = toConnectorType(arrayType.getItemType());
            return ConnectorType.arrayOf(element, arrayType.getContainsNull());
        }
        if (dorisType instanceof MapType) {
            MapType mapType = (MapType) dorisType;
            // Map keys are always required, so only the value nullability travels along.
            ConnectorType key = toConnectorType(mapType.getKeyType());
            ConnectorType value = toConnectorType(mapType.getValueType());
            return ConnectorType.mapOf(key, value, mapType.getIsValueContainsNull());
        }
        if (dorisType instanceof StructType) {
            return structToConnectorType((StructType) dorisType);
        }
        if (dorisType instanceof ScalarType) {
            return scalarToConnectorType((ScalarType) dorisType);
        }
        return ConnectorType.of(dorisType.toString(), -1, -1);
    }

    /**
     * Translates a STRUCT type field by field, forwarding each field's name, converted type,
     * nullability and comment so a connector can keep a NOT NULL / commented STRUCT field and diff a
     * complex MODIFY COLUMN field-by-field (legacy carried neither nullability nor comment).
     */
    private static ConnectorType structToConnectorType(StructType struct) {
        List<String> fieldNames = new ArrayList<>();
        List<ConnectorType> fieldTypes = new ArrayList<>();
        List<Boolean> fieldNullables = new ArrayList<>();
        List<String> fieldComments = new ArrayList<>();
        for (StructField field : struct.getFields()) {
            fieldNames.add(field.getName());
            fieldTypes.add(toConnectorType(field.getType()));
            fieldNullables.add(field.getContainsNull());
            fieldComments.add(field.getComment());
        }
        return ConnectorType.structOf(fieldNames, fieldTypes, fieldNullables, fieldComments);
    }

    /**
     * Translates a scalar type. CHAR/VARCHAR keep their length in {@code len} rather than
     * {@code precision}, so the length is written into the ConnectorType precision slot (with scale 0),
     * matching {@link #convertScalarType(String, int, int)} and the connector type convention, so that
     * CREATE TABLE requests keep the length. Every other scalar forwards its precision and scale as-is.
     */
    private static ConnectorType scalarToConnectorType(ScalarType scalar) {
        PrimitiveType primitive = scalar.getPrimitiveType();
        boolean lengthBased = primitive == PrimitiveType.CHAR || primitive == PrimitiveType.VARCHAR;
        if (lengthBased) {
            return ConnectorType.of(primitive.toString(), scalar.getLength(), 0);
        }
        // NOTE(review): convertScalarType reads the DATETIMEV2/TIMESTAMPTZ scale from the precision
        // slot, while this path forwards getScalarPrecision()/getScalarScale() unchanged - confirm that
        // those types round-trip as intended.
        return ConnectorType.of(primitive.toString(), scalar.getScalarPrecision(), scalar.getScalarScale());
    }

    /**
     * Translates a {@link ConnectorType} into a Doris {@link Type}, recursing through ARRAY, MAP and
     * STRUCT. The type name is matched case-insensitively (upper-cased with {@link Locale#ROOT}).
     *
     * @param ct the connector-side type
     * @return the Doris type
     */
    public static Type convertType(ConnectorType ct) {
        String upperName = ct.getTypeName().toUpperCase(Locale.ROOT);
        if ("ARRAY".equals(upperName)) {
            return convertArrayType(ct);
        }
        if ("MAP".equals(upperName)) {
            return convertMapType(ct);
        }
        if ("STRUCT".equals(upperName)) {
            return convertStructType(ct);
        }
        return convertScalarType(upperName, ct.getPrecision(), ct.getScale());
    }

    /**
     * Builds an ARRAY type from the first child of {@code ct}; an ARRAY without children gets a
     * {@code Type.NULL} element. The element is always created as nullable.
     */
    private static Type convertArrayType(ConnectorType ct) {
        List<ConnectorType> elementTypes = ct.getChildren();
        Type element = elementTypes.isEmpty() ? Type.NULL : convertType(elementTypes.get(0));
        return ArrayType.create(element, true);
    }

    /**
     * Builds a MAP type from the first two children of {@code ct} (key, then value); with fewer than
     * two children both key and value fall back to {@code Type.NULL}.
     */
    private static Type convertMapType(ConnectorType ct) {
        List<ConnectorType> keyAndValue = ct.getChildren();
        if (keyAndValue.size() >= 2) {
            Type keyType = convertType(keyAndValue.get(0));
            Type valueType = convertType(keyAndValue.get(1));
            return new MapType(keyType, valueType);
        }
        return new MapType(Type.NULL, Type.NULL);
    }

    /**
     * Builds a STRUCT type with one field per child of {@code ct}. A child without a corresponding
     * entry in the field-name list is named {@code "col" + position}.
     */
    private static Type convertStructType(ConnectorType ct) {
        List<ConnectorType> fieldTypes = ct.getChildren();
        List<String> declaredNames = ct.getFieldNames();
        ArrayList<StructField> structFields = new ArrayList<>();
        int position = 0;
        for (ConnectorType fieldType : fieldTypes) {
            String name = position < declaredNames.size() ? declaredNames.get(position) : "col" + position;
            structFields.add(new StructField(name, convertType(fieldType)));
            position++;
        }
        return new StructType(structFields);
    }

    /**
     * Resolves an upper-cased scalar type name plus precision/scale to a Doris type.
     *
     * <p>For CHAR/VARCHAR/VARBINARY the precision slot carries the length, and for
     * DATETIMEV2/TIMESTAMPTZ it carries the datetime scale. A name without a dedicated case is handed
     * to {@code ScalarType.createType}; if that throws, a warning is logged and the type is mapped to
     * {@code Type.UNSUPPORTED}.</p>
     *
     * @param typeName the upper-cased connector type name
     * @param precision the precision, length or datetime scale, depending on the type
     * @param scale the decimal scale; negative values are clamped to 0 for decimals
     * @return the Doris scalar type
     */
    private static Type convertScalarType(String typeName, int precision, int scale) {
        boolean positivePrecision = precision > 0;
        switch (typeName) {
            case "CHAR":
                return positivePrecision ? ScalarType.createCharType(precision) : ScalarType.CHAR;
            case "VARCHAR":
                return positivePrecision
                        ? ScalarType.createVarcharType(precision)
                        : ScalarType.createVarcharType();
            case "DECIMAL":
            case "DECIMALV2":
                return positivePrecision
                        ? ScalarType.createDecimalType(precision, Math.max(scale, 0))
                        : ScalarType.createDecimalType();
            case "DECIMALV3":
            case "DECIMAL32":
            case "DECIMAL64":
            case "DECIMAL128":
            case "DECIMAL256":
                return positivePrecision
                        ? ScalarType.createDecimalV3Type(precision, Math.max(scale, 0))
                        : ScalarType.createDecimalV3Type();
            case "DATETIMEV2":
                // Connectors put the datetime scale into the precision slot of ConnectorType.
                return precision >= 0 ? ScalarType.createDatetimeV2Type(precision) : ScalarType.DATETIMEV2;
            case "TIMESTAMPTZ":
                // A negative (unset) scale falls back to 0.
                return ScalarType.createTimeStampTzType(Math.max(precision, 0));
            case "VARBINARY":
                return positivePrecision
                        ? ScalarType.createVarbinaryType(precision)
                        : ScalarType.createVarbinaryType(ScalarType.MAX_VARBINARY_LENGTH);
            case "JSONB":
                return ScalarType.createType("JSON");
            case "UNSUPPORTED":
                return Type.UNSUPPORTED;
            default:
                try {
                    return ScalarType.createType(typeName);
                } catch (Exception e) {
                    // Best effort: an unknown name degrades to UNSUPPORTED instead of failing the load.
                    LOG.warn("Unrecognized connector type '{}', marking as UNSUPPORTED", typeName);
                    return Type.UNSUPPORTED;
                }
        }
    }
}