DataStreamSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataStreamSink.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.analysis.BitmapFilterPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDataStreamSink;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TOlapTablePartitionParam;
import org.apache.doris.thrift.TOlapTableSchemaParam;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
 * Data sink that forwards data to an exchange node.
 */
public class DataStreamSink extends DataSink {
    private PlanNodeId exchNodeId;
    private DataPartition outputPartition;
    protected TupleDescriptor outputTupleDesc;
    protected List<Expr> projections;
    protected List<Expr> conjuncts = Lists.newArrayList();
    protected List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
    // use for tablet id shuffle sink only
    protected TOlapTableSchemaParam tabletSinkSchemaParam = null;
    protected TOlapTablePartitionParam tabletSinkPartitionParam = null;
    protected TOlapTableLocationParam tabletSinkLocationParam = null;
    protected TupleDescriptor tabletSinkTupleDesc = null;
    protected long tabletSinkTxnId = -1;
    protected List<Expr> tabletSinkExprs = null;
    public DataStreamSink() {
    }
    public DataStreamSink(PlanNodeId exchNodeId) {
        this.exchNodeId = exchNodeId;
    }
    @Override
    public PlanNodeId getExchNodeId() {
        return exchNodeId;
    }
    public void setExchNodeId(PlanNodeId exchNodeId) {
        this.exchNodeId = exchNodeId;
    }
    @Override
    public DataPartition getOutputPartition() {
        return outputPartition;
    }
    public void setOutputPartition(DataPartition outputPartition) {
        this.outputPartition = outputPartition;
    }
    public TupleDescriptor getOutputTupleDesc() {
        return outputTupleDesc;
    }
    public void setOutputTupleDesc(TupleDescriptor outputTupleDesc) {
        this.outputTupleDesc = outputTupleDesc;
    }
    public List<Expr> getProjections() {
        return projections;
    }
    public void setProjections(List<Expr> projections) {
        this.projections = projections;
    }
    public List<Expr> getConjuncts() {
        return conjuncts;
    }
    public void setConjuncts(List<Expr> conjuncts) {
        this.conjuncts = conjuncts;
    }
    public void addConjunct(Expr conjunct) {
        this.conjuncts.add(conjunct);
    }
    public List<RuntimeFilter> getRuntimeFilters() {
        return runtimeFilters;
    }
    public void setRuntimeFilters(List<RuntimeFilter> runtimeFilters) {
        this.runtimeFilters = runtimeFilters;
    }
    public void addRuntimeFilter(RuntimeFilter runtimeFilter) {
        this.runtimeFilters.add(runtimeFilter);
    }
    public void setTabletSinkSchemaParam(TOlapTableSchemaParam schemaParam) {
        this.tabletSinkSchemaParam = schemaParam;
    }
    public void setTabletSinkPartitionParam(TOlapTablePartitionParam partitionParam) {
        this.tabletSinkPartitionParam = partitionParam;
    }
    public void setTabletSinkTupleDesc(TupleDescriptor tupleDesc) {
        this.tabletSinkTupleDesc = tupleDesc;
    }
    public void setTabletSinkLocationParam(TOlapTableLocationParam locationParam) {
        this.tabletSinkLocationParam = locationParam;
    }
    public void setTabletSinkExprs(List<Expr> tabletSinkExprs) {
        this.tabletSinkExprs = tabletSinkExprs;
    }
    public void setTabletSinkTxnId(long txnId) {
        this.tabletSinkTxnId = txnId;
    }
    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(prefix).append("STREAM DATA SINK\n");
        strBuilder.append(prefix).append("  EXCHANGE ID: ").append(exchNodeId);
        if (outputPartition != null) {
            strBuilder.append("\n").append(prefix).append("  ").append(outputPartition.getExplainString(explainLevel));
        }
        if (!conjuncts.isEmpty()) {
            Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts);
            strBuilder.append(prefix).append("  CONJUNCTS: ").append(expr.toSql()).append("\n");
        }
        if (!runtimeFilters.isEmpty()) {
            strBuilder.append(prefix).append("  runtime filters: ");
            strBuilder.append(getRuntimeFilterExplainString(false, false));
        }
        if (!CollectionUtils.isEmpty(projections)) {
            strBuilder.append(prefix).append("  PROJECTIONS: ")
                    .append(PlanNode.getExplainString(projections)).append("\n");
            strBuilder.append(prefix).append("  PROJECTION TUPLE: ").append(outputTupleDesc.getId());
            strBuilder.append("\n");
        }
        if (isMerge) {
            strBuilder.append("IS_MERGE: true\n");
        }
        return strBuilder.toString();
    }
    protected String getRuntimeFilterExplainString(boolean isBuildNode, boolean isBrief) {
        if (runtimeFilters.isEmpty()) {
            return "";
        }
        List<String> filtersStr = new ArrayList<>();
        for (RuntimeFilter filter : runtimeFilters) {
            filtersStr.add(filter.getExplainString(getExchNodeId()));
        }
        return Joiner.on(", ").join(filtersStr) + "\n";
    }
    @Override
    protected TDataSink toThrift() {
        TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK);
        TDataStreamSink tStreamSink =
                new TDataStreamSink(exchNodeId.asInt(), outputPartition.toThrift());
        for (Expr e : conjuncts) {
            if  (!(e instanceof BitmapFilterPredicate)) {
                tStreamSink.addToConjuncts(e.treeToThrift());
            }
        }
        if (projections != null) {
            for (Expr expr : projections) {
                tStreamSink.addToOutputExprs(expr.treeToThrift());
            }
        }
        if (outputTupleDesc != null) {
            tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt());
        }
        if (runtimeFilters != null) {
            for (RuntimeFilter rf : runtimeFilters) {
                tStreamSink.addToRuntimeFilters(rf.toThrift());
            }
        }
        Preconditions.checkState((tabletSinkSchemaParam != null) == (tabletSinkPartitionParam != null),
                "schemaParam and partitionParam should be set together.");
        if (tabletSinkSchemaParam != null) {
            tStreamSink.setTabletSinkSchema(tabletSinkSchemaParam);
        }
        if (tabletSinkPartitionParam != null) {
            tStreamSink.setTabletSinkPartition(tabletSinkPartitionParam);
        }
        if (tabletSinkTupleDesc != null) {
            tStreamSink.setTabletSinkTupleId(tabletSinkTupleDesc.getId().asInt());
        }
        if (tabletSinkLocationParam != null) {
            tStreamSink.setTabletSinkLocation(tabletSinkLocationParam);
        }
        if (tabletSinkExprs != null) {
            for (Expr expr : tabletSinkExprs) {
                tStreamSink.addToTabletSinkExprs(expr.treeToThrift());
            }
        }
        tStreamSink.setIsMerge(isMerge);
        tStreamSink.setTabletSinkTxnId(tabletSinkTxnId);
        result.setStreamSink(tStreamSink);
        return result;
    }
}