PaimonSysExternalTable.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.paimon;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.systable.SysTable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Represents a Paimon system table (e.g., snapshots, binlog, audit_log) that wraps a source data table.
*
* <p>This class enables system tables to be queried using the native table execution path
* (FileQueryScanNode) instead of the TVF path (MetadataScanNode). This provides:
* <ul>
* <li>Unified execution path with regular tables</li>
* <li>Native vectorized reading for data-oriented system tables</li>
* <li>Better integration with query optimization</li>
* </ul>
*
* <p>System tables are classified into two categories:
* <ul>
* <li><b>Data tables</b> (e.g., binlog, audit_log, ro): Read actual ORC/Parquet data files</li>
* <li><b>Metadata tables</b> (snapshots, partitions, etc.): Read metadata/manifest files</li>
* </ul>
*/
public class PaimonSysExternalTable extends ExternalTable {
private static final Logger LOG = LogManager.getLogger(PaimonSysExternalTable.class);
private final PaimonExternalTable sourceTable;
private final String sysTableType;
private volatile Boolean isDataTable;
private volatile Table paimonSysTable;
/**
* Creates a new Paimon system external table.
*
* @param sourceTable the underlying data table being wrapped
* @param sysTableType the type of system table (e.g., "snapshots", "binlog")
*/
public PaimonSysExternalTable(PaimonExternalTable sourceTable, String sysTableType) {
super(generateSysTableId(sourceTable.getId(), sysTableType),
sourceTable.getName() + "$" + sysTableType,
sourceTable.getRemoteName() + "$" + sysTableType,
(PaimonExternalCatalog) sourceTable.getCatalog(),
(PaimonExternalDatabase) sourceTable.getDatabase(),
TableIf.TableType.PAIMON_EXTERNAL_TABLE);
this.sourceTable = sourceTable;
this.sysTableType = sysTableType;
}
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
objectCreated = true;
}
}
/**
* Generate a unique ID for the system table based on source table ID and system table type.
*/
private static long generateSysTableId(long sourceTableId, String sysTableType) {
// Use a simple hash combination to generate a unique ID
return sourceTableId ^ (sysTableType.hashCode() * 31L);
}
/**
* Returns the Paimon system table instance (e.g., snapshots, binlog).
* Note: system tables currently ignore snapshot semantics.
*/
public Table getSysPaimonTable() {
if (paimonSysTable == null) {
synchronized (this) {
if (paimonSysTable == null) {
PaimonExternalCatalog catalog = (PaimonExternalCatalog) getCatalog();
paimonSysTable = catalog.getPaimonTable(
sourceTable.getOrBuildNameMapping(),
"main", // branch
sysTableType // queryType: snapshots, binlog, etc.
);
LOG.info("Created Paimon system table: {} for source table: {}",
sysTableType, sourceTable.getName());
}
}
}
return paimonSysTable;
}
/**
* Returns the schema of the system table.
* The schema is derived from the system table's rowType.
*/
@Override
public List<Column> getFullSchema() {
Table sysTable = getSysPaimonTable();
List<DataField> fields = sysTable.rowType().getFields();
List<Column> columns = Lists.newArrayListWithCapacity(fields.size());
for (DataField field : fields) {
Column column = new Column(
field.name().toLowerCase(),
PaimonUtil.paimonTypeToDorisType(
field.type(),
getCatalog().getEnableMappingVarbinary(),
getCatalog().getEnableMappingTimestampTz()),
true,
null,
true,
field.description(),
true,
field.id());
PaimonUtil.updatePaimonColumnUniqueId(column, field);
if (field.type().getTypeRoot() == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
column.setWithTZExtraInfo();
}
columns.add(column);
}
return columns;
}
public PaimonExternalTable getSourceTable() {
return sourceTable;
}
public String getSysTableType() {
return sysTableType;
}
public boolean isDataTable() {
return resolveIsDataTable();
}
private boolean resolveIsDataTable() {
if (isDataTable == null) {
synchronized (this) {
if (isDataTable == null) {
isDataTable = getSysPaimonTable() instanceof DataTable;
}
}
}
return isDataTable;
}
@Override
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
@Override
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
String catalogType = sourceTable.getPaimonCatalogType();
if (PaimonExternalCatalog.PAIMON_HMS.equals(catalogType)
|| PaimonExternalCatalog.PAIMON_FILESYSTEM.equals(catalogType)
|| PaimonExternalCatalog.PAIMON_DLF.equals(catalogType)
|| PaimonExternalCatalog.PAIMON_REST.equals(catalogType)) {
THiveTable tHiveTable = new THiveTable(dbName, name, new HashMap<>());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0,
getName(), dbName);
tTableDescriptor.setHiveTable(tHiveTable);
return tTableDescriptor;
} else {
throw new IllegalArgumentException(
"Currently only supports hms/dlf/rest/filesystem catalog, do not support :" + catalogType);
}
}
@Override
public long fetchRowCount() {
makeSureInitialized();
long rowCount = 0;
List<Split> splits = getSysPaimonTable().newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
if (rowCount == 0) {
LOG.info("Paimon system table {} row count is 0, return -1", name);
}
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}
@Override
public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
return Optional.of(new SchemaCacheValue(getFullSchema()));
}
@Override
public Map<String, SysTable> getSupportedSysTables() {
return sourceTable.getSupportedSysTables();
}
public Map<String, String> getTableProperties() {
return sourceTable.getTableProperties();
}
@Override
public String getComment() {
return "Paimon system table: " + sysTableType + " for " + sourceTable.getName();
}
}