LineageInfo.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.lineage;

import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Common in-memory representation of the lineage collected for a single statement in Doris.
 *
 * <p>An instance records, for every output slot, the expressions it is derived from (direct lineage) and the
 * expressions that merely influence it (indirect lineage), plus the source tables, the target table/columns
 * and the {@link LineageContext} of the statement.
 *
 * <p>The class is a plain mutable holder: getters hand out the live internal collections unless stated
 * otherwise, and no synchronization is performed.
 */
public class LineageInfo {

    /*
     * Example SQL used by the field comments below:
     * INSERT INTO tgt_region_revenue
     * SELECT n.n_name AS nation_name,
     *        SUM(l.l_extendedprice * (1 - l.l_discount)) AS revenue
     * FROM customer c
     * JOIN orders o ON c.c_custkey = o.o_custkey
     * JOIN lineitem l ON o.o_orderkey = l.l_orderkey
     * JOIN nation n ON c.c_nationkey = n.n_nationkey
     * JOIN region r ON n.n_regionkey = r.r_regionkey
     * WHERE r.r_name = 'ASIA'
     * GROUP BY n.n_name;
     */

    // Key: output slot. Value: the shuttled expressions the output slot directly depends on, grouped by
    // DirectLineageType. The shuttled form is produced by ExpressionLineageReplacer.
    // Example: nation_name -> {IDENTITY: n.n_name}, revenue -> {AGGREGATION: SUM(l.l_extendedprice * (1-l.l_discount))}
    private Map<SlotReference, SetMultimap<DirectLineageType, Expression>> directLineageMap;
    // Key: output slot. Value: expressions that indirectly affect that particular output slot, grouped by
    // IndirectLineageType.
    // Example: nation_name -> {GROUP_BY: n.n_name} (intersects direct slots), revenue may have no per-output indirects.
    private final Map<SlotReference, SetMultimap<IndirectLineageType, Expression>> inDirectLineageMap;
    // Expressions that affect the whole dataset when some outputs do not directly depend on the referenced
    // slots (e.g. filter or join on non-selected columns). Stored once here instead of once per output slot.
    // Example: FILTER {r.r_name = 'ASIA'}, JOIN {o.o_orderkey = l.l_orderkey}.
    private final Multimap<IndirectLineageType, Expression> datasetIndirectLineageMap;
    // Tables the plan depends on.
    // Example: {customer, orders, lineitem, nation, region}.
    private Set<TableIf> tableLineageSet;
    // Target table of this lineage event; null until setTargetTable is called.
    // Example: tgt_region_revenue.
    private TableIf targetTable;
    // Target columns of this lineage event; null until setTargetColumns is called.
    // Example: [nation_name, revenue].
    private List<Slot> targetColumns;
    // Query metadata; null until setContext is called.
    // Example: {sourceCommand=InsertIntoTableCommand, queryId=..., database=lineage_tpch}.
    private LineageContext context;

    /**
     * Indirect lineage type - expressions that indirectly affect output slots
     */
    public enum IndirectLineageType {
        // input used in join condition
        JOIN,
        // output is aggregated based on input
        GROUP_BY,
        // input used as a filtering condition
        FILTER,
        // output is sorted based on input field
        SORT,
        // output is windowed based on input field
        WINDOW,
        // input value is used in IF, CASE WHEN or COALESCE statements
        CONDITIONAL
    }

    /**
     * Direct lineage type - how output value relates to input
     */
    public enum DirectLineageType {
        // output value is taken as is from the input
        IDENTITY,
        // output value is transformed source value from input row
        TRANSFORMATION,
        // output value is aggregation of source values from multiple input rows
        AGGREGATION
    }

    /**
     * Create an empty lineage info. All lineage collections start empty; target table, target columns and
     * context stay null until their setters are called.
     */
    public LineageInfo() {
        this.directLineageMap = new HashMap<>();
        this.inDirectLineageMap = new HashMap<>();
        this.tableLineageSet = new HashSet<>();
        this.datasetIndirectLineageMap = HashMultimap.create();
    }

    /**
     * Get the direct lineage of every output slot.
     *
     * @return the live internal map; never null
     */
    public Map<SlotReference, SetMultimap<DirectLineageType, Expression>> getDirectLineageMap() {
        return directLineageMap;
    }

    /**
     * Replace the direct lineage map.
     *
     * @param directLineageMap the new map, stored by reference; null is treated as "no direct lineage" and
     *                         replaced by an empty map so that later add/merge calls cannot hit a null field
     */
    public void setDirectLineageMap(Map<SlotReference, SetMultimap<DirectLineageType, Expression>> directLineageMap) {
        this.directLineageMap = directLineageMap != null ? directLineageMap : new HashMap<>();
    }

    /**
     * Get merged indirect lineage info for each output slot.
     *
     * <p>Examples:
     * <ul>
     *   <li>If dataset-level FILTER has l_shipdate &gt;= '1995-01-01' and
     *       output slot price has GROUP_BY l_orderkey, the merged result for price
     *       contains both FILTER and GROUP_BY expressions.</li>
     *   <li>If dataset-level SORT has l_shipdate and output slot orderkey has no
     *       per-output indirect lineage, the merged result for orderkey still contains SORT.</li>
     * </ul>
     *
     * <p>Note on aliasing: when no dataset-level lineage has been recorded, the live per-output map is
     * returned; otherwise a freshly built snapshot is returned, and modifying it does not affect this object.
     *
     * @return per-output indirect lineage with the dataset-level lineage merged into every entry
     */
    public Map<SlotReference, SetMultimap<IndirectLineageType, Expression>> getInDirectLineageMap() {
        if (datasetIndirectLineageMap.isEmpty()) {
            // nothing to merge, hand out the per-output map as is
            return inDirectLineageMap;
        }
        // every known output slot gets an entry, even if it only has direct lineage
        Set<SlotReference> outputSlots = new HashSet<>(directLineageMap.keySet());
        outputSlots.addAll(inDirectLineageMap.keySet());
        Map<SlotReference, SetMultimap<IndirectLineageType, Expression>> merged = new HashMap<>();
        for (SlotReference outputSlot : outputSlots) {
            SetMultimap<IndirectLineageType, Expression> combined = HashMultimap.create();
            combined.putAll(datasetIndirectLineageMap);
            SetMultimap<IndirectLineageType, Expression> perOutput = inDirectLineageMap.get(outputSlot);
            if (perOutput != null) {
                combined.putAll(perOutput);
            }
            merged.put(outputSlot, combined);
        }
        return merged;
    }

    /**
     * Get dataset-level indirect lineage expressions.
     *
     * @return dataset-level indirect lineage map (the live internal multimap)
     */
    public Multimap<IndirectLineageType, Expression> getDatasetIndirectLineageMap() {
        return datasetIndirectLineageMap;
    }

    /**
     * Get the tables the plan depends on.
     *
     * @return the live internal set; never null
     */
    public Set<TableIf> getTableLineageSet() {
        return tableLineageSet;
    }

    /**
     * Replace the set of tables the plan depends on.
     *
     * @param tableLineageSet the new set, stored by reference; null is replaced by an empty set so that
     *                        {@link #addTableLineage(TableIf)} keeps working
     */
    public void setTableLineageSet(Set<TableIf> tableLineageSet) {
        this.tableLineageSet = tableLineageSet != null ? tableLineageSet : new HashSet<>();
    }

    /**
     * Add one table to the set of tables the plan depends on.
     */
    public void addTableLineage(TableIf table) {
        this.tableLineageSet.add(table);
    }

    /**
     * Get the target table of this lineage event, or null if none has been set.
     */
    public TableIf getTargetTable() {
        return targetTable;
    }

    /**
     * Set the target table of this lineage event.
     */
    public void setTargetTable(TableIf targetTable) {
        this.targetTable = targetTable;
    }

    /**
     * Get the target columns of this lineage event, or null if none have been set.
     */
    public List<Slot> getTargetColumns() {
        return targetColumns;
    }

    /**
     * Set the target columns of this lineage event. The list is stored by reference.
     */
    public void setTargetColumns(List<Slot> targetColumns) {
        this.targetColumns = targetColumns;
    }

    /**
     * Get lineage context metadata.
     */
    public LineageContext getContext() {
        return context;
    }

    /**
     * Set lineage context metadata.
     */
    public void setContext(LineageContext context) {
        this.context = context;
    }

    /**
     * Add direct lineage for an output slot
     */
    public void addDirectLineage(SlotReference outputSlot, DirectLineageType type, Expression expr) {
        directLineageMap.computeIfAbsent(outputSlot, k -> HashMultimap.create()).put(type, expr);
    }

    /**
     * Add indirect lineage for an output slot
     */
    public void addIndirectLineage(SlotReference outputSlot, IndirectLineageType type, Expression expr) {
        inDirectLineageMap.computeIfAbsent(outputSlot, k -> HashMultimap.create()).put(type, expr);
    }

    /**
     * Add indirect lineage for all output slots.
     * Stored as dataset-level indirect lineage to avoid duplication.
     */
    public void addDatasetIndirectLineage(IndirectLineageType type, Expression expr) {
        datasetIndirectLineageMap.put(type, expr);
    }

    /**
     * Add several indirect lineage expressions of the same type for all output slots.
     * Stored as dataset-level indirect lineage to avoid duplication.
     *
     * @param type  the indirect lineage type shared by all expressions
     * @param exprs the expressions to record; a null or empty set is a no-op
     */
    public void addDatasetIndirectLineage(IndirectLineageType type, Set<Expression> exprs) {
        if (exprs == null) {
            return;
        }
        datasetIndirectLineageMap.putAll(type, exprs);
    }

    /**
     * Render all lineage state for logging and debugging. The per-output and dataset-level indirect
     * lineage are printed separately (unmerged), exactly as stored.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("LineageInfo{\n");
        sb.append("  context=").append(context).append(",\n");
        sb.append("  tableLineageSet=").append(tableLineageSet).append(",\n");
        sb.append("  directLineageMap=").append(directLineageMap).append(",\n");
        sb.append("  inDirectLineageMap=").append(inDirectLineageMap).append(",\n");
        sb.append("  datasetIndirectLineageMap=").append(datasetIndirectLineageMap).append(",\n");
        sb.append("  targetColumns=").append(targetColumns).append(",\n");
        sb.append("  targetTable=").append(targetTable != null ? targetTable.getName() : "null").append("\n");
        sb.append("}");
        return sb.toString();
    }
}