PaimonSysExternalTable.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.systable;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.PaimonUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVSnapshotIf;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * Represents a Paimon system table (e.g., snapshots, binlog, audit_log) that wraps a source data table.
 *
 * <p>This class enables system tables to be queried using the native table execution path
 * (FileQueryScanNode) instead of the TVF path (MetadataScanNode). This provides:
 * <ul>
 * <li>Unified execution path with regular tables</li>
 * <li>Native vectorized reading for data-oriented system tables</li>
 * <li>Better integration with query optimization</li>
 * </ul>
 *
 * <p>System tables are classified into two categories:
 * <ul>
 * <li><b>Data tables</b> (binlog, audit_log, ro): Read actual ORC/Parquet data files</li>
 * <li><b>Metadata tables</b> (snapshots, partitions, etc.): Read metadata/manifest files</li>
 * </ul>
 *
 * <p>Partition-related methods delegate to the source table only for data tables; metadata
 * tables report themselves as unpartitioned. Snapshot and version methods always delegate to
 * the source table, regardless of category.
 */
public class PaimonSysExternalTable extends PaimonExternalTable {
    private static final Logger LOG = LogManager.getLogger(PaimonSysExternalTable.class);

    /** The regular Paimon table this system table is derived from. */
    private final PaimonExternalTable sourceTable;
    /** System table suffix (e.g. "snapshots", "binlog"); appended to the table name after '$'. */
    private final String sysTableType;
    /** Result of {@code PaimonSysTable.isDataSysTable(sysTableType)}, computed once at construction. */
    private final boolean isDataTable;
    /** Lazily created Paimon system table; initialized once under double-checked locking on {@code this}. */
    private volatile Table paimonSysTable;

    /**
     * Creates a new Paimon system external table.
     *
     * <p>The table name and remote name are the source table's names suffixed with
     * {@code "$" + sysTableType}; catalog and database are taken from the source table.
     *
     * @param sourceTable the underlying data table being wrapped
     * @param sysTableType the type of system table (e.g., "snapshots", "binlog")
     */
    public PaimonSysExternalTable(PaimonExternalTable sourceTable, String sysTableType) {
        super(generateSysTableId(sourceTable.getId(), sysTableType),
                sourceTable.getName() + "$" + sysTableType,
                sourceTable.getRemoteName() + "$" + sysTableType,
                (PaimonExternalCatalog) sourceTable.getCatalog(),
                (PaimonExternalDatabase) sourceTable.getDatabase());
        this.sourceTable = sourceTable;
        this.sysTableType = sysTableType;
        this.isDataTable = PaimonSysTable.isDataSysTable(sysTableType);
    }

    /**
     * Derives an ID for the system table from the source table ID and the system table type.
     *
     * <p>The ID is {@code sourceTableId ^ (sysTableType.hashCode() * 31L)}. It is deterministic
     * but not guaranteed to be collision-free. NOTE(review): a type whose {@code hashCode()} is 0
     * (e.g. the empty string) would yield the source table's own ID.
     */
    private static long generateSysTableId(long sourceTableId, String sysTableType) {
        // Use a simple hash combination to generate a unique ID
        return sourceTableId ^ (sysTableType.hashCode() * 31L);
    }

    /**
     * Returns the Paimon system table instance (e.g., SnapshotsTable, BinlogTable).
     *
     * <p>The instance is loaded from the catalog on first use, for the {@code "main"} branch,
     * and cached for the lifetime of this object. The {@code snapshot} argument is not used:
     * every call returns the same cached instance.
     *
     * @param snapshot ignored
     * @return the cached Paimon system table
     */
    @Override
    public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
        if (paimonSysTable == null) {
            synchronized (this) {
                if (paimonSysTable == null) {
                    PaimonExternalCatalog catalog = (PaimonExternalCatalog) getCatalog();
                    paimonSysTable = catalog.getPaimonTable(
                            sourceTable.getOrBuildNameMapping(),
                            "main", // branch
                            sysTableType // queryType: snapshots, binlog, etc.
                    );
                    LOG.info("Created Paimon system table: {} for source table: {}",
                            sysTableType, sourceTable.getName());
                }
            }
        }
        return paimonSysTable;
    }

    /**
     * Returns the schema of the system table, derived from the system table's rowType.
     *
     * <p>A new list is built on every call. Column names are lower-cased with
     * {@link Locale#ROOT} so the result does not depend on the JVM default locale.
     *
     * @return one Doris {@link Column} per Paimon field, in rowType order
     */
    @Override
    public List<Column> getFullSchema() {
        Table sysTable = getPaimonTable(Optional.empty());
        List<DataField> fields = sysTable.rowType().getFields();
        // These catalog settings do not change per field; read them once.
        boolean enableMappingVarbinary = getCatalog().getEnableMappingVarbinary();
        boolean enableMappingTimestampTz = getCatalog().getEnableMappingTimestampTz();
        List<Column> columns = Lists.newArrayListWithCapacity(fields.size());
        for (DataField field : fields) {
            Column column = new Column(
                    // Locale.ROOT: the default locale (e.g. Turkish) can map 'I' to a dotless 'ı'.
                    field.name().toLowerCase(Locale.ROOT),
                    PaimonUtil.paimonTypeToDorisType(
                            field.type(),
                            enableMappingVarbinary,
                            enableMappingTimestampTz),
                    true,
                    null,
                    true,
                    field.description(),
                    true,
                    field.id());
            PaimonUtil.updatePaimonColumnUniqueId(column, field);
            if (field.type().getTypeRoot() == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
                column.setWithTZExtraInfo();
            }
            columns.add(column);
        }
        return columns;
    }

    /** Returns the source table this system table wraps. */
    public ExternalTable getSourceTable() {
        return sourceTable;
    }

    /** Returns the system table type suffix (e.g. "snapshots", "binlog"). */
    public String getSysTableType() {
        return sysTableType;
    }

    /** Returns whether this system table was classified as a data table at construction. */
    public boolean isDataTable() {
        return isDataTable;
    }

    /** Delegates snapshot loading to the source table. */
    @Override
    public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional<TableScanParams> scanParams) {
        return sourceTable.loadSnapshot(tableSnapshot, scanParams);
    }

    /** Returns the source table's partition type for data tables; UNPARTITIONED otherwise. */
    @Override
    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getPartitionType(snapshot) : PartitionType.UNPARTITIONED;
    }

    /** Returns the source table's partition column names for data tables; an empty set otherwise. */
    @Override
    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getPartitionColumnNames(snapshot) : Collections.emptySet();
    }

    /** Returns the source table's partition columns for data tables; an empty list otherwise. */
    @Override
    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getPartitionColumns(snapshot) : Collections.emptyList();
    }

    /** Always {@code true} for metadata tables; for data tables, defers to the source table. */
    @Override
    public boolean isPartitionInvalid(Optional<MvccSnapshot> snapshot) {
        return !isDataTable || sourceTable.isPartitionInvalid(snapshot);
    }

    /** Returns the source table's partition items for data tables; an empty map otherwise. */
    @Override
    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getNameToPartitionItems(snapshot) : Collections.emptyMap();
    }

    /** Supported only for data tables whose source table supports internal partition pruning. */
    @Override
    public boolean supportInternalPartitionPruned() {
        return isDataTable && sourceTable.supportInternalPartitionPruned();
    }

    /**
     * Delegates to the source table for both data and metadata tables.
     * NOTE(review): unlike the partition methods above, this is not gated on isDataTable.
     */
    @Override
    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
            Optional<MvccSnapshot> snapshot) throws AnalysisException {
        return sourceTable.getPartitionSnapshot(partitionName, context, snapshot);
    }

    /**
     * Delegates to the source table for both data and metadata tables.
     * NOTE(review): for metadata tables this can be non-empty while getNameToPartitionItems is empty.
     */
    @Override
    public Map<String, Partition> getPartitionSnapshot(Optional<MvccSnapshot> snapshot) {
        return sourceTable.getPartitionSnapshot(snapshot);
    }

    /** Delegates to the source table. */
    @Override
    public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
        return sourceTable.getTableSnapshot(snapshot);
    }

    /** Delegates to the source table. */
    @Override
    public long getNewestUpdateVersionOrTime() {
        return sourceTable.getNewestUpdateVersionOrTime();
    }

    /** Partitioned only when this is a data table and the source table is partitioned. */
    @Override
    public boolean isPartitionedTable() {
        return isDataTable && sourceTable.isPartitionedTable();
    }

    /** Returns a generated comment naming the system table type and the source table. */
    @Override
    public String getComment() {
        return "Paimon system table: " + sysTableType + " for " + sourceTable.getName();
    }
}