// TableBinlogFunction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.RowBinlogTableWrapper;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.info.PartitionNamesInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * The implementation of table valued function {@code binlog}.
 *
 * <p>This TVF is debug-only, used to read {@code binlog<row>} data from a table.
 *
 * <p>Accepted properties (keys are matched case-insensitively):
 * <ul>
 *   <li>{@code table} - required, name of the table to read.</li>
 *   <li>{@code db} - optional, falls back to the database of the current {@link ConnectContext}.</li>
 *   <li>{@code partition} - optional, comma-separated partition names.</li>
 *   <li>{@code tablet} - optional, comma-separated tablet ids.</li>
 * </ul>
 */
public class TableBinlogFunction extends TableValuedFunctionIf {
    public static final String NAME = "binlog";

    private static final String DB = "db";
    private static final String TABLE = "table";
    private static final String PARTITION = "partition";
    private static final String TABLET = "tablet";

    private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(DB, TABLE, PARTITION, TABLET);

    private final String dbName;
    private final String tableName;
    // Null when no 'partition' property was given.
    private final PartitionNamesInfo partitionNamesInfo;
    // Null when no 'tablet' property was given.
    private final Set<Long> specifiedTabletIds;
    private final OlapTable originTable;
    private final RowBinlogTableWrapper rowBinlogTableWrapper;

    /**
     * Validates the TVF properties and resolves the target table.
     *
     * @param params raw TVF properties; only {@code db}, {@code table}, {@code partition} and
     *               {@code tablet} are accepted
     * @throws AnalysisException if running in cloud mode, a property key is unknown, {@code table}
     *         or the database cannot be determined, the database/table does not exist or is not an
     *         OLAP table, row binlog is not enabled on the table, a named partition does not
     *         exist, or the partition/tablet lists are malformed
     */
    public TableBinlogFunction(Map<String, String> params) throws AnalysisException {
        // Cloud mode uses a single-version snapshot for scan range versioning and is not supported here.
        if (Config.isCloudMode()) {
            throw new AnalysisException("binlog<row> table valued function is not supported in cloud mode");
        }

        Map<String, String> validParams = Maps.newHashMap();
        for (Map.Entry<String, String> e : params.entrySet()) {
            // Locale.ROOT keeps key matching independent of the JVM default locale
            // (e.g. "PARTITION" would not lower-case to "partition" under a Turkish locale).
            String key = StringUtils.lowerCase(e.getKey(), Locale.ROOT);
            if (!PROPERTIES_SET.contains(key)) {
                throw new AnalysisException("'" + e.getKey() + "' is invalid property");
            }
            validParams.put(key, e.getValue());
        }

        this.tableName = StringUtils.trimToEmpty(validParams.get(TABLE));
        if (Strings.isNullOrEmpty(tableName)) {
            throw new AnalysisException("'table' is required for binlog<row>");
        }

        // Fall back to the session's current database when 'db' is not given.
        String db = Strings.nullToEmpty(validParams.get(DB)).trim();
        if (db.isEmpty()) {
            ConnectContext ctx = ConnectContext.get();
            if (ctx != null) {
                db = Strings.nullToEmpty(ctx.getDatabase()).trim();
            }
        }
        if (db.isEmpty()) {
            throw new AnalysisException("'db' is required for binlog<row>");
        }
        this.dbName = db;

        this.partitionNamesInfo = parsePartitionNamesInfo(validParams.get(PARTITION));
        this.specifiedTabletIds = parseTabletIds(validParams.get(TABLET));

        DatabaseIf<?> dbIf;
        TableIf tableIf;
        try {
            dbIf = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbName);
            tableIf = dbIf.getTableOrMetaException(tableName, TableType.OLAP);
        } catch (MetaNotFoundException e) {
            throw new AnalysisException(e.getMessage(), e);
        }
        // Defensive re-check before the cast below.
        if (tableIf.getType() != TableType.OLAP) {
            throw new AnalysisException("binlog<row> only supports OLAP table, table=" + tableName);
        }

        this.originTable = (OlapTable) tableIf;
        originTable.readLock();
        try {
            if (!originTable.needRowBinlog()) {
                throw new AnalysisException("binlog<row> is not enabled for table=" + originTable.getName());
            }
            // Reject unknown partition names here so the user gets an AnalysisException
            // instead of an IllegalStateException later when the scan node is built.
            if (partitionNamesInfo != null) {
                for (String partName : partitionNamesInfo.getPartitionNames()) {
                    if (originTable.getPartition(partName) == null) {
                        throw new AnalysisException("Partition not found: " + partName);
                    }
                }
            }
            this.rowBinlogTableWrapper = new RowBinlogTableWrapper(originTable);
        } finally {
            originTable.readUnlock();
        }
    }

    /**
     * Returns the fixed display name of this table function.
     */
    @Override
    public String getTableName() {
        return "BinlogTableFunction";
    }

    /**
     * Returns the row-binlog schema of the origin table, read under the table's read lock.
     */
    @Override
    public List<Column> getTableColumns() throws AnalysisException {
        originTable.readLock();
        try {
            return originTable.getRowBinlogMeta().getSchema(true);
        } finally {
            originTable.readUnlock();
        }
    }

    /**
     * Builds an {@link OlapScanNode} that scans the row-binlog wrapper table.
     *
     * <p>If tablet ids were specified they are set on the scan node. If partition names were
     * specified they are resolved to partition ids on the origin table; otherwise the scan node
     * computes its own partition info.
     *
     * @throws IllegalStateException if a named partition no longer exists, or if computing the
     *         partition info fails
     */
    @Override
    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
        // Replace tvf FunctionGenTable with the binlog<row> OlapTable wrapper.
        desc.setTable(rowBinlogTableWrapper);
        OlapScanNode olapScanNode = new OlapScanNode(id, desc, "OlapScanNode",
                ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
        olapScanNode.setSelectedIndexInfo(rowBinlogTableWrapper.getBaseIndexId(), false, "binlog<row> read");
        if (specifiedTabletIds != null && !specifiedTabletIds.isEmpty()) {
            olapScanNode.setSpecifiedTabletIds(specifiedTabletIds);
        }
        // Resolve partition names to IDs, same pattern as Nereids PhysicalPlanTranslator.
        if (partitionNamesInfo != null && !partitionNamesInfo.getPartitionNames().isEmpty()) {
            List<Long> partitionIds = Lists.newArrayList();
            originTable.readLock();
            try {
                for (String partName : partitionNamesInfo.getPartitionNames()) {
                    Partition partition = originTable.getPartition(partName);
                    // Names were checked in the constructor, but the partition may have been
                    // dropped since then.
                    if (partition == null) {
                        throw new IllegalStateException("Partition not found: " + partName);
                    }
                    partitionIds.add(partition.getId());
                }
            } finally {
                originTable.readUnlock();
            }
            olapScanNode.setSelectedPartitionIds(partitionIds);
        } else {
            try {
                olapScanNode.computePartitionInfo();
            } catch (AnalysisException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
        return olapScanNode;
    }

    /**
     * Splits a comma-separated value into trimmed, non-empty, de-duplicated items,
     * keeping first-occurrence order.
     */
    private static List<String> splitCommaSeparated(String value) {
        return Arrays.stream(value.split(",", -1))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .distinct()
                .collect(Collectors.toList());
    }

    /**
     * Parses the {@code partition} property.
     *
     * @param partitions comma-separated partition names, may be null or empty
     * @return the parsed names (non-temporary), or null if the property is absent
     * @throws AnalysisException if the property is present but contains no partition name
     */
    private static PartitionNamesInfo parsePartitionNamesInfo(String partitions) throws AnalysisException {
        if (Strings.isNullOrEmpty(partitions)) {
            return null;
        }
        // Duplicates are dropped so the same partition is not selected twice.
        List<String> partitionNames = splitCommaSeparated(partitions);
        if (partitionNames.isEmpty()) {
            throw new AnalysisException("Invalid partition names: " + partitions);
        }
        return new PartitionNamesInfo(false, partitionNames);
    }

    /**
     * Parses the {@code tablet} property.
     *
     * @param tabletIds comma-separated tablet ids, may be null or empty
     * @return the parsed ids, or null if the property is absent
     * @throws AnalysisException if the property is present but contains no id, or an item is not
     *         a valid long
     */
    private static Set<Long> parseTabletIds(String tabletIds) throws AnalysisException {
        if (Strings.isNullOrEmpty(tabletIds)) {
            return null;
        }
        Set<Long> tabletIdSet;
        try {
            tabletIdSet = splitCommaSeparated(tabletIds).stream()
                    .map(Long::parseLong)
                    .collect(Collectors.toSet());
        } catch (NumberFormatException e) {
            // Keep the cause so the offending token is visible in the stack trace.
            throw new AnalysisException("Invalid tablet ids: " + tabletIds, e);
        }
        if (tabletIdSet.isEmpty()) {
            throw new AnalysisException("Invalid tablet ids: " + tabletIds);
        }
        return tabletIdSet;
    }
}