RollupSelector.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.alter.MaterializedViewHandler;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;


/**
 * Chooses which materialized index (the base index or one of its rollups) a scan of an
 * {@link OlapTable} should read.
 *
 * <p>Selection runs in stages:
 * <ol>
 *   <li>keep the visible indexes that contain every materialized output column of the scan tuple
 *       (and, when pre-aggregation is off, have as many key columns as the base index);</li>
 *   <li>among those, keep the indexes whose leading schema columns are best covered by usable
 *       predicates (falling back to all of them if none match);</li>
 *   <li>pick the candidate with the fewest rows over the scanned partitions, breaking ties by the
 *       smaller column count.</li>
 * </ol>
 */
public final class RollupSelector {
    private static final Logger LOG = LogManager.getLogger(RollupSelector.class);

    // Scan tuple; its materialized slots define the columns the chosen index must provide.
    private final TupleDescriptor tupleDesc;
    // Table whose visible indexes are the candidates.
    private final OlapTable table;
    // Used to look up equi-join conjuncts bound to tupleDesc.
    private final Analyzer analyzer;

    public RollupSelector(Analyzer analyzer, TupleDescriptor tupleDesc, OlapTable table) {
        this.analyzer = analyzer;
        this.tupleDesc = tupleDesc;
        this.table = table;
    }

    /**
     * Returns the id of the index the scan should read.
     *
     * <p>If the session variable {@code use_v2_rollup} is enabled and the table has an index named
     * {@code MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + tableName}, that index
     * is returned immediately so the user can inspect the v2-format data.
     *
     * @param partitionIds partitions being scanned; their row counts drive the final choice.
     *     Must not be null.
     * @param conjuncts scan predicates, used for prefix-index matching
     * @param isPreAggregation whether pre-aggregation is on; when off, only indexes with as many
     *     key columns as the base index are eligible
     * @return the selected index id
     * @throws UserException if no visible index contains all output columns
     */
    public long selectBestRollup(
            Collection<Long> partitionIds, List<Expr> conjuncts, boolean isPreAggregation)
            throws UserException {
        Preconditions.checkArgument(partitionIds != null, "Paritition can't be null.");

        ConnectContext connectContext = ConnectContext.get();
        if (connectContext != null && connectContext.getSessionVariable().isUseV2Rollup()) {
            // The user explicitly asked for the segment v2 rollup; return it if it exists.
            String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + table.getName();
            Long v2RollupIndexId = table.getIndexIdByName(v2RollupIndexName);
            if (v2RollupIndexId != null) {
                return v2RollupIndexId;
            }
        }
        // Prefix-index candidates are decided from table-level schemas only (an index has the same
        // id in every partition); row counts over the requested partitions then pick the winner.
        final List<Long> bestPrefixIndexRollups = selectBestPrefixIndexRollup(conjuncts, isPreAggregation);
        return selectBestRowCountRollup(bestPrefixIndexRollups, partitionIds);
    }

    /**
     * Picks the candidate with the smallest total row count over {@code partitionIds}. Ties go to
     * the index with fewer schema columns; remaining ties keep the earlier candidate in list order.
     *
     * <p>If the winner is the v2-format rollup, the base index id is returned instead. The v2 rollup
     * is only meant to be read when {@code use_v2_rollup} is set, and that case is handled in
     * {@link #selectBestRollup}.
     *
     * @return the selected index id, or 0 if {@code bestPrefixIndexRollups} is empty
     */
    private long selectBestRowCountRollup(List<Long> bestPrefixIndexRollups, Collection<Long> partitionIds) {
        long minRowCount = Long.MAX_VALUE;
        long selectedIndexId = 0;
        // Tracks whether a real candidate has been chosen. Without it, a first candidate whose row
        // count equals Long.MAX_VALUE would hit the tie-break and look up the schema of id 0.
        boolean hasSelected = false;
        for (Long indexId : bestPrefixIndexRollups) {
            long rowCount = 0;
            for (Long partitionId : partitionIds) {
                rowCount += table.getPartition(partitionId).getIndex(indexId).getRowCount();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("rowCount={} for table={}", rowCount, indexId);
            }
            if (!hasSelected || rowCount < minRowCount) {
                hasSelected = true;
                minRowCount = rowCount;
                selectedIndexId = indexId;
            } else if (rowCount == minRowCount) {
                // Same row count: prefer the index with fewer columns.
                int selectedColumnSize = table.getSchemaByIndexId(selectedIndexId).size();
                int currColumnSize = table.getSchemaByIndexId(indexId).size();
                if (currColumnSize < selectedColumnSize) {
                    selectedIndexId = indexId;
                }
            }
        }

        String v2RollupIndexName = MaterializedViewHandler.NEW_STORAGE_FORMAT_INDEX_NAME_PREFIX + table.getName();
        Long v2RollupIndexId = table.getIndexIdByName(v2RollupIndexName);
        if (v2RollupIndexId != null && v2RollupIndexId.longValue() == selectedIndexId) {
            // Never hand out the v2 rollup implicitly; fall back to the base index.
            selectedIndexId = table.getBaseIndexId();
        }
        return selectedIndexId;
    }

    /**
     * Returns the ids of the candidate indexes with the best predicate prefix match, sorted by
     * ascending id.
     *
     * @throws UserException if no visible index contains every materialized output column
     */
    private List<Long> selectBestPrefixIndexRollup(List<Expr> conjuncts, boolean isPreAggregation)
            throws UserException {
        final List<String> outputColumns = Lists.newArrayList();
        for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
            outputColumns.add(slot.getColumn().getName());
        }

        final List<MaterializedIndex> rollups = table.getVisibleIndex();
        if (LOG.isDebugEnabled()) {
            LOG.debug("num of rollup(base included): {}, pre aggr: {}", rollups.size(), isPreAggregation);
        }

        // 1. Find all indexes that contain every output column.
        final List<MaterializedIndex> rollupsContainsOutput = Lists.newArrayList();
        final int baseKeyColumnCount = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
        for (MaterializedIndex rollup : rollups) {
            final Set<String> rollupColumns = Sets.newHashSet();
            table.getSchemaByIndexId(rollup.getId(), true)
                    .forEach(column -> rollupColumns.add(column.getName()));

            if (!rollupColumns.containsAll(outputColumns)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("exclude index {} because it does not contain all output columns", rollup.getId());
                }
                continue;
            }

            if (isPreAggregation) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("preAggregation is on. add index {} which contains all output columns",
                            rollup.getId());
                }
                rollupsContainsOutput.add(rollup);
            } else if (table.getKeyColumnsByIndexId(rollup.getId()).size() == baseKeyColumnCount) {
                // Without pre-aggregation only the base index, or a rollup with the same key columns
                // as the base (possibly reordered), is usable. Only the key-column COUNT is compared.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("preAggregation is off, but index {} have same key columns with base index.",
                            rollup.getId());
                }
                rollupsContainsOutput.add(rollup);
            }
        }

        if (rollupsContainsOutput.isEmpty()) {
            throw new UserException("Can't find candicate rollup contains all output columns.");
        }

        // 2. Among rollupsContainsOutput, keep the ones whose prefix best matches predicate columns.
        final Set<String> equivalenceColumns = Sets.newHashSet();
        final Set<String> unequivalenceColumns = Sets.newHashSet();
        collectColumns(conjuncts, equivalenceColumns, unequivalenceColumns);
        final List<Long> rollupsMatchingBestPrefixIndex = Lists.newArrayList();
        matchPrefixIndex(rollupsContainsOutput, rollupsMatchingBestPrefixIndex,
                         equivalenceColumns, unequivalenceColumns);

        if (rollupsMatchingBestPrefixIndex.isEmpty()) {
            rollupsContainsOutput.forEach(n -> rollupsMatchingBestPrefixIndex.add(n.getId()));
        }

        // 3. Sort by index id so candidates are always returned in the same order. Use natural Long
        // ordering: narrowing (id1 - id2) to int can overflow and yield the wrong sign.
        Collections.sort(rollupsMatchingBestPrefixIndex, Comparator.<Long>naturalOrder());
        return rollupsMatchingBestPrefixIndex;
    }

    /**
     * Appends to {@code rollupsMatchingBestPrefixIndex} the ids of the candidates whose schema has
     * the longest run of leading columns covered by predicates. An equivalence-predicate column
     * extends the run. A non-equivalence-predicate column counts once and then ends the run.
     * Does nothing when both column sets are empty.
     */
    private void matchPrefixIndex(List<MaterializedIndex> candidateRollups,
                                  List<Long> rollupsMatchingBestPrefixIndex,
                                  Set<String> equivalenceColumns,
                                  Set<String> unequivalenceColumns) {
        if (equivalenceColumns.isEmpty() && unequivalenceColumns.isEmpty()) {
            return;
        }
        int maxPrefixMatchCount = 0;
        for (MaterializedIndex index : candidateRollups) {
            int prefixMatchCount = 0;
            for (Column col : table.getSchemaByIndexId(index.getId())) {
                String columnName = col.getName();
                if (equivalenceColumns.contains(columnName)) {
                    prefixMatchCount++;
                    continue;
                }
                if (unequivalenceColumns.contains(columnName)) {
                    // A non-equivalence predicate column still matches, but no later column can
                    // extend the prefix.
                    prefixMatchCount++;
                }
                break;
            }

            if (prefixMatchCount == maxPrefixMatchCount) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("s3: find a equal prefix match index {}. match count: {}",
                            index.getId(), prefixMatchCount);
                }
                rollupsMatchingBestPrefixIndex.add(index.getId());
            } else if (prefixMatchCount > maxPrefixMatchCount) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("s3: find a better prefix match index {}. match count: {}",
                            index.getId(), prefixMatchCount);
                }
                maxPrefixMatchCount = prefixMatchCount;
                rollupsMatchingBestPrefixIndex.clear();
                rollupsMatchingBestPrefixIndex.add(index.getId());
            }
        }
    }

    /**
     * Collects names of scan columns referenced by prefix-index-usable predicates.
     *
     * <p>Each usable scan conjunct adds its first bound materialized slot's column to
     * {@code equivalenceColumns} or {@code unequivalenceColumns}, depending on
     * {@link #isEquivalenceExpr}. Usable equi-join conjuncts for this tuple add every materialized
     * slot bound by either side to {@code equivalenceColumns}.
     */
    private void collectColumns(
            List<Expr> conjuncts, Set<String> equivalenceColumns, Set<String> unequivalenceColumns) {

        // 1. Columns that have a usable filter predicate on them.
        for (Expr expr : conjuncts) {
            if (!isPredicateUsedForPrefixIndex(expr, false)) {
                continue;
            }
            for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
                if (expr.isBound(slot.getId())) {
                    if (isEquivalenceExpr(expr)) {
                        equivalenceColumns.add(slot.getColumn().getName());
                    } else {
                        unequivalenceColumns.add(slot.getColumn().getName());
                    }
                    break;
                }
            }
        }

        // 2. Equal join predicates, relevant when this scan is the inner (pushed-into) child.
        List<Expr> eqJoinPredicate = analyzer.getEqJoinConjuncts(tupleDesc.getId());
        for (Expr expr : eqJoinPredicate) {
            if (!isPredicateUsedForPrefixIndex(expr, true)) {
                continue;
            }
            for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
                if (expr.getChild(0).isBound(slot.getId()) || expr.getChild(1).isBound(slot.getId())) {
                    equivalenceColumns.add(slot.getColumn().getName());
                }
            }
        }
    }

    /** Returns true for IN predicates and for binary predicates whose operator is an equivalence. */
    private boolean isEquivalenceExpr(Expr expr) {
        if (expr instanceof InPredicate) {
            return true;
        }
        return expr instanceof BinaryPredicate && ((BinaryPredicate) expr).getOp().isEquivalence();
    }

    /**
     * Returns whether {@code expr} can narrow a prefix-index scan. Only IN and binary predicates
     * qualify; binary join conjuncts and binary filter predicates use different rules.
     */
    private boolean isPredicateUsedForPrefixIndex(Expr expr, boolean isJoinConjunct) {
        if (expr instanceof InPredicate) {
            return isInPredicateUsedForPrefixIndex((InPredicate) expr);
        }
        if (expr instanceof BinaryPredicate) {
            BinaryPredicate predicate = (BinaryPredicate) expr;
            return isJoinConjunct
                    ? isEqualJoinConjunctUsedForPrefixIndex(predicate)
                    : isBinaryPredicateUsedForPrefixIndex(predicate);
        }
        return false;
    }

    /**
     * An equi-join conjunct is usable if it is not auxiliary and at least one side is a (possibly
     * cast-wrapped) slot reference bound to a materialized slot of this tuple.
     */
    private boolean isEqualJoinConjunctUsedForPrefixIndex(BinaryPredicate expr) {
        Preconditions.checkArgument(expr.getOp().isEquivalence());
        if (expr.isAuxExpr()) {
            return false;
        }
        for (Expr child : expr.getChildren()) {
            if (!isSlotRefNested(child)) {
                continue;
            }
            for (SlotDescriptor slot : tupleDesc.getMaterializedSlots()) {
                if (child.isBound(slot.getId())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * A binary filter predicate is usable if it is not auxiliary, its operator is not
     * {@code isUnequivalence()}, and it compares a (possibly cast-wrapped) slot reference with a
     * constant, in either operand order.
     */
    private boolean isBinaryPredicateUsedForPrefixIndex(BinaryPredicate expr) {
        if (expr.isAuxExpr() || expr.getOp().isUnequivalence()) {
            return false;
        }
        return (isSlotRefNested(expr.getChild(0)) && expr.getChild(1).isConstant())
                || (isSlotRefNested(expr.getChild(1)) && expr.getChild(0).isConstant());
    }

    /** An IN predicate is usable if it is not NOT IN and tests a slot reference against literals. */
    private boolean isInPredicateUsedForPrefixIndex(InPredicate expr) {
        if (expr.isNotIn()) {
            return false;
        }
        return isSlotRefNested(expr.getChild(0)) && expr.isLiteralChildren();
    }

    /** Returns whether {@code expr}, after stripping any chain of casts, is a {@link SlotRef}. */
    private boolean isSlotRefNested(Expr expr) {
        while (expr instanceof CastExpr) {
            expr = expr.getChild(0);
        }
        return expr instanceof SlotRef;
    }
}