// PhysicalLazyMaterialize.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.materialize.MaterializeSource;
import org.apache.doris.nereids.properties.DataTrait.Builder;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Physical lazy materialize node.
 *
 * <p>The output of this node is the eagerly materialized slots followed by the lazy slots of
 * every relation, in the order in which the relations' row-id slots appear at the tail of
 * {@code materializeInput}.
 */
public class PhysicalLazyMaterialize<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> {

    private final Map<Relation, List<Slot>> relationToLazySlotMap;
    private final BiMap<Relation, SlotReference> relationToRowId;
    private final Map<Slot, MaterializeSource> materializeMap;
    private final List<Slot> materializedSlots;
    private final List<Slot> materializeInput;
    private final List<Slot> materializeOutput;

    /*
     * The following five fields are used by BE to perform the actual lazy fetch.
     * They are indexed by relation: index i of every list corresponds to relations.get(i).
     *
     * Example:
     *   SQL: SELECT t1.a, t1.b, t2.c, t2.d FROM t1 JOIN t2 ON ... WHERE t1.a > 5
     *   Assume t1.b and t2.d are lazily materialized (fetched after filtering).
     *
     *   materializedSlots (non-lazy, computed eagerly) = [t1.a, t2.c]
     *   Output slot order = [t1.a(0), t2.c(1), t1.b(2), t2.d(3)]
     *
     *   rowIdList = [row_id_t1, row_id_t2]
     *       The row-id slots passed to BE to locate the original rows.
     *
     *   relations = [t1, t2]
     *
     *   lazyColumns = [[Column(b)], [Column(d)]]
     *       For each relation, the Column objects to be lazily fetched.
     *
     *   lazyBaseColumnIndices = [[colIdxOf(b) in t1], [colIdxOf(d) in t2]]
     *       For each relation, the index returned by TableIf#getBaseColumnIdxByName
     *       for each lazy column.
     *
     *   lazySlotLocations = [[2], [3]]
     *       A two-level list: the outer level is indexed by relation (same as
     *       relations / rowIdList), and the inner level lists the output-tuple
     *       position of each lazy column of that relation. Two levels are needed
     *       because a single relation can have multiple lazy columns.
     *       Positions are assigned consecutively, relation by relation, starting
     *       right after the materialized slots. For example, if both t1.b and t1.e
     *       were lazy, the output order would be [t1.a, t2.c, t1.b, t1.e, t2.d],
     *       the entry for t1 would be [2, 3] and the entry for t2 would be [4].
     *       BE uses each position to know which output slot to fill in after
     *       fetching the column value.
     */
    private final List<Slot> rowIdList;
    private final List<List<Column>> lazyColumns;
    private final List<List<Integer>> lazySlotLocations;
    private final List<List<Integer>> lazyBaseColumnIndices;
    private final List<Relation> relations;

    /**
     * constructor without physical properties and statistics (both are passed as null).
     */
    public PhysicalLazyMaterialize(CHILD_TYPE child,
            List<Slot> materializeInput,
            List<Slot> materializedSlots,
            Map<Relation, List<Slot>> relationToLazySlotMap,
            BiMap<Relation, SlotReference> relationToRowId,
            Map<Slot, MaterializeSource> materializeMap) {
        this(child, materializeInput, materializedSlots, relationToLazySlotMap,
                relationToRowId, materializeMap, null, null);
    }

    /**
     * constructor
     *
     * @param child the child plan
     * @param materializeInput input slots; the first {@code materializedSlots.size()} entries are
     *         skipped and every remaining entry is treated as the row-id slot of one relation
     * @param materializedSlots slots that are already materialized; they form the head of the output
     * @param relationToLazySlotMap for each relation, the slots to be fetched lazily
     * @param relationToRowId bidirectional mapping between a relation and its row-id slot
     * @param materializeMap for each lazy slot, the source whose base slot carries the original column
     * @param physicalProperties physical properties of this node, may be null
     * @param statistics statistics of this node, may be null
     * @throws AnalysisException if a row-id slot has no relation, the relation type is unsupported,
     *         or the lazy slot / original column information of a relation is missing
     */
    public PhysicalLazyMaterialize(CHILD_TYPE child,
            List<Slot> materializeInput,
            List<Slot> materializedSlots,
            Map<Relation, List<Slot>> relationToLazySlotMap,
            BiMap<Relation, SlotReference> relationToRowId,
            Map<Slot, MaterializeSource> materializeMap,
            PhysicalProperties physicalProperties, Statistics statistics) {
        super(PlanType.PHYSICAL_MATERIALIZE, Optional.empty(),
                null, physicalProperties, statistics, child);
        this.materializeInput = materializeInput;
        this.relationToLazySlotMap = relationToLazySlotMap;
        this.relationToRowId = relationToRowId;
        this.materializedSlots = ImmutableList.copyOf(materializedSlots);
        this.materializeMap = materializeMap;
        this.lazyColumns = new ArrayList<>();
        this.lazySlotLocations = new ArrayList<>();
        this.lazyBaseColumnIndices = new ArrayList<>();

        ImmutableList.Builder<Slot> outputBuilder = ImmutableList.builder();
        outputBuilder.addAll(this.materializedSlots);
        ImmutableList.Builder<Slot> rowIdListBuilder = ImmutableList.builder();
        ImmutableList.Builder<Relation> relationListBuilder = ImmutableList.builder();

        // Lazy slots are appended to the output right after the materialized slots,
        // so their output positions start at materializedSlots.size().
        int loc = this.materializedSlots.size();
        for (int idx = this.materializedSlots.size(); idx < materializeInput.size(); idx++) {
            Slot rowId = materializeInput.get(idx);
            Relation rel = relationToRowId.inverse().get(rowId);
            // Validate the relation before handing it to the ImmutableList builder,
            // which rejects null elements with a bare NullPointerException.
            TableIf relationTable = resolveTable(rel, rowId);
            rowIdListBuilder.add(rowId);
            relationListBuilder.add(rel);

            List<Slot> lazySlotsOfRel = relationToLazySlotMap.get(rel);
            if (lazySlotsOfRel == null) {
                throw new AnalysisException("No lazy slots found for relation: " + rel);
            }

            List<Column> lazyColumnForRel = new ArrayList<>();
            List<Integer> lazyBaseColumnIdxForRel = new ArrayList<>();
            List<Integer> lazySlotLocationForRel = new ArrayList<>();
            lazyColumns.add(lazyColumnForRel);
            lazyBaseColumnIndices.add(lazyBaseColumnIdxForRel);
            lazySlotLocations.add(lazySlotLocationForRel);

            for (Slot lazySlot : lazySlotsOfRel) {
                MaterializeSource source = materializeMap.get(lazySlot);
                if (source == null) {
                    throw new AnalysisException("No materialize source found for lazy slot: " + lazySlot);
                }
                // Set originalColumn on the lazy slot so that createSlotDesc can write
                // colUniqueId into the thrift SlotDescriptor — BE needs it to resolve
                // the column during remote fetch.
                Column originalColumn = source.baseSlot.getOriginalColumn().orElseThrow(
                        () -> new AnalysisException("No original column found for lazy slot: " + lazySlot));
                outputBuilder.add(((SlotReference) lazySlot).withColumn(originalColumn));
                lazyColumnForRel.add(originalColumn);
                lazyBaseColumnIdxForRel.add(relationTable.getBaseColumnIdxByName(lazySlot.getName()));
                lazySlotLocationForRel.add(loc);
                loc++;
            }
        }
        this.relations = relationListBuilder.build();
        this.rowIdList = rowIdListBuilder.build();
        this.materializeOutput = outputBuilder.build();
    }

    /**
     * Returns the table behind {@code rel}; only CatalogRelation and PhysicalTVFRelation are supported.
     */
    private static TableIf resolveTable(Relation rel, Slot rowId) {
        if (rel == null) {
            throw new AnalysisException("No relation found for row id slot: " + rowId);
        }
        if (rel instanceof CatalogRelation) {
            return ((CatalogRelation) rel).getTable();
        }
        if (rel instanceof PhysicalTVFRelation) {
            return ((PhysicalTVFRelation) rel).getFunction().getTable();
        }
        throw new AnalysisException("Unsupported relation type: " + rel);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalLazyMaterialize(this, context);
    }

    /**
     * Only the already materialized slots are reported as expressions of this node.
     */
    @Override
    public List<? extends Expression> getExpressions() {
        return materializedSlots;
    }

    /**
     * The output computed in the constructor: materialized slots followed by all lazy slots.
     */
    @Override
    public List<Slot> computeOutput() {
        return materializeOutput;
    }

    /**
     * Not supported by this node: always returns null.
     * NOTE(review): presumably this node is only created after memo-based optimization, so this
     * is never called — confirm.
     */
    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        return null;
    }

    /**
     * Not supported by this node: always returns null.
     * NOTE(review): same assumption as withGroupExpression — confirm.
     */
    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        return null;
    }

    // No data traits are derived for this node: the four compute* methods below are no-ops.

    @Override
    public void computeUnique(Builder builder) {
    }

    @Override
    public void computeUniform(Builder builder) {
    }

    @Override
    public void computeEqualSet(Builder builder) {
    }

    @Override
    public void computeFd(Builder builder) {
    }

    /**
     * Creates a copy of this node on top of {@code children.get(0)}.
     * NOTE(review): physical properties and statistics are reset to null here rather than carried
     * over from this node — confirm this is intended.
     */
    @Override
    public Plan withChildren(List<Plan> children) {
        return new PhysicalLazyMaterialize<>(children.get(0),
                materializeInput, materializedSlots, relationToLazySlotMap,
                relationToRowId, materializeMap, null, null);
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("PhysicalLazyMaterialize [Output= (")
                .append(getOutput()).append("), lazySlots= (");
        for (List<Slot> lazySlotsOfRel : relationToLazySlotMap.values()) {
            builder.append(lazySlotsOfRel);
        }
        builder.append(")]");
        return builder.toString();
    }

    @Override
    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
        return new PhysicalLazyMaterialize<>(children.get(0), materializeInput, materializedSlots,
                relationToLazySlotMap, relationToRowId, materializeMap, physicalProperties, statistics);
    }

    /**
     * Shape string listing the materialized slots and the lazy slots of all relations;
     * lazy slots are sorted by their own shape info so the result does not depend on map order.
     */
    @Override
    public String shapeInfo() {
        List<Slot> lazySlots = relationToLazySlotMap.values().stream()
                .flatMap(List::stream)
                .sorted(Comparator.comparing(Slot::shapeInfo))
                .collect(Collectors.toList());
        StringBuilder shapeBuilder = new StringBuilder();
        shapeBuilder.append(this.getClass().getSimpleName())
                .append("[").append("materializedSlots:")
                .append(ExpressionUtils.slotListShapeInfo(materializedSlots))
                .append(" lazySlots:")
                .append(ExpressionUtils.slotListShapeInfo(lazySlots));
        shapeBuilder.append("]");
        return shapeBuilder.toString();
    }

    /** Relations whose columns are lazily fetched, in row-id order. */
    public List<Relation> getRelations() {
        return relations;
    }

    /** For each relation (same order as getRelations), the columns to be lazily fetched. */
    public List<List<Column>> getLazyColumns() {
        return lazyColumns;
    }

    /** For each relation (same order as getRelations), the output position of each lazy column. */
    public List<List<Integer>> getLazySlotLocations() {
        return lazySlotLocations;
    }

    /** For each relation (same order as getRelations), the base column index of each lazy column. */
    public List<List<Integer>> getLazyBaseColumnIndices() {
        return lazyBaseColumnIndices;
    }

    /** Row-id slots, one per relation (same order as getRelations). */
    public List<Slot> getRowIds() {
        return rowIdList;
    }
}