PartitionPrunerV2Base.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.common.AnalysisException;

import com.google.common.base.Preconditions;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class PartitionPrunerV2Base implements PartitionPruner {
    protected final Map<Long, PartitionItem> idToPartitionItem;
    protected final List<Column> partitionColumns;
    protected final Map<String, ColumnRange> columnNameToRange;
    // used for single column partition
    protected RangeMap<ColumnBound, UniqueId> singleColumnRangeMap = null;
    // Flag to indicate if this pruner is for hive partition or not.
    protected boolean isHive = false;
    // currently only used for list partition
    private Map.Entry<Long, PartitionItem> defaultPartition;

    // Only called in PartitionPruneV2ByShortCircuitPlan constructor
    PartitionPrunerV2Base() {
        this.idToPartitionItem = null;
        this.partitionColumns = null;
        this.columnNameToRange = null;
    }

    public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
            List<Column> partitionColumns,
            Map<String, ColumnRange> columnNameToRange) {
        this.idToPartitionItem = idToPartitionItem;
        this.partitionColumns = partitionColumns;
        this.columnNameToRange = columnNameToRange;
        findDefaultPartition(idToPartitionItem);
    }

    // pass singleColumnRangeMap from outside
    public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
            List<Column> partitionColumns,
            Map<String, ColumnRange> columnNameToRange,
            RangeMap<ColumnBound, UniqueId> singleColumnRangeMap) {
        this.idToPartitionItem = idToPartitionItem;
        this.partitionColumns = partitionColumns;
        this.columnNameToRange = columnNameToRange;
        this.singleColumnRangeMap = singleColumnRangeMap;
        findDefaultPartition(idToPartitionItem);
    }

    private Collection<Long> handleDefaultPartition(Collection<Long> result) {
        if (this.defaultPartition != null) {
            Set<Long> r = result.stream().collect(Collectors.toSet());
            r.add(this.defaultPartition.getKey());
            return r;
        }
        return result;
    }

    private void findDefaultPartition(Map<Long, PartitionItem> idToPartitionItem) {
        this.defaultPartition = idToPartitionItem.entrySet().stream()
                                .filter(entry -> (entry.getValue().isDefaultPartition()))
                                .findAny()
                                .orElse(null);
    }

    public PartitionPrunerV2Base(Map<Long, PartitionItem> idToPartitionItem,
                                 List<Column> partitionColumns,
                                 Map<String, ColumnRange> columnNameToRange,
                                 RangeMap<ColumnBound, UniqueId> singleColumnRangeMap,
                                 boolean isHive) {
        this.idToPartitionItem = idToPartitionItem;
        this.partitionColumns = partitionColumns;
        this.columnNameToRange = columnNameToRange;
        this.singleColumnRangeMap = singleColumnRangeMap;
        this.isHive = isHive;
    }

    @Override
    public Collection<Long> prune() throws AnalysisException {
        Map<Column, FinalFilters> columnToFilters = Maps.newHashMap();
        for (Column column : partitionColumns) {
            ColumnRange columnRange = columnNameToRange.get(column.getName());
            if (columnRange == null) {
                columnToFilters.put(column, FinalFilters.noFilters());
            } else {
                columnToFilters.put(column, getFinalFilters(columnRange, column));
            }
        }

        Collection<Long> result;
        if (partitionColumns.size() == 1) {
            result = pruneSingleColumnPartition(columnToFilters);
        } else if (partitionColumns.size() > 1) {
            result = pruneMultipleColumnPartition(columnToFilters);
        } else {
            result = Lists.newArrayList();
        }

        return handleDefaultPartition(result);
    }

    abstract void genSingleColumnRangeMap();

    /**
     * Handle conjunctive and disjunctive `is null` predicates.
     */
    abstract FinalFilters getFinalFilters(ColumnRange columnRange,
                                          Column column) throws AnalysisException;

    /**
     * It's a little complex to unify the logic of pruning multiple columns partition for both
     * list and range partitions.
     *
     * The key point is that the list partitions value are the explicit values of partition columns,
     * however, the range bound for a partition column in multiple columns partition is depended on
     * both other partition columns' range values and the range value itself.
     *
     * Let's say we have two partition columns k1, k2:
     * For partition [(1, 5), (1, 10)), the range for k2 is [5, 10).
     * For partition [(1, 5), (2, 10)), the range for k2 is (-∞, +∞).
     * For partition [(1, 10), (2, 5)), the range for k2 is (-∞, 5) union [10, +∞).
     *
     * We could try to compute the range bound of every column in multiple columns partition and
     * unify the logic like pruning multiple list columns partition for multiple range ones.
     */
    abstract Collection<Long> pruneMultipleColumnPartition(
            Map<Column, FinalFilters> columnToFilters) throws AnalysisException;

    /**
     * Now we could unify the logic of pruning single column partition for both list and range
     * partitions.
     */
    private Collection<Long> pruneSingleColumnPartition(Map<Column, FinalFilters> columnToFilters) {
        FinalFilters finalFilters = columnToFilters.get(partitionColumns.get(0));
        switch (finalFilters.type) {
            case CONSTANT_FALSE_FILTERS:
                return Collections.emptySet();
            case HAVE_FILTERS:
                genSingleColumnRangeMap();
                Preconditions.checkNotNull(singleColumnRangeMap);
                return finalFilters.filters.stream()
                        .map(filter -> {
                            RangeMap<ColumnBound, UniqueId> filtered = singleColumnRangeMap.subRangeMap(filter);
                            return filtered.asMapOfRanges().values().stream()
                                    .map(UniqueId::getPartitionId)
                                    .collect(Collectors.toSet());
                        })
                        .flatMap(Set::stream)
                        .collect(Collectors.toSet());
            case NO_FILTERS:
            default:
                return idToPartitionItem.keySet();
        }
    }

    protected static Range<ColumnBound> mapPartitionKeyRange(Range<PartitionKey> fromRange,
            int columnIdx) {
        return mapRange(fromRange,
                partitionKey -> ColumnBound.of(partitionKey.getKeys().get(columnIdx)));
    }

    private static <TO extends Comparable, FROM extends Comparable> Range<TO> mapRange(
            Range<FROM> range, Function<FROM, TO> mapper) {
        TO lower = range.hasLowerBound() ? mapper.apply(range.lowerEndpoint()) : null;
        TO upper = range.hasUpperBound() ? mapper.apply(range.upperEndpoint()) : null;
        if (range.hasUpperBound()) {
            // has upper bound
            if (range.hasLowerBound()) {
                return Range.range(lower, range.lowerBoundType(), upper, range.upperBoundType());
            } else {
                if (range.upperBoundType() == BoundType.OPEN) {
                    return Range.lessThan(upper);
                } else {
                    return Range.atMost(upper);
                }
            }
        } else if (range.hasLowerBound()) {
            // has no upper bound, but has lower bound
            if (range.lowerBoundType() == BoundType.OPEN) {
                return Range.greaterThan(lower);
            } else {
                return Range.atLeast(lower);
            }
        } else {
            // has neither upper nor lower bound
            return Range.all();
        }
    }

    public interface UniqueId {
        long getPartitionId();
    }

    protected static class FinalFilters {
        enum Type {
            // Have no filters, should just return all the partitions.
            NO_FILTERS,
            // Have filters.
            HAVE_FILTERS,
            // Filter predicates are folded to constant false, pruned partitions should be
            // an empty collection.
            CONSTANT_FALSE_FILTERS,
        }

        final Type type;
        final Set<Range<ColumnBound>> filters;

        private FinalFilters(Type type, Set<Range<ColumnBound>> filters) {
            this.type = type;
            this.filters = filters;
        }

        private static final FinalFilters NO_FILTERS = new FinalFilters(Type.NO_FILTERS, null);

        private static final FinalFilters CONSTANT_FALSE_FILTERS = new FinalFilters(Type.CONSTANT_FALSE_FILTERS, null);

        public static FinalFilters noFilters() {
            return NO_FILTERS;
        }

        public static FinalFilters constantFalseFilters() {
            return CONSTANT_FALSE_FILTERS;
        }

        public static FinalFilters create(Set<Range<ColumnBound>> filters) {
            return new FinalFilters(Type.HAVE_FILTERS, filters);
        }
    }
}