BindSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.analysis;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.GeneratedColumnInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.analyzer.Scope;
import org.apache.doris.nereids.analyzer.UnboundDictionarySink;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundJdbcTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.pattern.MatchingContext;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.DefaultValueSlot;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalDictionarySink;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.InferPlanOutputAlias;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
 * Bind an unbound logical table sink, which represents the target table of an insert command.
 */
public class BindSink implements AnalysisRuleFactory {
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(this::bindOlapTableSink)),
RuleType.BINDING_INSERT_FILE.build(logicalFileSink().when(s -> s.getOutputExprs().isEmpty())
.then(fileSink -> {
ImmutableListMultimap.Builder<ExprId, Integer> exprIdToIndexMapBuilder =
ImmutableListMultimap.builder();
List<Slot> childOutput = fileSink.child().getOutput();
for (int index = 0; index < childOutput.size(); index++) {
exprIdToIndexMapBuilder.put(childOutput.get(index).getExprId(), index);
}
InferPlanOutputAlias aliasInfer = new InferPlanOutputAlias(childOutput);
List<NamedExpression> output = aliasInfer.infer(fileSink.child(),
exprIdToIndexMapBuilder.build());
return fileSink.withOutputExprs(output);
})
),
// TODO: bind hive target table
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)),
RuleType.BINDING_INSERT_ICEBERG_TABLE.build(
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink)),
RuleType.BINDING_INSERT_JDBC_TABLE.build(unboundJdbcTableSink().thenApply(this::bindJdbcTableSink)),
RuleType.BINDING_INSERT_DICTIONARY_TABLE
.build(unboundDictionarySink().thenApply(this::bindDictionarySink))
);
}
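/**
 * Bind an unbound olap table sink: resolve the target Database and OlapTable, map the sink's
 * column names to table Columns, enforce the sequence-column rule for Unique Key tables, and
 * wrap the child in projects that supply default values, generated columns and shadow columns.
 *
 * Illustrative example (hypothetical table): for t(a INT, b INT DEFAULT "1"),
 * `INSERT INTO t(a) VALUES(1)` is bound so that the output project produces both the value
 * for `a` and the default literal for `b`.
 */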
private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
UnboundTableSink<?> sink = ctx.root;
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
OlapTable table = pair.second;
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
LogicalPlan child = ((LogicalPlan) sink.child());
boolean childHasSeqCol = child.getOutput().stream()
.anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol()
&& table.getSequenceMapCol() != null
&& sink.getColNames().contains(table.getSequenceMapCol());
// 1. bind target columns: from sink's column names to target tables' Columns
Pair<List<Column>, Integer> bindColumnsResult =
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol,
sink.getDMLCommandType() == DMLCommandType.GROUP_COMMIT);
List<Column> bindColumns = bindColumnsResult.first;
int extraColumnsNum = bindColumnsResult.second;
LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
database,
table,
bindColumns,
bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()),
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
isPartialUpdate,
sink.getDMLCommandType(),
child);
// we need to insert all the columns of the target table,
// even though some columns are not mentioned,
// so we add a project to supply the default values.
if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
try {
// For a Unique Key table with a sequence column (whose default value is not CURRENT_TIMESTAMP),
// the user MUST specify the sequence column while inserting data.
//
// case1: create table by `function_column.sequence_col`
// a) insert with column list: must include the sequence map column
// b) insert without column list: already contains the column, no need to check
// case2: create table by `function_column.sequence_type`
// a) insert with column list: must include the hidden column __DORIS_SEQUENCE_COL__
// b) insert without column list: does not include the hidden column __DORIS_SEQUENCE_COL__
// by default, so it will fail.
if (table.hasSequenceCol()) {
boolean haveInputSeqCol = false;
Optional<Column> seqColInTable = Optional.empty();
if (table.getSequenceMapCol() != null) {
if (!sink.getColNames().isEmpty()) {
if (sink.getColNames().stream()
.anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) {
haveInputSeqCol = true; // case1.a
}
} else {
haveInputSeqCol = true; // case1.b
}
seqColInTable = table.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
.findFirst();
} else {
// ATTN: must use bindColumns here, because an insert from the group_commit tvf submitted by BE
// does not carry a column list for the target table, but its sink child already contains all
// invisible data. This is different from other insert actions, which exclude invisible data
// by default.
if (!bindColumns.isEmpty()) {
if (bindColumns.stream()
.map(Column::getName)
.anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
haveInputSeqCol = true; // case2.a
} // else case2.b
}
}
// Don't require the user to provide the sequence column for partial updates,
// in the following cases:
// 1. it's a load job with `partial_columns=true`
// 2. UPDATE and DELETE: the planner will automatically add these hidden columns
// 3. the session variable `require_sequence_in_insert` is false
if (!haveInputSeqCol && !isPartialUpdate && (
boundSink.getDmlCommandType() != DMLCommandType.UPDATE
&& boundSink.getDmlCommandType() != DMLCommandType.DELETE) && (
boundSink.getDmlCommandType() != DMLCommandType.INSERT
|| ConnectContext.get().getSessionVariable().isRequireSequenceInInsert())) {
if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
|| !seqColInTable.get().getDefaultValue()
.equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) {
throw new org.apache.doris.common.AnalysisException("Table " + table.getName()
+ " has sequence column, need to specify the sequence column");
}
}
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
Map<String, NamedExpression> columnToOutput = getColumnToOutput(
ctx, table, isPartialUpdate, boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(
table.getFullSchema(), child, columnToOutput);
List<Column> columns = new ArrayList<>(table.getFullSchema().size());
for (int i = 0; i < table.getFullSchema().size(); ++i) {
Column col = table.getFullSchema().get(i);
if (columnToOutput.get(col.getName()) != null) {
columns.add(col);
}
}
if (fullOutputProject.getOutputs().size() != columns.size()) {
throw new AnalysisException("output's size should be same as columns's size");
}
int size = columns.size();
List<Slot> targetTableSlots = new ArrayList<>(size);
for (int i = 0; i < size; ++i) {
targetTableSlots.add(SlotReference.fromColumn(table, columns.get(i),
table.getFullQualifiers()));
}
LegacyExprTranslator exprTranslator = new LegacyExprTranslator(table, targetTableSlots);
return boundSink.withChildAndUpdateOutput(fullOutputProject, exprTranslator.createPartitionExprList(),
exprTranslator.createSyncMvWhereClause(), targetTableSlots);
}
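/**
 * Build the output project over the child: one NamedExpression per target column, plus an
 * extra cast project when expression types differ from the target column types. For
 * string-like types, a cast is only added when the lengths match or the target is STRING.
 */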
private LogicalProject<?> getOutputProjectByCoercion(List<Column> tableSchema, LogicalPlan child,
Map<String, NamedExpression> columnToOutput) {
List<NamedExpression> fullOutputExprs = Utils.fastToImmutableList(columnToOutput.values());
if (child instanceof LogicalOneRowRelation) {
// remove default value slot in one row relation
child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
.getProjects().stream()
.filter(p -> !(p instanceof DefaultValueSlot))
.collect(ImmutableList.toImmutableList()));
}
LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
// add cast project
List<NamedExpression> castExprs = Lists.newArrayList();
for (int i = 0; i < tableSchema.size(); ++i) {
Column col = tableSchema.get(i);
NamedExpression expr = columnToOutput.get(col.getName()); // relative outputExpr
if (expr == null) {
// If `expr` is null, it means that the current load is a partial update
// and `col` should not be contained in the output of the sink node so
// we skip it.
continue;
}
expr = expr.toSlot();
DataType inputType = expr.getDataType();
DataType targetType = DataType.fromCatalogType(tableSchema.get(i).getType());
Expression castExpr = expr;
// TODO move string like type logic into TypeCoercionUtils#castIfNotSameType
if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) {
int sourceLength = ((CharacterType) inputType).getLen();
int targetLength = ((CharacterType) targetType).getLen();
if (sourceLength == targetLength) {
castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
} else if (targetType.isStringType()) {
castExpr = new Cast(castExpr, StringType.INSTANCE);
}
} else {
castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
}
if (castExpr instanceof NamedExpression) {
castExprs.add(((NamedExpression) castExpr));
} else {
castExprs.add(new Alias(castExpr));
}
}
if (!castExprs.equals(fullOutputExprs)) {
fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject);
}
return fullOutputProject;
}
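/**
 * Compute, for every column of the target table, the expression that feeds it: the matching
 * child output when the column is mentioned, otherwise a default value, a null literal or the
 * mapped sequence column. Generated columns, materialized view columns and shadow columns are
 * handled in later passes so that they can reference the ordinary columns' outputs. Returns a
 * case-insensitive map from column name to output expression.
 */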
private static Map<String, NamedExpression> getColumnToOutput(
MatchingContext<? extends UnboundLogicalSink<Plan>> ctx,
TableIf table, boolean isPartialUpdate, LogicalTableSink<?> boundSink, LogicalPlan child) {
// we need to insert all the columns of the target table,
// even though some columns are not mentioned,
// so we add a project to supply the default values.
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < child.getOutput().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
}
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
Map<String, NamedExpression> columnToReplaced = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
Map<Expression, Expression> replaceMap = Maps.newHashMap();
NereidsParser expressionParser = new NereidsParser();
List<Column> generatedColumns = Lists.newArrayList();
List<Column> materializedViewColumn = Lists.newArrayList();
List<Column> shadowColumns = Lists.newArrayList();
// generate slots for columns not mentioned in the sql, mv slots and shadow slots.
for (Column column : boundSink.getTargetTable().getFullSchema()) {
if (column.getGeneratedColumnInfo() != null) {
generatedColumns.add(column);
continue;
} else if (column.isMaterializedViewColumn()) {
materializedViewColumn.add(column);
continue;
} else if (Column.isShadowColumn(column.getName())) {
shadowColumns.add(column);
continue;
}
if (columnToChildOutput.containsKey(column)
// do not process an explicit DEFAULT value here:
// insert into table t values(DEFAULT)
&& !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
columnToChildOutput.get(column), DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
} else {
if (table instanceof OlapTable && ((OlapTable) table).hasSequenceCol()
&& column.getName().equals(Column.SEQUENCE_COL)
&& ((OlapTable) table).getSequenceMapCol() != null) {
Optional<Column> seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(((OlapTable) table).getSequenceMapCol()))
.findFirst();
if (!seqCol.isPresent()) {
throw new AnalysisException("sequence column is not contained in"
+ " target table " + table.getName());
}
if (columnToOutput.get(seqCol.get().getName()) != null) {
// should generate a different exprId for the sequence column
NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName());
if (seqColumn instanceof Alias) {
seqColumn = new Alias(((Alias) seqColumn).child(), column.getName());
} else {
seqColumn = new Alias(seqColumn, column.getName());
}
columnToOutput.put(column.getName(), seqColumn);
columnToReplaced.put(column.getName(), seqColumn.toSlot());
replaceMap.put(seqColumn.toSlot(), seqColumn.child(0));
}
} else if (isPartialUpdate) {
// If the current load is a partial update, the values of unmentioned
// columns will be filled in SegmentWriter, and the output of the sink node
// should not contain these unmentioned columns, so we just skip them.
// But if the column has an 'on update' default value, we should unconditionally
// update the value of the column to the current timestamp whenever there
// is an update on the row.
if (column.hasOnUpdateDefaultValue()) {
Expression unboundFunctionDefaultValue = new NereidsParser().parseExpression(
column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()
);
Expression defaultValueExpression = ExpressionAnalyzer.analyzeFunction(
boundSink, ctx.cascadesContext, unboundFunctionDefaultValue
);
Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
defaultValueExpression, DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
} else {
continue;
}
} else if (column.getDefaultValue() == null) {
// throw an exception if a DEFAULT value is used explicitly but no default value is present:
// insert into table t values(DEFAULT)
if (!column.isAllowNull() && !column.isAutoInc()) {
throw new AnalysisException("Column has no default value,"
+ " column=" + column.getName());
}
// Otherwise, the unmentioned columns should be filled with default values
// or null values
Alias output = new Alias(new NullLiteral(DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
} else {
try {
// This comes from the original planner: if the default value expression is
// null, we use the literal string of the default value; otherwise it may be
// a default value function, like CURRENT_TIMESTAMP.
if (column.getDefaultValueExpr() == null) {
columnToOutput.put(column.getName(),
new Alias(Literal.of(column.getDefaultValue())
.checkedCastTo(DataType.fromCatalogType(column.getType())),
column.getName()));
} else {
Expression unboundDefaultValue = new NereidsParser().parseExpression(
column.getDefaultValueExpr().toSqlWithoutTbl());
Expression defaultValueExpression = ExpressionAnalyzer.analyzeFunction(
boundSink, ctx.cascadesContext, unboundDefaultValue);
if (defaultValueExpression instanceof Alias) {
defaultValueExpression = ((Alias) defaultValueExpression).child();
}
Alias output = new Alias(TypeCoercionUtils.castIfNotSameType(
defaultValueExpression, DataType.fromCatalogType(column.getType())),
column.getName());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
}
}
// Generated columns can reference all ordinary columns; if they were processed
// in the loop above, unbound slots would lead to a "slot not found" error.
// The processing of materialized view columns is moved down for the same reason.
for (Column column : generatedColumns) {
GeneratedColumnInfo info = column.getGeneratedColumnInfo();
Expression parsedExpression = new NereidsParser().parseExpression(info.getExpr().toSqlWithoutTbl());
Expression boundExpression = new CustomExpressionAnalyzer(boundSink, ctx.cascadesContext, columnToReplaced)
.analyze(parsedExpression);
if (boundExpression instanceof Alias) {
boundExpression = ((Alias) boundExpression).child();
}
boundExpression = ExpressionUtils.replace(boundExpression, replaceMap);
Alias output = new Alias(boundExpression, info.getExprSql());
columnToOutput.put(column.getName(), output);
columnToReplaced.put(column.getName(), output.toSlot());
replaceMap.put(output.toSlot(), output.child());
}
for (Column column : materializedViewColumn) {
if (column.isMaterializedViewColumn()) {
List<SlotRef> refs = column.getRefColumns();
// now we have to replace the columns with slots.
Preconditions.checkArgument(refs != null,
"mv column %s's ref columns cannot be null", column);
Expression parsedExpression = expressionParser.parseExpression(
column.getDefineExpr().toSqlWithoutTbl());
// the bound slot expression has its slots bound, but its functions
// may not be bound yet, so we have to bind it again.
// for example: to_bitmap.
Expression boundExpression = new CustomExpressionAnalyzer(
boundSink, ctx.cascadesContext, columnToReplaced).analyze(parsedExpression);
if (boundExpression instanceof Alias) {
boundExpression = ((Alias) boundExpression).child();
}
boundExpression = ExpressionUtils.replace(boundExpression, replaceMap);
boundExpression = TypeCoercionUtils.castIfNotSameType(boundExpression,
DataType.fromCatalogType(column.getType()));
Alias output = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
columnToOutput.put(column.getName(), output);
}
}
for (Column column : shadowColumns) {
NamedExpression expression = columnToOutput.get(column.getNonShadowName());
if (expression != null) {
Alias alias = (Alias) expression;
Expression newExpr = TypeCoercionUtils.castIfNotSameType(alias.child(),
DataType.fromCatalogType(column.getType()));
columnToOutput.put(column.getName(), new Alias(newExpr, column.getName()));
}
}
return columnToOutput;
}
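/**
 * Bind an unbound hive table sink: resolve the HMS database and table, reject partition
 * specs, map the column list (or the full base schema when no list is given) to Columns,
 * and wrap the child in a type-coercion project.
 */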
private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) {
UnboundHiveTableSink<?> sink = ctx.root;
Pair<HMSExternalDatabase, HMSExternalTable> pair = bind(ctx.cascadesContext, sink);
HMSExternalDatabase database = pair.first;
HMSExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
if (!sink.getPartitions().isEmpty()) {
throw new AnalysisException("Not support insert with partition spec in hive catalog.");
}
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalHiveTableSink<?> boundSink = new LogicalHiveTableSink<>(
database,
table,
bindColumns,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
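/**
 * Bind an unbound iceberg table sink, analogous to the hive sink: resolve the Iceberg
 * database and table, bind the column list and wrap the child in a type-coercion project.
 */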
private Plan bindIcebergTableSink(MatchingContext<UnboundIcebergTableSink<Plan>> ctx) {
UnboundIcebergTableSink<?> sink = ctx.root;
Pair<IcebergExternalDatabase, IcebergExternalTable> pair = bind(ctx.cascadesContext, sink);
IcebergExternalDatabase database = pair.first;
IcebergExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalIcebergTableSink<?> boundSink = new LogicalIcebergTableSink<>(
database,
table,
bindColumns,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
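/**
 * Bind an unbound jdbc table sink. Unlike the olap sink, unmentioned columns are not filled
 * with defaults; only the user-specified columns are written.
 */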
private Plan bindJdbcTableSink(MatchingContext<UnboundJdbcTableSink<Plan>> ctx) {
UnboundJdbcTableSink<?> sink = ctx.root;
Pair<JdbcExternalDatabase, JdbcExternalTable> pair = bind(ctx.cascadesContext, sink);
JdbcExternalDatabase database = pair.first;
JdbcExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalJdbcTableSink<?> boundSink = new LogicalJdbcTableSink<>(
database,
table,
bindColumns,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getJdbcColumnToOutput(bindColumns, child);
// We don't need to insert unmentioned columns, only the user-specified columns.
LogicalProject<?> outputProject = getOutputProjectByCoercion(bindColumns, child, columnToOutput);
return boundSink.withChildAndUpdateOutput(outputProject);
}
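/**
 * Map each bound column to the child output at the same position, casting when the types
 * differ. No default values are supplied here; the mapping is purely positional.
 */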
private static Map<String, NamedExpression> getJdbcColumnToOutput(
List<Column> bindColumns, LogicalPlan child) {
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (int i = 0; i < bindColumns.size(); i++) {
Column column = bindColumns.get(i);
NamedExpression outputExpr = child.getOutput().get(i);
Alias output = new Alias(
TypeCoercionUtils.castIfNotSameType(outputExpr, DataType.fromCatalogType(column.getType())),
column.getName()
);
columnToOutput.put(column.getName(), output);
}
return columnToOutput;
}
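/**
 * Bind an unbound dictionary sink. The sink always writes the dictionary's full schema;
 * columns are matched to the child's output by name, so their order may differ from the
 * upstream output.
 */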
private Plan bindDictionarySink(MatchingContext<UnboundDictionarySink<Plan>> ctx) {
UnboundDictionarySink<?> sink = ctx.root;
Pair<Database, Dictionary> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
Dictionary dictionary = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
// 1. bind target columns: from the sink's column names to the target table's Columns.
// For a dictionary, we always sink exactly all of the dictionary's columns.
List<Column> sinkColumns = dictionary.getFullSchema();
// Dictionary sink is special: it allows sinking columns that are NOT the same as the
// upstream's output. e.g. Scan Column(A|B|C|D), Sink Column(D|B|A) is OK.
// So we have to re-calculate LogicalDictionarySink's output exprs.
List<NamedExpression> upstreamOutput = child.getOutput().stream().map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
List<NamedExpression> outputExprs = new ArrayList<>(sinkColumns.size());
for (Column column : sinkColumns) {
// find the SlotRef from child's output
Optional<NamedExpression> output = upstreamOutput.stream()
.filter(expr -> expr.getName().equalsIgnoreCase(column.getName())).findFirst();
if (output.isPresent()) {
outputExprs.add(output.get());
} else {
throw new AnalysisException("Unknown column " + column.getName());
}
}
// Create the LogicalDictionarySink, mapping the child's output to outputExprs.
// If the source table has A,B,C,D and the dictionary has D,B,A, then outputExprs and
// sinkColumns are both D,B,A.
LogicalDictionarySink<?> boundSink = new LogicalDictionarySink<>(database, dictionary, sink.allowAdaptiveLoad(),
sinkColumns, outputExprs, child);
// Get column to output mapping and handle type coercion. sink column to its accepted expr
Map<String, NamedExpression> sinkColumnToExpr = getDictColumnToOutput(ctx, sinkColumns, boundSink, child);
// The upstream produces A|B|C|D but we only sink D|B|A; here we build the PROJECT between them.
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(sinkColumns, child, sinkColumnToExpr);
// Return the bound sink with updated child and outputExprs here.
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
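/**
 * Link every sink column with the same-named upstream output, wrapping it in an Alias.
 * Throws if a sink column has no matching upstream output.
 */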
private static Map<String, NamedExpression> getDictColumnToOutput(
MatchingContext<? extends UnboundLogicalSink<Plan>> ctx, List<Column> sinkSchema,
LogicalDictionarySink<?> boundSink, LogicalPlan child) {
// As noted above, the dictionary sink is special: its columns may be unordered and
// need not match the upstream output one-to-one.
Map<String, NamedExpression> upstreamOutputs = Maps.newHashMap();
// collect the upstream outputs, e.g. A|B|C|D.
for (int i = 0; i < child.getOutput().size(); ++i) {
upstreamOutputs.put(child.getOutput().get(i).getName(), child.getOutput().get(i));
}
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
// for sink columns D|B|A, link them with child's outputs
for (Column column : sinkSchema) {
if (upstreamOutputs.containsKey(column.getName())) {
// the dictionary column's type must be the same as the source table's.
Alias output = new Alias(upstreamOutputs.get(column.getName()), column.getName());
columnToOutput.put(column.getName(), output);
} else {
throw new AnalysisException("Unknown column " + column.getName());
}
}
return columnToOutput;
}
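/**
 * Resolve the sink's name parts to a Database and OlapTable, or throw if the target is not
 * an OLAP table. The overloads below do the same for Hive, Iceberg, JDBC and dictionary
 * sinks.
 */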
private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (!(pair.second instanceof OlapTable)) {
throw new AnalysisException("the target table of insert into is not an OLAP table");
}
return Pair.of(((Database) pair.first), (OlapTable) pair.second);
}
private Pair<HMSExternalDatabase, HMSExternalTable> bind(CascadesContext cascadesContext,
UnboundHiveTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof HMSExternalTable) {
HMSExternalTable table = (HMSExternalTable) pair.second;
if (table.getDlaType() == HMSExternalTable.DLAType.HIVE) {
return Pair.of(((HMSExternalDatabase) pair.first), table);
}
}
throw new AnalysisException("the target table of insert into is not a Hive table");
}
private Pair<IcebergExternalDatabase, IcebergExternalTable> bind(CascadesContext cascadesContext,
UnboundIcebergTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof IcebergExternalTable) {
return Pair.of(((IcebergExternalDatabase) pair.first), (IcebergExternalTable) pair.second);
}
throw new AnalysisException("the target table of insert into is not an iceberg table");
}
private Pair<JdbcExternalDatabase, JdbcExternalTable> bind(CascadesContext cascadesContext,
UnboundJdbcTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof JdbcExternalTable) {
return Pair.of(((JdbcExternalDatabase) pair.first), (JdbcExternalTable) pair.second);
}
throw new AnalysisException("the target table of insert into is not an jdbc table");
}
private Pair<Database, Dictionary> bind(CascadesContext cascadesContext,
UnboundDictionarySink<? extends Plan> sink) {
Dictionary dictionary = sink.getDictionary();
Database db;
try {
db = cascadesContext.getConnectContext().getEnv().getInternalCatalog()
.getDbOrAnalysisException(dictionary.getDatabase().getName());
} catch (org.apache.doris.common.AnalysisException e) {
throw new AnalysisException(e.getMessage());
}
return Pair.of(db, dictionary);
}
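// Map the given partition names to partition ids, looking up temporary partitions when
// `temp` is true; an empty name list yields an empty id list (no partition restriction).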
private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()
: partitions.stream().map(pn -> {
Partition partition = table.getPartition(pn, temp);
if (partition == null) {
throw new AnalysisException(String.format("partition %s is not found in table %s",
pn, table.getName()));
}
return partition.getId();
}).collect(Collectors.toList());
}
// bindTargetColumns binds the sink node's target column names to the target table's columns.
private Pair<List<Column>, Integer> bindTargetColumns(OlapTable table, List<String> colsName,
boolean childHasSeqCol, boolean needExtraSeqCol, boolean isGroupCommit) {
// if the table set its sequence column in the stream load phase, the sequence map column is
// null, so we query it.
if (colsName.isEmpty()) {
// ATTN: group commit without a column list should return all base index columns,
// because it already prepares data for these columns.
return Pair.of(table.getBaseSchema(true).stream()
.filter(c -> isGroupCommit || validColumn(c, childHasSeqCol))
.collect(ImmutableList.toImmutableList()), 0);
} else {
int extraColumnsNum = (needExtraSeqCol ? 1 : 0);
List<String> processedColsName = Lists.newArrayList(colsName);
for (Column col : table.getFullSchema()) {
if (col.hasOnUpdateDefaultValue()) {
Optional<String> colName = colsName.stream().filter(c -> c.equals(col.getName())).findFirst();
if (!colName.isPresent()) {
++extraColumnsNum;
processedColsName.add(col.getName());
}
} else if (col.getGeneratedColumnInfo() != null) {
++extraColumnsNum;
processedColsName.add(col.getName());
}
}
if (!processedColsName.contains(Column.SEQUENCE_COL) && (childHasSeqCol || needExtraSeqCol)) {
processedColsName.add(Column.SEQUENCE_COL);
}
return Pair.of(processedColsName.stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList()), extraColumnsNum);
}
}
private boolean isSourceAndTargetStringLikeType(DataType input, DataType target) {
return input.isStringLikeType() && target.isStringLikeType();
}
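// A column is a valid insert target if it is visible (or it is the hidden sequence column
// and one is needed) and it is not a materialized view column.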
private boolean validColumn(Column column, boolean isNeedSequenceCol) {
return (column.isVisible() || (isNeedSequenceCol && column.isSequenceColumn()))
&& !column.isMaterializedViewColumn();
}
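/**
 * An ExpressionAnalyzer that resolves unbound slots against a prepared name-to-expression
 * map instead of a plan's output, used to bind generated column and materialized view
 * column expressions.
 */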
private static class CustomExpressionAnalyzer extends ExpressionAnalyzer {
private Map<String, NamedExpression> slotBinder;
public CustomExpressionAnalyzer(
Plan currentPlan, CascadesContext cascadesContext, Map<String, NamedExpression> slotBinder) {
super(currentPlan, new Scope(ImmutableList.of()), cascadesContext, false, false);
this.slotBinder = slotBinder;
}
@Override
public Expression visitUnboundSlot(UnboundSlot unboundSlot, ExpressionRewriteContext context) {
if (!slotBinder.containsKey(unboundSlot.getName())) {
throw new AnalysisException("cannot find column from target table " + unboundSlot.getNameParts());
}
return slotBinder.get(unboundSlot.getName());
}
}
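/**
 * Re-analyzes expressions stored as legacy SQL (partition expressions and sync materialized
 * view where-clauses) against the target table's output slots, so they can be attached to
 * the bound sink.
 */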
private static class LegacyExprTranslator {
private final Map<String, Slot> nereidsSlotReplaceMap = new HashMap<>();
private final OlapTable olapTable;
public LegacyExprTranslator(OlapTable table, List<Slot> outputs) {
for (Slot expression : outputs) {
String name = expression.getName();
nereidsSlotReplaceMap.put(name.toLowerCase(), expression.toSlot());
}
this.olapTable = table;
}
public List<Expression> createPartitionExprList() {
return olapTable.getPartitionInfo().getPartitionExprs().stream()
.map(expr -> analyze(expr.toSql())).collect(Collectors.toList());
}
public Map<Long, Expression> createSyncMvWhereClause() {
Map<Long, Expression> mvWhereClauses = new HashMap<>();
long baseIndexId = olapTable.getBaseIndexId();
for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTable.getVisibleIndexIdToMeta().entrySet()) {
if (entry.getKey() != baseIndexId && entry.getValue().getWhereClause() != null) {
mvWhereClauses.put(entry.getKey(), analyze(entry.getValue().getWhereClause().toSql()));
}
}
return mvWhereClauses;
}
private Expression analyze(String exprSql) {
Expression expression = new NereidsParser().parseExpression(exprSql);
Expression boundSlotExpression = SlotReplacer.INSTANCE.replace(expression, nereidsSlotReplaceMap);
Scope scope = new Scope(Lists.newArrayList(nereidsSlotReplaceMap.values()));
StatementContext statementContext = new StatementContext();
LogicalEmptyRelation dummyPlan = new LogicalEmptyRelation(
statementContext.getNextRelationId(), new ArrayList<>());
CascadesContext cascadesContext = CascadesContext.initContext(
statementContext, dummyPlan, PhysicalProperties.ANY);
ExpressionAnalyzer analyzer = new ExpressionAnalyzer(null, scope, cascadesContext, false, false);
return analyzer.analyze(boundSlotExpression, new ExpressionRewriteContext(cascadesContext));
}
private static class SlotReplacer extends DefaultExpressionRewriter<Map<String, Slot>> {
public static final SlotReplacer INSTANCE = new SlotReplacer();
public Expression replace(Expression e, Map<String, Slot> replaceMap) {
return e.accept(this, replaceMap);
}
@Override
public Expression visitUnboundSlot(UnboundSlot unboundSlot,
Map<String, Slot> replaceMap) {
if (!replaceMap.containsKey(unboundSlot.getName().toLowerCase())) {
throw new org.apache.doris.nereids.exceptions.AnalysisException(
"Unknown column " + unboundSlot.getName());
}
return replaceMap.get(unboundSlot.getName().toLowerCase());
}
}
}
}