PaimonSysExternalTable.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.systable;

import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalDatabase;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.paimon.PaimonUtil;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVSnapshotIf;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;

import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Represents a Paimon system table (e.g., snapshots, binlog, audit_log) that wraps a source data table.
 *
 * <p>This class enables system tables to be queried using the native table execution path
 * (FileQueryScanNode) instead of the TVF path (MetadataScanNode). This provides:
 * <ul>
 *   <li>Unified execution path with regular tables</li>
 *   <li>Native vectorized reading for data-oriented system tables</li>
 *   <li>Better integration with query optimization</li>
 * </ul>
 *
 * <p>System tables are classified into two categories:
 * <ul>
 *   <li><b>Data tables</b> (binlog, audit_log, ro): Read actual ORC/Parquet data files</li>
 *   <li><b>Metadata tables</b> (snapshots, partitions, etc.): Read metadata/manifest files</li>
 * </ul>
 *
 * <p>Partition-related queries are forwarded to the source table for data tables and answered
 * with "unpartitioned"/empty results for metadata tables. Snapshot and version queries are always
 * forwarded to the source table. The underlying Paimon table handle is resolved lazily on first
 * use and then cached for the lifetime of this object.
 */
public class PaimonSysExternalTable extends PaimonExternalTable {

    private static final Logger LOG = LogManager.getLogger(PaimonSysExternalTable.class);

    /** Branch name handed to the catalog when resolving the system table. */
    private static final String MAIN_BRANCH = "main";

    /** The table this system table is derived from; target of most delegated calls. */
    private final PaimonExternalTable sourceTable;
    /** System table type as given by the caller (e.g., "snapshots", "binlog"). */
    private final String sysTableType;
    /** Result of {@code PaimonSysTable.isDataSysTable(sysTableType)}, computed once at construction. */
    private final boolean isDataTable;
    /** Lazily resolved Paimon table handle; written at most once under {@code synchronized (this)}. */
    private volatile Table paimonSysTable;

    /**
     * Creates a new Paimon system external table.
     *
     * <p>The local and remote names are the source table's names suffixed with
     * {@code "$" + sysTableType}; catalog and database are taken from the source table.
     *
     * @param sourceTable the underlying data table being wrapped
     * @param sysTableType the type of system table (e.g., "snapshots", "binlog")
     */
    public PaimonSysExternalTable(PaimonExternalTable sourceTable, String sysTableType) {
        super(generateSysTableId(sourceTable.getId(), sysTableType),
                sourceTable.getName() + "$" + sysTableType,
                sourceTable.getRemoteName() + "$" + sysTableType,
                (PaimonExternalCatalog) sourceTable.getCatalog(),
                (PaimonExternalDatabase) sourceTable.getDatabase());
        this.sourceTable = sourceTable;
        this.sysTableType = sysTableType;
        this.isDataTable = PaimonSysTable.isDataSysTable(sysTableType);
    }

    /**
     * Derives an ID for the system table from the source table ID and the system table type.
     *
     * <p>The value is the source ID XOR-ed with the type's {@link String#hashCode()} multiplied
     * by 31. It is deterministic for a given (ID, type) pair, but because it is hash-based it is
     * not guaranteed to be collision-free and may be negative.
     *
     * @param sourceTableId ID of the wrapped source table
     * @param sysTableType system table type
     * @return the derived ID
     */
    private static long generateSysTableId(long sourceTableId, String sysTableType) {
        return sourceTableId ^ (sysTableType.hashCode() * 31L);
    }

    /**
     * Returns the Paimon system table instance (e.g., SnapshotsTable, BinlogTable).
     *
     * <p>The handle is fetched from the catalog on the first call and cached afterwards using
     * double-checked locking on the volatile {@code paimonSysTable} field.
     *
     * @param snapshot not used by this implementation; the same cached handle is returned
     *                 regardless of the snapshot passed in
     * @return the cached (or freshly resolved) Paimon table
     */
    @Override
    public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
        // Copy the volatile field into a local so the already-initialized path performs
        // exactly one volatile read instead of one per null check plus one for the return.
        Table table = paimonSysTable;
        if (table == null) {
            synchronized (this) {
                table = paimonSysTable;
                if (table == null) {
                    PaimonExternalCatalog catalog = (PaimonExternalCatalog) getCatalog();
                    table = catalog.getPaimonTable(
                            sourceTable.getOrBuildNameMapping(),
                            MAIN_BRANCH,
                            sysTableType  // queryType: snapshots, binlog, etc.
                    );
                    paimonSysTable = table;
                    LOG.info("Created Paimon system table: {} for source table: {}",
                            sysTableType, sourceTable.getName());
                }
            }
        }
        return table;
    }

    /**
     * Returns the schema of the system table.
     *
     * <p>The schema is rebuilt on every call from the system table's {@code rowType()}: one
     * {@link Column} per Paimon field, with the field name lower-cased and the type mapped through
     * {@code PaimonUtil.paimonTypeToDorisType}. Fields of type
     * {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} are additionally flagged via
     * {@code setWithTZExtraInfo()}.
     *
     * @return a new list of columns in field order
     */
    @Override
    public List<Column> getFullSchema() {
        Table sysTable = getPaimonTable(Optional.empty());
        List<DataField> fields = sysTable.rowType().getFields();
        List<Column> columns = Lists.newArrayListWithCapacity(fields.size());

        for (DataField field : fields) {
            Column column = new Column(
                    // Locale.ROOT keeps identifier casing independent of the JVM default locale
                    // (e.g. a Turkish locale would otherwise map "I" to dotless "ı").
                    field.name().toLowerCase(Locale.ROOT),
                    PaimonUtil.paimonTypeToDorisType(
                            field.type(),
                            getCatalog().getEnableMappingVarbinary(),
                            getCatalog().getEnableMappingTimestampTz()),
                    true,
                    null,
                    true,
                    field.description(),
                    true,
                    field.id());
            PaimonUtil.updatePaimonColumnUniqueId(column, field);
            if (field.type().getTypeRoot() == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
                column.setWithTZExtraInfo();
            }
            columns.add(column);
        }
        return columns;
    }

    /**
     * Returns the source table this system table wraps.
     */
    public ExternalTable getSourceTable() {
        return sourceTable;
    }

    /**
     * Returns the system table type string supplied at construction.
     */
    public String getSysTableType() {
        return sysTableType;
    }

    /**
     * Returns whether {@code PaimonSysTable.isDataSysTable} classified this type as a data table.
     */
    public boolean isDataTable() {
        return isDataTable;
    }

    /**
     * Loads the snapshot by delegating to the source table.
     */
    @Override
    public MvccSnapshot loadSnapshot(Optional<TableSnapshot> tableSnapshot, Optional<TableScanParams> scanParams) {
        return sourceTable.loadSnapshot(tableSnapshot, scanParams);
    }

    /**
     * Returns the source table's partition type for data tables, {@code UNPARTITIONED} otherwise.
     */
    @Override
    public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getPartitionType(snapshot) : PartitionType.UNPARTITIONED;
    }

    /**
     * Returns the source table's partition column names for data tables, an empty set otherwise.
     */
    @Override
    public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getPartitionColumnNames(snapshot) : Collections.emptySet();
    }

    /**
     * Returns the source table's partition columns for data tables, an empty list otherwise.
     */
    @Override
    public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getPartitionColumns(snapshot) : Collections.emptyList();
    }

    /**
     * Returns {@code true} for metadata tables; for data tables defers to the source table.
     */
    @Override
    public boolean isPartitionInvalid(Optional<MvccSnapshot> snapshot) {
        return !isDataTable || sourceTable.isPartitionInvalid(snapshot);
    }

    /**
     * Returns the source table's partition items for data tables, an empty map otherwise.
     */
    @Override
    public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
        return isDataTable ? sourceTable.getNameToPartitionItems(snapshot) : Collections.emptyMap();
    }

    /**
     * Returns {@code false} for metadata tables; for data tables defers to the source table.
     */
    @Override
    public boolean supportInternalPartitionPruned() {
        return isDataTable && sourceTable.supportInternalPartitionPruned();
    }

    /**
     * Returns the snapshot of a single partition by delegating to the source table.
     *
     * @throws AnalysisException if the source table's lookup throws it
     */
    @Override
    public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
            Optional<MvccSnapshot> snapshot) throws AnalysisException {
        return sourceTable.getPartitionSnapshot(partitionName, context, snapshot);
    }

    /**
     * Returns the per-partition Paimon snapshot map by delegating to the source table.
     */
    @Override
    public Map<String, Partition> getPartitionSnapshot(Optional<MvccSnapshot> snapshot) {
        return sourceTable.getPartitionSnapshot(snapshot);
    }

    /**
     * Returns the table-level snapshot by delegating to the source table.
     *
     * @throws AnalysisException if the source table's lookup throws it
     */
    @Override
    public MTMVSnapshotIf getTableSnapshot(Optional<MvccSnapshot> snapshot) throws AnalysisException {
        return sourceTable.getTableSnapshot(snapshot);
    }

    /**
     * Returns the source table's newest update version or time.
     */
    @Override
    public long getNewestUpdateVersionOrTime() {
        return sourceTable.getNewestUpdateVersionOrTime();
    }

    /**
     * Returns {@code false} for metadata tables; for data tables defers to the source table.
     */
    @Override
    public boolean isPartitionedTable() {
        return isDataTable && sourceTable.isPartitionedTable();
    }

    /**
     * Returns a generated comment naming the system table type and the source table.
     */
    @Override
    public String getComment() {
        return "Paimon system table: " + sysTableType + " for " + sourceTable.getName();
    }
}