LazyMaterializeTopN.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.processor.post.materialize;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.processor.post.PlanPostProcessor;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLazyMaterialize;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * post rule to do lazy materialize
 */
public class LazyMaterializeTopN extends PlanPostProcessor {
    /* BE do not support pattern:
       union
          -->materialize
                   -->topn
                        -->scan1
          -->materialize
                   -->topn
                        -->scan2
       when we create materializeNode for the first union child, set hasMaterialized=true
       to avoid generating materializeNode for other union's children
    */
    private boolean hasMaterialized = false;

    @Override
    public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) {
        if (hasMaterialized) {
            return topN;
        }
        /*
         topn(output=[x] orderkey=[b])
             ->project(a as x)
                ->T(a, b)
         'x' can be lazy materialized.
         materializeMap: x->(T, a)
         */
        Map<Slot, MaterializeSource> materializeMap = new HashMap<>();
        List<Slot> materializedSlots = new ArrayList<>();
        // find the slots which can be lazy materialized
        for (Slot slot : topN.getOutput()) {
            Optional<MaterializeSource> source = computeMaterializeSource(topN, (SlotReference) slot);
            if (source.isPresent()) {
                SlotReference baseSlot = source.get().baseSlot;
                if (source.get().baseSlot.hasSubColPath()) {
                    slot = baseSlot.withExprId(slot.getExprId());
                }
                materializeMap.put(slot, source.get());
            } else {
                materializedSlots.add(slot);
            }
        }
        // find out the slots which are worth doing lazy materialization
        List<Slot> lazyMaterializeSlots = filterSlotsForLazyMaterialization(materializeMap);
        if (lazyMaterializeSlots.isEmpty()) {
            return topN;
        }

        Map<CatalogRelation, List<Slot>> relationToLazySlotMap = new HashMap<>();
        for (Slot slot : lazyMaterializeSlots) {
            MaterializeSource source = materializeMap.get(slot);
            relationToLazySlotMap.computeIfAbsent(source.relation, relation -> new ArrayList<>()).add(slot);
        }

        Plan result = topN;
        List<Slot> originOutput = topN.getOutput();
        BiMap<CatalogRelation, SlotReference> relationToRowId = HashBiMap.create(relationToLazySlotMap.size());
        HashSet<SlotReference> rowIdSet = new HashSet<>();
        for (CatalogRelation relation : relationToLazySlotMap.keySet()) {
            Column rowIdCol = new Column(Column.GLOBAL_ROWID_COL + relation.getTable().getName(),
                    Type.STRING, false, null, false,
                    "", relation.getTable().getName() + ".global_row_id");
            SlotReference rowIdSlot = SlotReference.fromColumn(relation.getTable(), rowIdCol,
                    relation.getQualifier());
            result = result.accept(new LazySlotPruning(),
                    new LazySlotPruning.Context((PhysicalCatalogRelation) relation,
                            rowIdSlot, relationToLazySlotMap.get(relation)));
            relationToRowId.put(relation, rowIdSlot);
            rowIdSet.add(rowIdSlot);
        }

        // materialize.child.output requires
        // rowId only appears once.
        // that is [a, rowId1, b rowId1] is not acceptable
        List<SlotReference> materializeInput = moveRowIdsToTail(result.getOutput(), rowIdSet);

        if (materializeInput == null) {
            /*
            topn
              -->any
            =>
            project
               -->materialize
                   -->topn
                     -->any
             */
            result = new PhysicalLazyMaterialize(result, result.getOutput(),
                    materializedSlots, relationToLazySlotMap, relationToRowId, materializeMap,
                    null, ((AbstractPlan) result).getStats());
            hasMaterialized = true;
        } else {
            /*
            topn
              -->any
            =>
            project
              -->materialize
                -->project
                  -->topn
                     -->any
             */
            List<Slot> reOrderedMaterializedSlots = new ArrayList<>();
            for (Slot slot : materializeInput) {
                if (rowIdSet.contains(slot)) {
                    break;
                }
                reOrderedMaterializedSlots.add(slot);
            }
            result = new PhysicalProject(materializeInput, null, result);
            result = new PhysicalLazyMaterialize(result, materializeInput,
                    reOrderedMaterializedSlots, relationToLazySlotMap, relationToRowId, materializeMap,
                    null, ((AbstractPlan) result).getStats());
            hasMaterialized = true;
        }
        result = new PhysicalProject(originOutput, null, result);
        return result;
    }

    /*
        [a, r1, r2, b, r2] => [a, b, r1, r2]
        move all rowIds to tail, and remove duplicated rowIds
     */
    private List<SlotReference> moveRowIdsToTail(List<Slot> slots, Set<SlotReference> rowIds) {
        List<SlotReference> reArrangedSlots = new ArrayList<>();
        List<SlotReference> reArrangedRowIds = new ArrayList<>();
        boolean moved = false;
        boolean meetRowId = false;
        for (Slot slot : slots) {
            if (rowIds.contains(slot)) {
                if (!reArrangedRowIds.contains(slot)) {
                    reArrangedRowIds.add((SlotReference) slot);
                }
                meetRowId = true;
            } else {
                if (meetRowId) {
                    moved = true;
                }
                reArrangedSlots.add((SlotReference) slot);
            }
        }
        if (!moved) {
            return null;
        }
        reArrangedSlots.addAll(reArrangedRowIds);
        return reArrangedSlots;
    }

    private List<Slot> filterSlotsForLazyMaterialization(Map<Slot, MaterializeSource> materializeMap) {
        return new ArrayList<>(materializeMap.keySet());
    }

    private Optional<MaterializeSource> computeMaterializeSource(PhysicalTopN topN, SlotReference slot) {
        MaterializeProbeVisitor probe = new MaterializeProbeVisitor();
        MaterializeProbeVisitor.ProbeContext context = new MaterializeProbeVisitor.ProbeContext(slot);
        return probe.visit(topN, context);
    }

}