RelationUtil.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.util;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.systable.SysTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Static helpers for qualifying table names, looking up catalog objects and inspecting sync materialized views.
 */
public class RelationUtil {
    /**
     * Comma-separated rule names installed through {@code setDisableNereidsRules} while
     * {@link #getMvUsedColumnNames} plans the defining query of a sync materialized view.
     * NOTE(review): ELIMINATE_GROUP_BY_KEY_BY_UNIFORM is listed twice; presumably harmless because the
     * session variable exposes the names as a set, but confirm before de-duplicating the literal.
     */
    private static final String SYNC_MV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE, PRUNE_EMPTY_PARTITION, "
            + "ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, HAVING_TO_FILTER, ELIMINATE_GROUP_BY, SIMPLIFY_AGG_GROUP_BY, "
            + "MERGE_PERCENTILE_TO_ARRAY, VARIANT_SUB_PATH_PRUNING, INFER_PREDICATES, INFER_AGG_NOT_NULL, "
            + "INFER_SET_OPERATOR_DISTINCT, INFER_FILTER_NOT_NULL, INFER_JOIN_NOT_NULL, PUSH_DOWN_MAX_MIN_FILTER, "
            + "ELIMINATE_SORT, ELIMINATE_AGGREGATE, ELIMINATE_LIMIT, ELIMINATE_SEMI_JOIN, ELIMINATE_NOT_NULL, "
            + "ELIMINATE_JOIN_BY_UK, ELIMINATE_JOIN_BY_FK, ELIMINATE_GROUP_BY_KEY, ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, "
            + "ELIMINATE_FILTER_GROUP_BY_KEY";

    /**
     * Expands a one-, two- or three-part table name into a {@code [catalog, database, table]} qualifier.
     *
     * <p>Parts missing from {@code nameParts} are filled in from the session: the current catalog for
     * one- and two-part names, and additionally the current database for one-part names.
     *
     * @param context connect context supplying the current catalog and database
     * @param nameParts table name split into its parts, e.g. {@code [db, table]}
     * @return immutable list holding catalog name, database name and table name, in that order
     * @throws IllegalStateException if a required current catalog or database is not set, or if
     *         {@code nameParts} does not contain one, two or three elements
     */
    public static List<String> getQualifierName(ConnectContext context, List<String> nameParts) {
        final int partCount = nameParts.size();

        if (partCount == 3) {
            // catalog.db.table: every part is given explicitly, the session is not consulted.
            return ImmutableList.of(nameParts.get(0), nameParts.get(1), nameParts.get(2));
        }

        if (partCount == 2) {
            // db.table: only the catalog comes from the session.
            CatalogIf currentCatalog = context.getCurrentCatalog();
            if (currentCatalog == null) {
                throw new IllegalStateException("Current catalog is not set.");
            }
            // if the relation is view, nameParts.get(0) is dbName.
            return ImmutableList.of(currentCatalog.getName(), nameParts.get(0), nameParts.get(1));
        }

        if (partCount == 1) {
            // table: both the catalog and the database come from the session.
            String tableName = nameParts.get(0);
            CatalogIf currentCatalog = context.getCurrentCatalog();
            if (currentCatalog == null) {
                throw new IllegalStateException(
                        "Current catalog is not set. default catalog is: " + context.getDefaultCatalog());
            }
            String catalogName = currentCatalog.getName();
            String currentDb = context.getDatabase();
            if (Strings.isNullOrEmpty(currentDb)) {
                throw new IllegalStateException("Current database is not set.");
            }
            return ImmutableList.of(catalogName, currentDb, tableName);
        }

        throw new IllegalStateException("Table name [" + String.join(".", nameParts) + "] is invalid.");
    }

    /**
     * Looks up the table named by a full qualifier, discarding the database half of the lookup result.
     *
     * @param qualifierName qualifier passed through unchanged to {@link #getDbAndTable}
     * @param env environment passed through unchanged to {@link #getDbAndTable}
     * @return the table half of the pair returned by {@link #getDbAndTable}
     */
    public static TableIf getTable(List<String> qualifierName, Env env) {
        Pair<DatabaseIf<?>, TableIf> dbAndTable = getDbAndTable(qualifierName, env);
        return dbAndTable.second;
    }

    /**
     * Resolves a full qualifier to its database and table objects.
     *
     * <p>The table part may carry a sys-table suffix; it is split with
     * {@code SysTable.getTableNameWithSysTableName} and the table itself is looked up by the first half.
     * When a sys-table name is present but the table offers no matching sys-table function, the lookup
     * is rejected.
     *
     * @param qualifierName list whose first three elements are catalog name, database name and table name
     * @param env environment whose catalog manager is used for the lookup
     * @return pair of the resolved database and table
     * @throws AnalysisException if the catalog, database, table or sys table cannot be resolved, or if
     *         any other failure occurs during the lookup (the original failure is attached as the cause)
     */
    public static Pair<DatabaseIf<?>, TableIf> getDbAndTable(List<String> qualifierName, Env env) {
        String catalogName = qualifierName.get(0);
        String dbName = qualifierName.get(1);
        String tableName = qualifierName.get(2);
        CatalogIf<?> catalog = env.getCatalogMgr().getCatalog(catalogName);
        if (catalog == null) {
            throw new AnalysisException(String.format("Catalog %s does not exist.", catalogName));
        }
        try {
            DatabaseIf<TableIf> db = catalog.getDbOrException(dbName, s -> new AnalysisException(
                    "Database [" + dbName + "] does not exist."));
            Pair<String, String> tableNameWithSysTableName
                    = SysTable.getTableNameWithSysTableName(tableName);
            TableIf tbl = db.getTableOrException(tableNameWithSysTableName.first,
                    s -> new AnalysisException(
                            "Table [" + tableName + "] does not exist in database [" + dbName + "]."));
            Optional<TableValuedFunction> sysTable = tbl.getSysTableFunction(catalogName, dbName, tableName);
            if (!Strings.isNullOrEmpty(tableNameWithSysTableName.second) && !sysTable.isPresent()) {
                throw new AnalysisException("Unknown sys table '" + tableName + "'");
            }
            return Pair.of(db, tbl);
        } catch (AnalysisException e) {
            // Already the type callers expect: rethrow as-is so its stack trace and state survive.
            throw e;
        } catch (Throwable e) {
            // Kept deliberately broad so every lookup failure still surfaces as AnalysisException,
            // but chain the caught throwable itself instead of only its cause, which dropped the
            // failure's own type and stack trace.
            throw new AnalysisException(e.getMessage(), e);
        }
    }

    /**
     * Collects the names of the base-table columns used by a sync materialized view.
     *
     * <p>When the index meta carries its defining statement, the query part is extracted, planned up to
     * the rewritten logical plan with the rules in {@link #SYNC_MV_PLANER_DISABLE_RULES} disabled, and
     * inspected: column names are taken from the input slots of every project whose child is a catalog
     * relation or, when the plan has no such project, from the output slots of each catalog relation.
     * The session's previous disabled-rule list is restored afterwards, even if planning fails.
     *
     * @param meta index meta of the materialized view
     * @return case-insensitively ordered set of column names; empty when {@code meta} has no define
     *         statement
     * @throws AnalysisException if the query part cannot be extracted from the define statement
     */
    public static Set<String> getMvUsedColumnNames(MaterializedIndexMeta meta) {
        Set<String> columns = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
        if (meta.getDefineStmt() == null) {
            // no define stmt, means mv created by add rollup. assume schema change can handle such case
            return columns;
        }
        // get the original create mv sql
        String createMvSql = meta.getDefineStmt().originStmt;
        Optional<String> querySql = new NereidsParser().parseForSyncMv(createMvSql);
        if (!querySql.isPresent()) {
            throw new AnalysisException(String.format("can't parse %s ", createMvSql));
        }
        LogicalPlan unboundMvPlan = new NereidsParser().parseSingle(querySql.get());
        ConnectContext connectContext = ConnectContext.get();
        StatementContext statementContext = new StatementContext(connectContext,
                new OriginStatement(querySql.get(), 0));
        NereidsPlanner planner = new NereidsPlanner(statementContext);
        if (statementContext.getConnectContext().getStatementContext() == null) {
            statementContext.getConnectContext().setStatementContext(statementContext);
        }
        // remember the session's own disabled rules so they can be put back after planning
        Set<String> tempDisableRules = connectContext.getSessionVariable().getDisableNereidsRuleNames();
        connectContext.getSessionVariable().setDisableNereidsRules(SYNC_MV_PLANER_DISABLE_RULES);
        connectContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
        LogicalPlan logicalPlan;
        try {
            // disable rbo sync mv rewrite
            connectContext.getSessionVariable()
                    .setVarOnce(SessionVariable.ENABLE_SYNC_MV_COST_BASED_REWRITE, "true");
            // disable constant fold
            connectContext.getSessionVariable().setVarOnce(SessionVariable.DEBUG_SKIP_FOLD_CONSTANT, "true");
            planner.planWithLock(unboundMvPlan, PhysicalProperties.ANY,
                    ExplainCommand.ExplainLevel.REWRITTEN_PLAN);
            logicalPlan = (LogicalPlan) planner.getCascadesContext().getRewritePlan();
        } finally {
            // after operate, roll back the disable rules
            connectContext.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
            connectContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
        }
        // true  -> projects sitting directly on a catalog relation
        // false -> every other plan node, including bare catalog relations
        Map<Boolean, List<Object>> partitionedPlan = logicalPlan
                .collect(plan -> true)
                .stream()
                .collect(Collectors.partitioningBy(
                        plan -> plan instanceof LogicalProject
                                && ((LogicalProject<?>) plan).child() instanceof LogicalCatalogRelation
                ));
        List<Object> projects = partitionedPlan.get(true);
        if (projects.isEmpty()) {
            // for scan: use each relation's own output, not the root plan's output, so that the
            // collected names are base-table columns rather than the view's output columns
            partitionedPlan.get(false)
                    .stream()
                    .filter(plan -> plan instanceof LogicalCatalogRelation)
                    .map(plan -> (LogicalCatalogRelation) plan)
                    .forEach(relation -> relation.getOutput().stream()
                            .map(Slot::getName)
                            .forEach(columns::add));
        } else {
            // for projects
            projects
                    .stream()
                    .map(plan -> (LogicalProject<?>) plan)
                    .forEach(project -> project.getInputSlots().stream()
                            .map(Slot::getName)
                            .forEach(columns::add));
        }
        return columns;
    }
}