PaimonTableValuedFunction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TPaimonMetadataParams;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Table-valued function for querying Paimon system tables metadata.
 *
 * <p>Instances are normally obtained through {@link #create(Map)}, which validates the
 * {@value #TABLE} and {@value #QUERY_TYPE} properties and checks the caller's SELECT privilege
 * before resolving the Paimon system table.
 */
public class PaimonTableValuedFunction extends MetadataTableValuedFunction {
    public static final String NAME = "paimon_meta";
    /** Property key holding the fully qualified table name ({@code catalog.database.table}). */
    public static final String TABLE = "table";
    /** Property key holding the query type handed to the Paimon catalog to pick the system table. */
    public static final String QUERY_TYPE = "query_type";

    /** The only property keys accepted by {@link #create(Map)} (compared in lower case). */
    private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);

    private final String queryType;
    /** System table resolved from the catalog for {@link #queryType}. */
    private final Table paimonSysTable;
    /** Doris columns derived from {@link #paimonSysTable} by {@code PaimonUtil.parseSchema}. */
    private final List<Column> schema;
    private final Map<String, String> hadoopProps;
    private final Map<String, String> paimonProps;
    /** Authenticator used to run Paimon scan planning via {@code doAs}. */
    private final HadoopAuthenticator hadoopAuthenticator;
    private final TableName paimonTableName;
    private final long ctlId;
    private final long dbId;
    private final long tblId;

    /**
     * Creates a new Paimon table-valued function instance.
     *
     * <p>Resolves the catalog, database and table named by {@code paimonTableName}, captures their
     * ids and the catalog's Hadoop/Paimon properties, then loads the requested system table and
     * parses its schema.
     *
     * @param paimonTableName the target Paimon table name
     * @param queryType the type of metadata query to perform
     * @throws AnalysisException if the catalog is not a Paimon catalog, or the database or table
     *         does not exist
     */
    public PaimonTableValuedFunction(TableName paimonTableName, String queryType) throws AnalysisException {
        this.queryType = queryType;
        this.paimonTableName = paimonTableName;

        CatalogIf<?> dorisCatalog = Env.getCurrentEnv()
                .getCatalogMgr()
                .getCatalog(paimonTableName.getCtl());
        if (!(dorisCatalog instanceof PaimonExternalCatalog)) {
            throw new AnalysisException("Catalog " + paimonTableName.getCtl() + " is not an paimon catalog");
        }

        PaimonExternalCatalog paimonExternalCatalog = (PaimonExternalCatalog) dorisCatalog;
        this.hadoopProps = paimonExternalCatalog.getCatalogProperty().getHadoopProperties();
        this.paimonProps = paimonExternalCatalog.getPaimonOptionsMap();
        this.hadoopAuthenticator = paimonExternalCatalog.getPreExecutionAuthenticator().getHadoopAuthenticator();
        this.ctlId = paimonExternalCatalog.getId();

        ExternalDatabase<? extends ExternalTable> database = paimonExternalCatalog.getDb(paimonTableName.getDb())
                .orElseThrow(() -> new AnalysisException(
                        String.format("Paimon catalog database '%s' does not exist", paimonTableName.getDb())
                ));
        this.dbId = database.getId();

        ExternalTable externalTable = database.getTable(paimonTableName.getTbl())
                .orElseThrow(() -> new AnalysisException(
                        String.format("Paimon catalog table '%s.%s' does not exist",
                                paimonTableName.getDb(), paimonTableName.getTbl())
                ));
        NameMapping buildNameMapping = externalTable.getOrBuildNameMapping();
        this.tblId = externalTable.getId();

        this.paimonSysTable = paimonExternalCatalog.getPaimonSystemTable(buildNameMapping,
                queryType);
        this.schema = PaimonUtil.parseSchema(paimonSysTable);
    }

    /**
     * Builds a {@link PaimonTableValuedFunction} from user-supplied properties.
     *
     * <p>Property keys are matched case-insensitively; only {@value #TABLE} and
     * {@value #QUERY_TYPE} are accepted and both are required. The table value must have exactly
     * three dot-separated parts: catalog, database and table. The current connection's SELECT
     * privilege on that table is checked before the function is constructed.
     *
     * @param params the raw properties passed to the table-valued function
     * @return the initialized function
     * @throws AnalysisException if a property is unknown or missing, the table name is malformed,
     *         access is denied, or the constructor fails to resolve the table
     */
    public static PaimonTableValuedFunction create(Map<String, String> params) throws AnalysisException {
        Map<String, String> validParams = Maps.newHashMap();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            // Locale.ROOT keeps the ASCII key comparison independent of the JVM default locale.
            String normalizedKey = entry.getKey().toLowerCase(Locale.ROOT);
            if (!PROPERTIES_SET.contains(normalizedKey)) {
                throw new AnalysisException("'" + entry.getKey() + "' is invalid property");
            }
            validParams.put(normalizedKey, entry.getValue());
        }

        String tableName = validParams.get(TABLE);
        String queryType = validParams.get(QUERY_TYPE);
        if (tableName == null || queryType == null) {
            throw new AnalysisException("Invalid paimon metadata query");
        }

        // check ctl, db, tbl
        String[] names = tableName.split("\\.");
        if (names.length != 3) {
            throw new AnalysisException("The paimon table name contains the catalogName, databaseName, and tableName");
        }
        TableName paimonTableName = new TableName(names[0], names[1], names[2]);
        // check auth
        if (!Env.getCurrentEnv().getAccessManager()
                .checkTblPriv(ConnectContext.get(), paimonTableName, PrivPredicate.SELECT)) {
            ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "SELECT",
                    ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(),
                    paimonTableName.getDb() + ": " + paimonTableName.getTbl());
        }
        return new PaimonTableValuedFunction(paimonTableName, queryType);
    }

    @Override
    public TMetadataType getMetadataType() {
        return TMetadataType.PAIMON;
    }

    /**
     * Plans the Paimon system table scan and returns one scan range per split.
     *
     * <p>Planning runs through the catalog's Hadoop authenticator ({@code doAs}).
     *
     * @param requiredFileds the column names to project, compared against the lower-cased
     *        Paimon field names
     * @return one {@link TMetaScanRange} for every planned split
     * @throws IllegalArgumentException if a required column is not a field of the system table
     * @throws RuntimeException if planning fails; the message is the root-cause message and the
     *         original exception is attached as the cause
     */
    @Override
    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) {
        int[] projections = resolveProjections(requiredFileds);
        List<Split> splits;

        try {
            splits = hadoopAuthenticator.doAs(
                    () -> paimonSysTable.newReadBuilder().withProjection(projections).newScan().plan().splits());
        } catch (Exception e) {
            // Keep the concise root-cause message but chain the exception so the stack trace survives.
            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), e);
        }

        return splits.stream().map(this::createMetaScanRange).collect(Collectors.toList());
    }

    @Override
    public String getTableName() {
        return "PaimonTableValuedFunction<" + queryType + ">";
    }

    /**
     * Returns the columns parsed from the Paimon system table when this function was constructed.
     */
    @Override
    public List<Column> getTableColumns() throws AnalysisException {
        return schema;
    }

    /**
     * Maps each required column name to its index in the Paimon system table's row type.
     */
    private int[] resolveProjections(List<String> requiredFileds) {
        // Build the lower-cased name list once instead of once per required column.
        List<String> lowerCaseFieldNames = paimonSysTable.rowType().getFieldNames()
                .stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());

        int[] projections = new int[requiredFileds.size()];
        int position = 0;
        for (String field : requiredFileds) {
            int fieldIndex = lowerCaseFieldNames.indexOf(field);
            if (fieldIndex < 0) {
                // Fail with a clear message rather than handing -1 to the Paimon read builder.
                throw new IllegalArgumentException(
                        "Column '" + field + "' does not exist in paimon system table '" + queryType + "'");
            }
            projections[position++] = fieldIndex;
        }
        return projections;
    }

    /**
     * Wraps a single Paimon split, together with the catalog/database/table identity and
     * properties captured at construction time, into a thrift scan range.
     */
    private TMetaScanRange createMetaScanRange(Split split) {
        TPaimonMetadataParams tPaimonMetadataParams = new TPaimonMetadataParams();
        tPaimonMetadataParams.setCtlId(ctlId);
        tPaimonMetadataParams.setDbId(dbId);
        tPaimonMetadataParams.setTblId(tblId);
        tPaimonMetadataParams.setQueryType(queryType);
        tPaimonMetadataParams.setDbName(paimonTableName.getDb());
        tPaimonMetadataParams.setTblName(paimonTableName.getTbl());
        tPaimonMetadataParams.setHadoopProps(hadoopProps);
        tPaimonMetadataParams.setPaimonProps(paimonProps);
        tPaimonMetadataParams.setSerializedSplit(PaimonUtil.encodeObjectToString(split));

        TMetaScanRange tMetaScanRange = new TMetaScanRange();
        tMetaScanRange.setMetadataType(TMetadataType.PAIMON);
        tMetaScanRange.setPaimonParams(tPaimonMetadataParams);
        return tMetaScanRange;
    }
}