// LogicalOlapScanToPhysicalOlapScan.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Optional;
/**
 * Implementation rule that converts a logical OlapScan to a physical OlapScan.
 */
public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFactory {
    @Override
    public Rule build() {
        return logicalOlapScan().then(olapScan ->
                new PhysicalOlapScan(
                        olapScan.getRelationId(),
                        olapScan.getTable(),
                        olapScan.getQualifier(),
                        olapScan.getSelectedIndexId(),
                        olapScan.getSelectedTabletIds(),
                        olapScan.getSelectedPartitionIds(),
                        convertDistribution(olapScan),
                        olapScan.getPreAggStatus(),
                        olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId()),
                        Optional.empty(),
                        olapScan.getLogicalProperties(),
                        olapScan.getTableSample(),
                        olapScan.getOperativeSlots(),
                        olapScan.getVirtualColumns(),
                        olapScan.getScoreOrderKeys(),
                        olapScan.getScoreLimit(),
                        olapScan.getAnnOrderKeys(),
                        olapScan.getAnnLimit())
        ).toRule(RuleType.LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE);
    }
    /** convertDistribution */
    public static DistributionSpec convertDistribution(LogicalOlapScan olapScan) {
        OlapTable olapTable = olapScan.getTable();
        DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
        ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
        // When there are multiple partitions, olapScan tasks of different buckets are dispatched in
        // rounded robin algorithm. Therefore, the hashDistributedSpec can be broken except they are in
        // the same stable colocateGroup(CG)
        boolean isBelongStableCG = colocateTableIndex.isColocateTable(olapTable.getId())
                && !colocateTableIndex.isGroupUnstable(colocateTableIndex.getGroup(olapTable.getId()));
        boolean isSelectUnpartition = olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED
                || olapScan.getSelectedPartitionIds().size() == 1;
        // TODO: find a better way to handle both tablet num == 1 and colocate table together in future
        if (distributionInfo instanceof HashDistributionInfo && (isBelongStableCG || isSelectUnpartition)) {
            if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId()) {
                HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
                List<Slot> output = olapScan.getOutput();
                List<Slot> baseOutput = olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId());
                List<ExprId> hashColumns = Lists.newArrayList();
                for (Column column : hashDistributionInfo.getDistributionColumns()) {
                    for (Slot slot : output) {
                        Column originalColumn = ((SlotReference) slot).getOriginalColumn().get();
                        String origName = originalColumn.tryGetBaseColumnName();
                        if (origName.equals(column.getName())) {
                            hashColumns.add(slot.getExprId());
                        }
                    }
                }
                if (hashColumns.size() != hashDistributionInfo.getDistributionColumns().size()) {
                    for (Column column : hashDistributionInfo.getDistributionColumns()) {
                        for (Slot slot : baseOutput) {
                            // If the length of the column in the bucket key changes after DDL, the length cannot be
                            // determined. As a result, some bucket fields are lost in the query execution plan.
                            // So here we use the column name to avoid this problem
                            if (((SlotReference) slot).getOriginalColumn().get().getName()
                                    .equalsIgnoreCase(column.getName())) {
                                hashColumns.add(slot.getExprId());
                            }
                        }
                    }
                }
                return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL, olapScan.getTable().getId(),
                        olapScan.getSelectedIndexId(), Sets.newLinkedHashSet(olapScan.getSelectedPartitionIds()));
            } else {
                HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
                List<Slot> output = olapScan.getOutput();
                List<ExprId> hashColumns = Lists.newArrayList();
                for (Column column : hashDistributionInfo.getDistributionColumns()) {
                    for (Slot slot : output) {
                        // If the length of the column in the bucket key changes after DDL, the length cannot be
                        // determined. As a result, some bucket fields are lost in the query execution plan.
                        // So here we use the column name to avoid this problem
                        if (((SlotReference) slot).getOriginalColumn().isPresent()
                                && ((SlotReference) slot).getOriginalColumn().get().getName()
                                .equalsIgnoreCase(column.getName())) {
                            hashColumns.add(slot.getExprId());
                        }
                    }
                }
                return new DistributionSpecHash(hashColumns, ShuffleType.NATURAL, olapScan.getTable().getId(),
                        olapScan.getSelectedIndexId(), Sets.newLinkedHashSet(olapScan.getSelectedPartitionIds()));
            }
        } else {
            // RandomDistributionInfo
            return DistributionSpecStorageAny.INSTANCE;
        }
    }
}