// OlapTableSink.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.planner;

import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TOlapTableIndex;
import org.apache.doris.thrift.TOlapTableIndexSchema;
import org.apache.doris.thrift.TOlapTableIndexTablets;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TOlapTablePartition;
import org.apache.doris.thrift.TOlapTablePartitionParam;
import org.apache.doris.thrift.TOlapTableSchemaParam;
import org.apache.doris.thrift.TOlapTableSink;
import org.apache.doris.thrift.TPaloNodesInfo;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUniqueKeyUpdateMode;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

public class OlapTableSink extends DataSink {
    private static final Logger LOG = LogManager.getLogger(OlapTableSink.class);

    // input variables
    // Destination OLAP table that this sink writes rows into.
    private OlapTable dstTable;
    // Tuple layout of the rows arriving at the sink; computed by the planner.
    private TupleDescriptor tupleDescriptor;
    // specified partition ids.
    // Null means "not yet resolved"; init() falls back to all partition ids of dstTable.
    private List<Long> partitionIds;
    // partial update input columns
    // UPSERT means a normal (non-partial) load; other modes carry the partial-update columns below.
    private TUniqueKeyUpdateMode uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPSERT;
    // Only populated when uniqueKeyUpdateMode == UPDATE_FIXED_COLUMNS.
    private HashSet<String> partialUpdateInputColumns;

    // set after init called
    protected TDataSink tDataSink;

    // May be forced back to false in init() (V1 storage format / merge-on-write tables).
    private boolean singleReplicaLoad;

    private boolean isStrictMode = false;
    private long txnId = -1;

    // Pre-analyzed partition function exprs supplied by the nereids planner (nereids ctor only).
    private List<Expr> partitionExprs;
    // Pre-analyzed where clauses of sync materialized views, keyed by index id (nereids ctor only).
    private Map<Long, Expr> syncMvWhereClauses;

    // Thrift params built during init()/complete(); exposed via getters for group-commit reuse.
    private TOlapTableSchemaParam tOlapTableSchemaParam;
    private TOlapTablePartitionParam tOlapTablePartitionParam;
    private List<TOlapTableLocationParam> tOlapTableLocationParams;

    /**
     * Creates a sink for the legacy (non-nereids) planner.
     *
     * @param dstTable          destination OLAP table
     * @param tupleDescriptor   tuple layout of the incoming rows
     * @param partitionIds      target partition ids, or null to load into all partitions
     * @param singleReplicaLoad whether to write a single replica and replicate on BE side
     */
    public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
            boolean singleReplicaLoad) {
        this.dstTable = dstTable;
        this.tupleDescriptor = tupleDescriptor;
        this.partitionIds = partitionIds;
        this.singleReplicaLoad = singleReplicaLoad;
    }

    // new constructor for nereids
    // new constructor for nereids
    /**
     * Creates a sink for the nereids planner, which supplies already-analyzed
     * partition function exprs and sync-materialized-view where clauses so no
     * legacy Analyzer pass is needed later.
     *
     * @param partitionExprs     pre-analyzed partition function exprs
     * @param syncMvWhereClauses pre-analyzed sync-mv where clauses keyed by index id
     */
    public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
                         boolean singleReplicaLoad, List<Expr> partitionExprs, Map<Long, Expr> syncMvWhereClauses) {
        this.dstTable = dstTable;
        this.tupleDescriptor = tupleDescriptor;
        this.partitionIds = partitionIds;
        this.singleReplicaLoad = singleReplicaLoad;
        this.partitionExprs = partitionExprs;
        this.syncMvWhereClauses = syncMvWhereClauses;
    }

    /**
     * Base initialization: builds the thrift TOlapTableSink skeleton, resolves the
     * target partition ids (defaulting to all partitions) and validates load options.
     *
     * @throws AnalysisException if a requested partition does not exist, the table has
     *         no partitions (and auto-partition is off), or load_to_single_tablet is
     *         requested on a non-randomly-distributed table
     */
    public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS, int sendBatchParallelism,
            boolean loadToSingleTablet, boolean isStrictMode, long txnExpirationS) throws AnalysisException {
        TOlapTableSink tSink = new TOlapTableSink();
        tSink.setLoadId(loadId);
        tSink.setTxnId(txnId);
        tSink.setDbId(dbId);
        tSink.setBaseSchemaVersion(dstTable.getBaseSchemaVersion());
        tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
        tSink.setSendBatchParallelism(sendBatchParallelism);
        // Read the thread-local ConnectContext once so the null check and the use
        // cannot observe different values.
        ConnectContext ctx = ConnectContext.get();
        tSink.setWriteFileCache(ctx != null && !ctx.getSessionVariable().isDisableFileCache());
        this.isStrictMode = isStrictMode;
        this.txnId = txnId;
        // load_to_single_tablet only makes sense for random distribution, where any
        // tablet is a valid target.
        if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
            throw new AnalysisException(
                    "if load_to_single_tablet set to true," + " the olap table must be with random distribution");
        }
        tSink.setLoadToSingleTablet(loadToSingleTablet);
        tSink.setTxnTimeoutS(txnExpirationS);
        String vaultId = dstTable.getStorageVaultId();
        if (vaultId != null && !vaultId.isEmpty()) {
            tSink.setStorageVaultId(vaultId);
        }
        tDataSink = new TDataSink(getDataSinkType());
        tDataSink.setOlapTableSink(tSink);

        if (partitionIds == null) {
            // No explicit partitions: load into every partition of the table.
            partitionIds = dstTable.getPartitionIds();
            if (partitionIds.isEmpty() && !dstTable.getPartitionInfo().enableAutomaticPartition()) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, dstTable.getName());
            }
        }
        // Validate that every requested partition still exists.
        for (Long partitionId : partitionIds) {
            Partition part = dstTable.getPartition(partitionId);
            if (part == null) {
                ErrorReport.reportAnalysisException(ErrorCode.ERR_UNKNOWN_PARTITION, partitionId, dstTable.getName());
            }
        }

        if (singleReplicaLoad && dstTable.getStorageFormat() == TStorageFormat.V1) {
            // Single replica load not supported by TStorageFormat.V1
            singleReplicaLoad = false;
            LOG.warn("Single replica load not supported by TStorageFormat.V1. table: {}", dstTable.getName());
        }
        if (dstTable.getEnableUniqueKeyMergeOnWrite()) {
            // Merge-on-write needs every replica to compute deletes itself.
            singleReplicaLoad = false;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Single replica load not supported by merge-on-write table: {}", dstTable.getName());
            }
        }
    }

    // init for nereids insert into
    // init for nereids insert into
    /**
     * Full initialization for nereids INSERT INTO: runs the base init, then builds
     * the schema/partition/location thrift params and applies insert-specific
     * options (auto-partition allowance, auto-detect overwrite).
     *
     * @throws UserException if a partition's materialized index count disagrees with the table's
     */
    public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS,
            int sendBatchParallelism, boolean loadToSingleTablet, boolean isStrictMode,
            long txnExpirationS, OlapInsertCommandContext olapInsertCtx) throws UserException {
        init(loadId, txnId, dbId, loadChannelTimeoutS, sendBatchParallelism, loadToSingleTablet,
                isStrictMode, txnExpirationS);
        for (Long partitionId : partitionIds) {
            Partition partition = dstTable.getPartition(partitionId);
            if (dstTable.getIndexNumber() != partition.getMaterializedIndices(IndexExtState.ALL).size()) {
                // Report the same value that was compared (getIndexNumber), not
                // getIndexIdToMeta().size(), which may differ and mislead debugging.
                throw new UserException(
                        "table's index number not equal with partition's index number. table's index number="
                                + dstTable.getIndexNumber() + ", partition's index number="
                                + partition.getMaterializedIndices(IndexExtState.ALL).size());
            }
        }

        TOlapTableSink tSink = tDataSink.getOlapTableSink();
        tOlapTableSchemaParam = createSchema(tSink.getDbId(), dstTable);
        tOlapTablePartitionParam = createPartition(tSink.getDbId(), dstTable);
        tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);

        tSink.setTableId(dstTable.getId());
        tSink.setTupleId(tupleDescriptor.getId().asInt());
        int numReplicas = dstTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
        tSink.setNumReplicas(numReplicas);
        tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
        tSink.setSchema(tOlapTableSchemaParam);
        tSink.setPartition(tOlapTablePartitionParam);
        tSink.setLocation(tOlapTableLocationParams.get(0));
        if (singleReplicaLoad) {
            // Index 1 holds the slave-replica locations when single replica load is on.
            tSink.setSlaveLocation(tOlapTableLocationParams.get(1));
        }
        tSink.setWriteSingleReplica(singleReplicaLoad);
        tSink.setNodesInfo(createPaloNodesInfo());

        if (!olapInsertCtx.isAllowAutoPartition()) {
            setAutoPartition(false);
        }
        if (olapInsertCtx.isAutoDetectOverwrite()) {
            setAutoDetectOverwite(true);
            setOverwriteGroupId(olapInsertCtx.getOverwriteGroupId());
        }
    }

    // init for nereids stream load
    // init for nereids stream load
    /**
     * Full initialization for nereids stream load: records the partial-update mode
     * first (createSchema depends on it), then runs the base init and builds the
     * schema/partition/location thrift params.
     *
     * @throws UserException if a partition's materialized index count disagrees with the table's
     */
    public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS,
            int sendBatchParallelism, boolean loadToSingleTablet, boolean isStrictMode,
            long txnExpirationS, TUniqueKeyUpdateMode uniquekeyUpdateMode,
            HashSet<String> partialUpdateInputColumns) throws UserException {
        setPartialUpdateInfo(uniquekeyUpdateMode, partialUpdateInputColumns);
        init(loadId, txnId, dbId, loadChannelTimeoutS, sendBatchParallelism, loadToSingleTablet,
                isStrictMode, txnExpirationS);
        for (Long partitionId : partitionIds) {
            Partition partition = dstTable.getPartition(partitionId);
            if (dstTable.getIndexNumber() != partition.getMaterializedIndices(IndexExtState.ALL).size()) {
                // Report the same value that was compared (getIndexNumber), not
                // getIndexIdToMeta().size(), which may differ and mislead debugging.
                throw new UserException(
                        "table's index number not equal with partition's index number. table's index number="
                                + dstTable.getIndexNumber() + ", partition's index number="
                                + partition.getMaterializedIndices(IndexExtState.ALL).size());
            }
        }

        TOlapTableSink tSink = tDataSink.getOlapTableSink();
        tOlapTableSchemaParam = createSchema(tSink.getDbId(), dstTable);
        tOlapTablePartitionParam = createPartition(tSink.getDbId(), dstTable);
        tOlapTableLocationParams = createLocation(tSink.getDbId(), dstTable);

        tSink.setTableId(dstTable.getId());
        tSink.setTupleId(tupleDescriptor.getId().asInt());
        int numReplicas = dstTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
        tSink.setNumReplicas(numReplicas);
        tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
        tSink.setSchema(tOlapTableSchemaParam);
        tSink.setPartition(tOlapTablePartitionParam);
        tSink.setLocation(tOlapTableLocationParams.get(0));
        if (singleReplicaLoad) {
            // Index 1 holds the slave-replica locations when single replica load is on.
            tSink.setSlaveLocation(tOlapTableLocationParams.get(1));
        }
        tSink.setWriteSingleReplica(singleReplicaLoad);
        tSink.setNodesInfo(createPaloNodesInfo());
    }

    /** Returns the schema thrift param built by init(); null before init() runs. */
    public TOlapTableSchemaParam getOlapTableSchemaParam() {
        return this.tOlapTableSchemaParam;
    }

    /** Returns the partition thrift param built by init(); null before init() runs. */
    public TOlapTablePartitionParam getOlapTablePartitionParam() {
        return this.tOlapTablePartitionParam;
    }

    /** Returns the location thrift params built by init(); null before init() runs. */
    public List<TOlapTableLocationParam> getOlapTableLocationParams() {
        return this.tOlapTableLocationParams;
    }

    /**
     * Marks this sink as a fixed-columns partial update and records the input
     * columns. A no-op when {@code isPartialUpdate} is false.
     */
    public void setPartialUpdateInputColumns(boolean isPartialUpdate, HashSet<String> columns) {
        if (!isPartialUpdate) {
            return;
        }
        this.uniqueKeyUpdateMode = TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS;
        this.partialUpdateInputColumns = columns;
    }

    /**
     * Records the unique-key update mode; the column set is only kept for
     * UPDATE_FIXED_COLUMNS mode (other modes do not use it).
     */
    public void setPartialUpdateInfo(TUniqueKeyUpdateMode uniqueKeyUpdateMode, HashSet<String> columns) {
        this.uniqueKeyUpdateMode = uniqueKeyUpdateMode;
        if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
            return;
        }
        this.partialUpdateInputColumns = columns;
    }

    /** Replaces the load id on the already-built thrift sink (requires init() to have run). */
    public void updateLoadId(TUniqueId newLoadId) {
        TOlapTableSink sink = tDataSink.getOlapTableSink();
        sink.setLoadId(newLoadId);
    }

    /** Toggles automatic partition creation on the built partition param (requires init()). */
    public void setAutoPartition(boolean var) {
        TOlapTablePartitionParam partition = tDataSink.getOlapTableSink().getPartition();
        partition.setEnableAutomaticPartition(var);
    }

    // NOTE(review): method name misspells "Overwrite"; it is public API, so renaming
    // would break existing callers — kept as-is.
    /** Toggles auto-detect overwrite on the built partition param (requires init()). */
    public void setAutoDetectOverwite(boolean var) {
        TOlapTablePartitionParam partition = tDataSink.getOlapTableSink().getPartition();
        partition.setEnableAutoDetectOverwrite(var);
    }

    /** Sets the overwrite group id on the built partition param (requires init()). */
    public void setOverwriteGroupId(long var) {
        TOlapTablePartitionParam partition = tDataSink.getOlapTableSink().getPartition();
        partition.setOverwriteGroupId(var);
    }

    // must called after tupleDescriptor is computed
    public void complete(Analyzer analyzer) throws UserException {
        for (Long partitionId : partitionIds) {
            Partition partition = dstTable.getPartition(partitionId);
            if (dstTable.getIndexNumber() != partition.getMaterializedIndices(IndexExtState.ALL).size()) {
                throw new UserException(
                        "table's index number not equal with partition's index number. table's index number="
                                + dstTable.getIndexIdToMeta().size() + ", partition's index number="
                                + partition.getMaterializedIndices(IndexExtState.ALL).size());
            }
        }

        TOlapTableSink tSink = tDataSink.getOlapTableSink();

        tSink.setTableId(dstTable.getId());
        tSink.setTupleId(tupleDescriptor.getId().asInt());
        int numReplicas = dstTable.getTableProperty().getReplicaAllocation().getTotalReplicaNum();
        tSink.setNumReplicas(numReplicas);
        tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
        tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
        tSink.setPartition(createPartition(tSink.getDbId(), dstTable, analyzer));
        List<TOlapTableLocationParam> locationParams = createLocation(tSink.getDbId(), dstTable);
        tSink.setLocation(locationParams.get(0));
        if (singleReplicaLoad) {
            tSink.setSlaveLocation(locationParams.get(1));
        }
        tSink.setWriteSingleReplica(singleReplicaLoad);
        tSink.setNodesInfo(createPaloNodesInfo());
    }

    /**
     * Renders the sink for EXPLAIN output. BRIEF level prints only the header;
     * other levels add the tuple id, partition info and partial-update state.
     */
    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        StringBuilder sb = new StringBuilder();
        sb.append(prefix).append("OLAP TABLE SINK\n");
        if (explainLevel == TExplainLevel.BRIEF) {
            return sb.toString();
        }
        sb.append(prefix).append("  TUPLE ID: ").append(tupleDescriptor.getId()).append("\n");
        sb.append(prefix).append("  ").append(DataPartition.RANDOM.getExplainString(explainLevel));
        boolean isPartialUpdate = uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT;
        sb.append(prefix).append("  IS_PARTIAL_UPDATE: ").append(isPartialUpdate);
        if (isPartialUpdate) {
            String mode = (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS)
                    ? "UPDATE_FIXED_COLUMNS"
                    : "UPDATE_FLEXIBLE_COLUMNS";
            sb.append(prefix).append("  PARTIAL_UPDATE_MODE: ").append(mode);
        }
        return sb.toString();
    }

    // An OLAP table sink is not fed by an exchange node, so there is no id to report.
    @Override
    public PlanNodeId getExchNodeId() {
        return null;
    }

    // Rows are distributed to tablets by the sink itself; from the plan's point of
    // view the output partition is random.
    @Override
    public DataPartition getOutputPartition() {
        return DataPartition.RANDOM;
    }

    // Returns the thrift sink built by init(); callers must invoke init()/complete() first.
    @Override
    protected TDataSink toThrift() {
        return tDataSink;
    }

    /**
     * Builds the TOlapTableSchemaParam (legacy-planner variant): one index schema per
     * materialized index, with shadow-column names stripped, plus partial-update and
     * sync-mv where-clause info. Where clauses are analyzed here with the given Analyzer.
     *
     * @throws AnalysisException if partial update is requested during a schema change,
     *         or the sequence map column cannot be found in the full schema
     */
    public TOlapTableSchemaParam createSchema(long dbId, OlapTable table, Analyzer analyzer) throws AnalysisException {
        TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam();
        schemaParam.setDbId(dbId);
        schemaParam.setTableId(table.getId());
        schemaParam.setVersion(table.getIndexMetaByIndexId(table.getBaseIndexId()).getSchemaVersion());
        schemaParam.setIsStrictMode(isStrictMode);

        schemaParam.tuple_desc = tupleDescriptor.toThrift();
        for (SlotDescriptor slotDesc : tupleDescriptor.getSlots()) {
            schemaParam.addToSlotDescs(slotDesc.toThrift());
        }

        for (Map.Entry<Long, MaterializedIndexMeta> pair : table.getIndexIdToMeta().entrySet()) {
            MaterializedIndexMeta indexMeta = pair.getValue();
            List<String> columns = Lists.newArrayList();
            List<TColumn> columnsDesc = Lists.newArrayList();
            List<TOlapTableIndex> indexDesc = Lists.newArrayList();
            columns.addAll(indexMeta.getSchema().stream().map(Column::getNonShadowName).collect(Collectors.toList()));
            for (Column column : indexMeta.getSchema()) {
                TColumn tColumn = column.toThrift();
                // When schema change is doing, some modified column has prefix in name. Columns here
                // is for the schema in rowset meta, which should be no column with shadow prefix.
                // So we should remove the shadow prefix here.
                if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
                    tColumn.setColumnName(column.getNonShadowName());
                }
                column.setIndexFlag(tColumn, table);
                columnsDesc.add(tColumn);
            }
            List<Index> indexes = indexMeta.getIndexes();
            if (indexes.isEmpty() && pair.getKey() == table.getBaseIndexId()) {
                // for compatibility with old versions before 2.0-beta:
                // if indexMeta.getIndexes() is empty, use table.getIndexes()
                indexes = table.getIndexes();
            }
            for (Index index : indexes) {
                TOlapTableIndex tIndex = index.toThrift(index.getColumnUniqueIds(table.getBaseSchema()));
                indexDesc.add(tIndex);
            }
            TOlapTableIndexSchema indexSchema = new TOlapTableIndexSchema(pair.getKey(), columns,
                    indexMeta.getSchemaHash());
            if (indexMeta.getWhereClause() != null) {
                // Clone before analyzing so the catalog's expr is never mutated.
                Expr expr = indexMeta.getWhereClause().clone();
                expr.replaceSlot(tupleDescriptor);
                if (analyzer != null) {
                    tupleDescriptor.setTable(table);
                    analyzer.registerTupleDescriptor(tupleDescriptor);
                    expr.analyze(analyzer);
                }
                indexSchema.setWhereClause(expr.treeToThrift());
            }
            indexSchema.setColumnsDesc(columnsDesc);
            indexSchema.setIndexesDesc(indexDesc);
            schemaParam.addToIndexes(indexSchema);
        }
        // for backward compatibility
        schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
        schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode);
        if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) {
            // Partial update against a shadow schema would write the wrong columns.
            if (table.getState() == OlapTable.OlapTableState.ROLLUP
                    || table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {
                throw new AnalysisException("Can't do partial update when table is doing schema change.");
            }
        }
        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) {
            // Fail with a clear message instead of a bare NoSuchElementException if the
            // sequence map column is missing from the full schema.
            Column seqMapCol = table.getFullSchema().stream()
                    .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
                    .findFirst()
                    .orElseThrow(() -> new AnalysisException(
                            "sequence map column " + table.getSequenceMapCol() + " not found in table full schema"));
            schemaParam.setSequenceMapColUniqueId(seqMapCol.getUniqueId());
        }
        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
            for (String s : partialUpdateInputColumns) {
                schemaParam.addToPartialUpdateInputColumns(s);
            }
            for (Column col : table.getFullSchema()) {
                if (col.isAutoInc()) {
                    schemaParam.setAutoIncrementColumn(col.getName());
                    schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId());
                }
            }
        }
        schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat());
        return schemaParam;
    }

    /**
     * Builds the TOlapTableSchemaParam (nereids variant): identical to the Analyzer
     * overload except that sync-mv where clauses must already be analyzed and are
     * looked up in {@code syncMvWhereClauses} instead of being analyzed here.
     *
     * @throws AnalysisException if a where clause was not pre-analyzed, partial update
     *         is requested during a schema change, or the sequence map column is missing
     */
    private TOlapTableSchemaParam createSchema(long dbId, OlapTable table) throws AnalysisException {
        TOlapTableSchemaParam schemaParam = new TOlapTableSchemaParam();
        schemaParam.setDbId(dbId);
        schemaParam.setTableId(table.getId());
        schemaParam.setVersion(table.getIndexMetaByIndexId(table.getBaseIndexId()).getSchemaVersion());
        schemaParam.setIsStrictMode(isStrictMode);

        schemaParam.tuple_desc = tupleDescriptor.toThrift();
        for (SlotDescriptor slotDesc : tupleDescriptor.getSlots()) {
            schemaParam.addToSlotDescs(slotDesc.toThrift());
        }

        for (Map.Entry<Long, MaterializedIndexMeta> pair : table.getIndexIdToMeta().entrySet()) {
            MaterializedIndexMeta indexMeta = pair.getValue();
            List<String> columns = Lists.newArrayList();
            List<TColumn> columnsDesc = Lists.newArrayList();
            List<TOlapTableIndex> indexDesc = Lists.newArrayList();
            columns.addAll(indexMeta.getSchema().stream().map(Column::getNonShadowName).collect(Collectors.toList()));
            for (Column column : indexMeta.getSchema()) {
                TColumn tColumn = column.toThrift();
                // When schema change is doing, some modified column has prefix in name. Columns here
                // is for the schema in rowset meta, which should be no column with shadow prefix.
                // So we should remove the shadow prefix here.
                if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
                    tColumn.setColumnName(column.getNonShadowName());
                }
                column.setIndexFlag(tColumn, table);
                columnsDesc.add(tColumn);
            }
            List<Index> indexes = indexMeta.getIndexes();
            if (indexes.isEmpty() && pair.getKey() == table.getBaseIndexId()) {
                // for compatibility with old versions before 2.0-beta:
                // if indexMeta.getIndexes() is empty, use table.getIndexes()
                indexes = table.getIndexes();
            }
            for (Index index : indexes) {
                TOlapTableIndex tIndex = index.toThrift(index.getColumnUniqueIds(table.getBaseSchema()));
                indexDesc.add(tIndex);
            }
            TOlapTableIndexSchema indexSchema = new TOlapTableIndexSchema(pair.getKey(), columns,
                    indexMeta.getSchemaHash());
            Expr whereClause = indexMeta.getWhereClause();
            if (whereClause != null) {
                // Nereids must have supplied the analyzed clause up front.
                Expr expr = syncMvWhereClauses.getOrDefault(pair.getKey(), null);
                if (expr == null) {
                    throw new AnalysisException(String.format("%s is not analyzed", whereClause.toSql()));
                }
                indexSchema.setWhereClause(expr.treeToThrift());
            }
            indexSchema.setColumnsDesc(columnsDesc);
            indexSchema.setIndexesDesc(indexDesc);
            schemaParam.addToIndexes(indexSchema);
        }
        // for backward compatibility
        schemaParam.setIsPartialUpdate(uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS);
        schemaParam.setUniqueKeyUpdateMode(uniqueKeyUpdateMode);
        if (uniqueKeyUpdateMode != TUniqueKeyUpdateMode.UPSERT) {
            // Partial update against a shadow schema would write the wrong columns.
            if (table.getState() == OlapTable.OlapTableState.ROLLUP
                    || table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {
                throw new AnalysisException("Can't do partial update when table is doing schema change.");
            }
        }
        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FLEXIBLE_COLUMNS && table.getSequenceMapCol() != null) {
            // Fail with a clear message instead of a bare NoSuchElementException if the
            // sequence map column is missing from the full schema.
            Column seqMapCol = table.getFullSchema().stream()
                    .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
                    .findFirst()
                    .orElseThrow(() -> new AnalysisException(
                            "sequence map column " + table.getSequenceMapCol() + " not found in table full schema"));
            schemaParam.setSequenceMapColUniqueId(seqMapCol.getUniqueId());
        }
        if (uniqueKeyUpdateMode == TUniqueKeyUpdateMode.UPDATE_FIXED_COLUMNS) {
            for (String s : partialUpdateInputColumns) {
                schemaParam.addToPartialUpdateInputColumns(s);
            }
            for (Column col : table.getFullSchema()) {
                if (col.isAutoInc()) {
                    schemaParam.setAutoIncrementColumn(col.getName());
                    schemaParam.setAutoIncrementColumnUniqueId(col.getUniqueId());
                }
            }
        }
        schemaParam.setInvertedIndexFileStorageFormat(table.getInvertedIndexFileStorageFormat());
        return schemaParam;
    }

    /**
     * Extracts the distribution column names for a distribution spec. Hash
     * distribution yields its bucket columns; random distribution has none.
     *
     * @throws UserException on an unrecognized distribution type
     */
    private List<String> getDistColumns(DistributionInfo distInfo) throws UserException {
        List<String> distColumns = Lists.newArrayList();
        DistributionInfoType type = distInfo.getType();
        if (type == DistributionInfoType.HASH) {
            HashDistributionInfo hashInfo = (HashDistributionInfo) distInfo;
            for (Column column : hashInfo.getDistributionColumns()) {
                distColumns.add(column.getName());
            }
        } else if (type == DistributionInfoType.RANDOM) {
            // RandomDistributionInfo doesn't have distributedColumns
        } else {
            throw new UserException("unsupported distributed type, type=" + distInfo.getType());
        }
        return distColumns;
    }

    /**
     * Returns the placeholder partition item used when building a fake partition
     * for an empty auto-partition table.
     *
     * @throws UserException for partition types other than LIST/RANGE
     */
    private PartitionItem createDummyPartitionItem(PartitionType partType) throws UserException {
        switch (partType) {
            case LIST:
                return ListPartitionItem.DUMMY_ITEM;
            case RANGE:
                return RangePartitionItem.DUMMY_ITEM;
            default:
                throw new UserException("unsupported partition for OlapTable, partition=" + partType);
        }
    }

    /**
     * Builds a fake single-bucket partition param for an empty auto-partition table
     * (legacy-planner variant: partition function exprs are analyzed here). The fake
     * partition is only used for locations; the BE creates real partitions on demand.
     */
    private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer,
            TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType)
            throws UserException {
        partitionParam.setEnableAutomaticPartition(true);
        // these partitions only use in locations. not find partition.
        partitionParam.setPartitionsIsFake(true);

        // set columns
        for (Column partCol : partitionInfo.getPartitionColumns()) {
            partitionParam.addToPartitionColumns(partCol.getName());
        }

        int partColNum = partitionInfo.getPartitionColumns().size();

        TOlapTablePartition fakePartition = new TOlapTablePartition();
        fakePartition.setId(0);
        // set partition keys
        setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum);

        // numBuckets is loop-invariant; set it once instead of on every iteration.
        // Every OlapTable has at least its base index, so this was always set before.
        fakePartition.setNumBuckets(1);
        for (Long indexId : table.getIndexIdToMeta().keySet()) {
            fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L)));
        }
        fakePartition.setIsMutable(true);

        DistributionInfo distInfo = table.getDefaultDistributionInfo();
        partitionParam.setDistributedColumns(getDistColumns(distInfo));
        partitionParam.addToPartitions(fakePartition);

        ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
        if (exprSource != null && analyzer != null) {
            Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext());
            tupleDescriptor.setTable(table);
            funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
            // we must clone the exprs. otherwise analyze will influence the origin exprs.
            ArrayList<Expr> exprs = new ArrayList<Expr>();
            for (Expr e : exprSource) {
                exprs.add(e.clone());
            }
            for (Expr e : exprs) {
                e.reset();
                e.analyze(funcAnalyzer);
            }
            partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
        }

        return partitionParam;
    }

    /**
     * Builds a fake single-bucket partition param for an empty auto-partition table
     * (nereids variant: the partition function exprs must already be analyzed and
     * stored in {@code partitionExprs}).
     *
     * @throws UserException if the pre-analyzed expr count does not match the table's
     */
    private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table,
            TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType)
            throws UserException {
        partitionParam.setEnableAutomaticPartition(true);
        // these partitions only use in locations. not find partition.
        partitionParam.setPartitionsIsFake(true);

        // set columns
        for (Column partCol : partitionInfo.getPartitionColumns()) {
            partitionParam.addToPartitionColumns(partCol.getName());
        }

        int partColNum = partitionInfo.getPartitionColumns().size();

        TOlapTablePartition fakePartition = new TOlapTablePartition();
        fakePartition.setId(0);
        // set partition keys
        setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum);

        // numBuckets is loop-invariant; set it once instead of on every iteration.
        // Every OlapTable has at least its base index, so this was always set before.
        fakePartition.setNumBuckets(1);
        for (Long indexId : table.getIndexIdToMeta().keySet()) {
            fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L)));
        }
        fakePartition.setIsMutable(true);

        DistributionInfo distInfo = table.getDefaultDistributionInfo();
        partitionParam.setDistributedColumns(getDistColumns(distInfo));
        partitionParam.addToPartitions(fakePartition);

        ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
        if (exprSource != null && !exprSource.isEmpty()) {
            // Nereids must have supplied one analyzed expr per source expr.
            if (exprSource.size() != partitionExprs.size()) {
                throw new UserException(String.format("%s is not analyzed", exprSource));
            }
            partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(partitionExprs));
        }

        return partitionParam;
    }

    /**
     * Builds the thrift partition parameter that tells BE how rows are routed to
     * partitions and tablets, analyzing automatic-partition function exprs with
     * {@code analyzer} when one is supplied.
     *
     * @param dbId     id of the database owning the target table
     * @param table    the target OLAP table
     * @param analyzer used to analyze cloned partition function exprs; may be null,
     *                 in which case no function exprs are attached
     * @return the populated {@link TOlapTablePartitionParam}
     * @throws UserException for an unsupported partition type, or when selected
     *         partitions have inconsistent distribution types
     */
    public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
            throws UserException {
        TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
        PartitionInfo partitionInfo = table.getPartitionInfo();
        boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
        PartitionType partType = table.getPartitionInfo().getType();
        partitionParam.setDbId(dbId);
        partitionParam.setTableId(table.getId());
        partitionParam.setVersion(0);
        partitionParam.setPartitionType(partType.toThrift());

        // create shadow partition for empty auto partition table. only use in this load.
        if (enableAutomaticPartition && partitionIds.isEmpty()) {
            return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType);
        }

        switch (partType) {
            case LIST:
            case RANGE: {
                for (Column partCol : partitionInfo.getPartitionColumns()) {
                    partitionParam.addToPartitionColumns(partCol.getName());
                }

                int partColNum = partitionInfo.getPartitionColumns().size();
                DistributionInfo selectedDistInfo = null;

                for (Long partitionId : partitionIds) {
                    Partition partition = table.getPartition(partitionId);
                    TOlapTablePartition tPartition = new TOlapTablePartition();
                    tPartition.setId(partition.getId());
                    // set partition keys
                    setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum);

                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                        tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
                                index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
                        tPartition.setNumBuckets(index.getTablets().size());
                    }
                    tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId));
                    if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
                        int tabletIndex;
                        // group commit pins the first tablet; otherwise rotate via the recorder
                        if (tDataSink != null && tDataSink.type == TDataSinkType.GROUP_COMMIT_BLOCK_SINK) {
                            tabletIndex = 0;
                        } else {
                            tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
                                    .getCurrentTabletLoadIndex(dbId, table.getId(), partition);
                        }
                        tPartition.setLoadTabletIdx(tabletIndex);
                    }

                    partitionParam.addToPartitions(tPartition);

                    // all selected partitions must share one distribution type
                    DistributionInfo distInfo = partition.getDistributionInfo();
                    if (selectedDistInfo == null) {
                        partitionParam.setDistributedColumns(getDistColumns(distInfo));
                        selectedDistInfo = distInfo;
                    } else {
                        if (selectedDistInfo.getType() != distInfo.getType()) {
                            throw new UserException("different distribute types in two different partitions, type1="
                                    + selectedDistInfo.getType() + ", type2=" + distInfo.getType());
                        }
                    }
                }
                // NOTE: the empty-auto-partition case is fully handled by the early
                // createDummyPartition() return above, so no fallback is needed here.

                ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
                if (enableAutomaticPartition && exprSource != null && analyzer != null) {
                    Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext());
                    tupleDescriptor.setTable(table);
                    funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
                    // we must clone the exprs. otherwise analyze will influence the origin exprs.
                    ArrayList<Expr> exprs = new ArrayList<Expr>();
                    for (Expr e : exprSource) {
                        exprs.add(e.clone());
                    }
                    for (Expr e : exprs) {
                        e.reset();
                        e.analyze(funcAnalyzer);
                    }
                    partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
                }

                partitionParam.setEnableAutomaticPartition(enableAutomaticPartition);
                break;
            }
            case UNPARTITIONED: {
                // there is no partition columns for single partition
                Preconditions.checkArgument(table.getPartitions().size() == 1,
                        "Number of table partitions is not 1 for unpartitioned table, partitionNum="
                                + table.getPartitions().size());
                Partition partition;
                if (partitionIds != null && partitionIds.size() == 1) {
                    partition = table.getPartition(partitionIds.get(0));
                } else {
                    partition = table.getPartitions().iterator().next();
                }

                TOlapTablePartition tPartition = new TOlapTablePartition();
                tPartition.setId(partition.getId());
                tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partition.getId()));
                // No lowerBound and upperBound for this range
                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                    tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
                            index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
                    tPartition.setNumBuckets(index.getTablets().size());
                }

                if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
                    int tabletIndex;
                    if (tDataSink != null && tDataSink.type == TDataSinkType.GROUP_COMMIT_BLOCK_SINK) {
                        tabletIndex = 0;
                    } else {
                        tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
                                .getCurrentTabletLoadIndex(dbId, table.getId(), partition);
                    }
                    tPartition.setLoadTabletIdx(tabletIndex);
                }
                partitionParam.addToPartitions(tPartition);
                partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo()));
                partitionParam.setEnableAutomaticPartition(false);
                break;
            }
            default: {
                throw new UserException("unsupported partition for OlapTable, partition=" + partType);
            }
        }
        return partitionParam;
    }

    /**
     * Analyzer-free variant of {@code createPartition(long, OlapTable, Analyzer)}:
     * builds the thrift partition parameter for BE routing, attaching the already
     * analyzed {@code partitionExprs} field instead of analyzing clones.
     *
     * @param dbId  id of the database owning the target table
     * @param table the target OLAP table
     * @return the populated {@link TOlapTablePartitionParam}
     * @throws UserException for an unsupported partition type, inconsistent
     *         distribution types, or un-analyzed partition function exprs
     */
    private TOlapTablePartitionParam createPartition(long dbId, OlapTable table)
            throws UserException {
        TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
        PartitionInfo partitionInfo = table.getPartitionInfo();
        boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
        PartitionType partType = table.getPartitionInfo().getType();
        partitionParam.setDbId(dbId);
        partitionParam.setTableId(table.getId());
        partitionParam.setVersion(0);
        partitionParam.setPartitionType(partType.toThrift());

        // create shadow partition for empty auto partition table. only use in this load.
        if (enableAutomaticPartition && partitionIds.isEmpty()) {
            return createDummyPartition(dbId, table, partitionParam, partitionInfo, partType);
        }

        switch (partType) {
            case LIST:
            case RANGE: {
                for (Column partCol : partitionInfo.getPartitionColumns()) {
                    partitionParam.addToPartitionColumns(partCol.getName());
                }

                int partColNum = partitionInfo.getPartitionColumns().size();
                DistributionInfo selectedDistInfo = null;

                for (Long partitionId : partitionIds) {
                    Partition partition = table.getPartition(partitionId);
                    TOlapTablePartition tPartition = new TOlapTablePartition();
                    tPartition.setId(partition.getId());
                    // set partition keys
                    setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum);

                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                        tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
                                index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
                        tPartition.setNumBuckets(index.getTablets().size());
                    }
                    tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId));
                    if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
                        int tabletIndex;
                        // group commit pins the first tablet; otherwise rotate via the recorder
                        if (tDataSink != null && tDataSink.type == TDataSinkType.GROUP_COMMIT_BLOCK_SINK) {
                            tabletIndex = 0;
                        } else {
                            tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
                                    .getCurrentTabletLoadIndex(dbId, table.getId(), partition);
                        }
                        tPartition.setLoadTabletIdx(tabletIndex);
                    }

                    partitionParam.addToPartitions(tPartition);

                    // all selected partitions must share one distribution type
                    DistributionInfo distInfo = partition.getDistributionInfo();
                    if (selectedDistInfo == null) {
                        partitionParam.setDistributedColumns(getDistColumns(distInfo));
                        selectedDistInfo = distInfo;
                    } else {
                        if (selectedDistInfo.getType() != distInfo.getType()) {
                            throw new UserException("different distribute types in two different partitions, type1="
                                    + selectedDistInfo.getType() + ", type2=" + distInfo.getType());
                        }
                    }
                }
                // NOTE: the empty-auto-partition case is fully handled by the early
                // createDummyPartition() return above, so no fallback is needed here.

                ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
                if (enableAutomaticPartition && exprSource != null && !exprSource.isEmpty()) {
                    // partitionExprs must already hold the analyzed counterparts of exprSource
                    if (exprSource.size() != partitionExprs.size()) {
                        throw new UserException(String.format("%s is not analyzed", exprSource));
                    }
                    partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(partitionExprs));
                }

                partitionParam.setEnableAutomaticPartition(enableAutomaticPartition);
                break;
            }
            case UNPARTITIONED: {
                // there is no partition columns for single partition
                Preconditions.checkArgument(table.getPartitions().size() == 1,
                        "Number of table partitions is not 1 for unpartitioned table, partitionNum="
                                + table.getPartitions().size());
                Partition partition;
                if (partitionIds != null && partitionIds.size() == 1) {
                    partition = table.getPartition(partitionIds.get(0));
                } else {
                    partition = table.getPartitions().iterator().next();
                }

                TOlapTablePartition tPartition = new TOlapTablePartition();
                tPartition.setId(partition.getId());
                tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partition.getId()));
                // No lowerBound and upperBound for this range
                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                    tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
                            index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
                    tPartition.setNumBuckets(index.getTablets().size());
                }

                if (partition.getDistributionInfo().getType() == DistributionInfoType.RANDOM) {
                    int tabletIndex;
                    if (tDataSink != null && tDataSink.type == TDataSinkType.GROUP_COMMIT_BLOCK_SINK) {
                        tabletIndex = 0;
                    } else {
                        tabletIndex = Env.getCurrentEnv().getTabletLoadIndexRecorderMgr()
                                .getCurrentTabletLoadIndex(dbId, table.getId(), partition);
                    }
                    tPartition.setLoadTabletIdx(tabletIndex);
                }
                partitionParam.addToPartitions(tPartition);
                partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo()));
                partitionParam.setEnableAutomaticPartition(false);
                break;
            }
            default: {
                throw new UserException("unsupported partition for OlapTable, partition=" + partType);
            }
        }
        return partitionParam;
    }

    /**
     * Copies a FE partition item's key boundaries into the thrift partition:
     * start/end keys for RANGE items, in-keys (plus the default-partition flag)
     * for LIST items. Other item types are left untouched.
     *
     * @param tPartition    thrift partition to fill
     * @param partitionItem FE-side partition boundaries
     * @param partColNum    number of partition columns per key
     */
    public static void setPartitionKeys(TOlapTablePartition tPartition, PartitionItem partitionItem, int partColNum)
            throws UserException {
        if (partitionItem instanceof RangePartitionItem) {
            Range<PartitionKey> keyRange = partitionItem.getItems();
            // MIN_VALUE lower bounds are real, legal keys; an omitted start key
            // means "unbounded below" to BE.
            if (keyRange.hasLowerBound() && !keyRange.lowerEndpoint().isMinValue()) {
                List<LiteralExpr> startKeys = keyRange.lowerEndpoint().getKeys();
                for (int colIdx = 0; colIdx < partColNum; colIdx++) {
                    tPartition.addToStartKeys(startKeys.get(colIdx).treeToThrift().getNodes().get(0));
                }
            }
            // TODO: support real MaxLiteral in thrift.
            // now we dont send it to BE. if BE meet it, treat it as default value.
            // see VOlapTablePartition's ctor in tablet_info.h
            if (keyRange.hasUpperBound() && !keyRange.upperEndpoint().isMaxValue()) {
                List<LiteralExpr> endKeys = keyRange.upperEndpoint().getKeys();
                for (int colIdx = 0; colIdx < partColNum; colIdx++) {
                    tPartition.addToEndKeys(endKeys.get(colIdx).treeToThrift().getNodes().get(0));
                }
            }
        } else if (partitionItem instanceof ListPartitionItem) {
            List<PartitionKey> inKeys = partitionItem.getItems();
            // one thrift node list per in-key
            for (PartitionKey inKey : inKeys) {
                List<TExprNode> inKeyNodes = new ArrayList<>();
                for (int colIdx = 0; colIdx < partColNum; colIdx++) {
                    LiteralExpr keyValue = inKey.getKeys().get(colIdx);
                    // normalize typed NULLs so BE sees a plain null literal
                    if (keyValue.isNullLiteral()) {
                        inKeyNodes.add(NullLiteral.create(keyValue.getType()).treeToThrift().getNodes().get(0));
                    } else {
                        inKeyNodes.add(keyValue.treeToThrift().getNodes().get(0));
                    }
                }
                tPartition.addToInKeys(inKeyNodes);
                tPartition.setIsDefaultPartition(partitionItem.isDefaultPartition());
            }
        }
    }

    /**
     * Builds fake location params (tablet id 0) for an automatic-partition table
     * that has no partitions yet, one fake tablet per index.
     *
     * <p>Fix: the previous version aliased the shared {@code aliveBe} list as
     * {@code slaveBe} and removed the chosen master from it by index, mutating the
     * backend list across loop iterations — the candidate set shrank once per
     * index, and {@code nextInt} could be handed a zero-length bound. Slaves are
     * now selected on a per-iteration copy.
     *
     * @param table the target table; only its index count is consulted
     * @return [master location param, slave location param]
     * @throws UserException if no backend is alive
     */
    public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException {
        TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
        TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();

        final long fakeTabletId = 0;
        SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
        List<Long> aliveBe = clusterInfo.getAllBackendIds(true);
        if (aliveBe.isEmpty()) {
            throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster");
        }
        Random random = new SecureRandom();
        for (int i = 0; i < table.getIndexNumber(); i++) {
            // only one fake tablet here
            if (singleReplicaLoad) {
                // pick a random master; the remaining alive BEs act as slaves.
                // Work on a copy so the shared aliveBe list is never mutated.
                int masterIdx = random.nextInt(aliveBe.size());
                locationParam.addToTablets(new TTabletLocation(fakeTabletId,
                        Arrays.asList(aliveBe.get(masterIdx))));

                List<Long> slaveBe = new ArrayList<>(aliveBe);
                slaveBe.remove(masterIdx); // remove-by-index: drop the master only
                slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId,
                        slaveBe));
            } else {
                locationParam.addToTablets(new TTabletLocation(fakeTabletId,
                        Arrays.asList(aliveBe.get(0)))); // just one fake location is enough

                LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0));
            }
        }

        return Arrays.asList(locationParam, slaveLocationParam);
    }

    /**
     * Builds tablet location params for every selected partition: for each tablet,
     * the set of backends holding a usable replica. Fails the load early (and
     * schedules tablet repair) when a tablet has fewer alive replicas than the
     * load requires. Returns [master location param, slave location param]; the
     * slave param is only populated when {@code singleReplicaLoad} is enabled.
     *
     * @param dbId  id of the database owning the table
     * @param table the target OLAP table
     * @throws UserException on too few replicas or disk capacity limit exceeded
     */
    private List<TOlapTableLocationParam> createLocation(long dbId, OlapTable table) throws UserException {
        if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) {
            return createDummyLocation(table);
        }

        TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
        TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
        // BE id -> path hash
        Multimap<Long, Long> allBePathsMap = HashMultimap.create();
        for (long partitionId : partitionIds) {
            Partition partition = table.getPartition(partitionId);
            int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId());
            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
                // we should ensure the replica backend is alive
                // otherwise, there will be a 'unknown node id, id=xxx' error for stream load
                for (Tablet tablet : index.getTablets()) {
                    StringBuilder errMsgBuilder = new StringBuilder();
                    Multimap<Long, Long> bePathsMap = HashMultimap.create();
                    try {
                        bePathsMap = tablet.getNormalReplicaBackendPathMap();
                        if (bePathsMap.keySet().size() < loadRequiredReplicaNum) {
                            errMsgBuilder.append("tablet ").append(tablet.getId())
                                    .append(" alive replica num ").append(bePathsMap.keySet().size())
                                    .append(" < load required replica num ").append(loadRequiredReplicaNum)
                                    .append(", alive backends: [")
                                    .append(StringUtils.join(bePathsMap.keySet(), ","))
                                    .append("]");
                            if (!Config.isCloudMode()) {
                                // in cloud mode, partition get visible version is a rpc,
                                // and each cluster has only one replica, no need to detail the replicas in cloud mode.
                                errMsgBuilder.append(", detail: ")
                                        .append(tablet.getDetailsStatusForQuery(partition.getVisibleVersion()));
                            }
                            // rate-limit repair scheduling to once per 5s per tablet
                            long now = System.currentTimeMillis();
                            long lastLoadFailedTime = tablet.getLastLoadFailedTime();
                            tablet.setLastLoadFailedTime(now);
                            if (now - lastLoadFailedTime >= 5000L) {
                                Env.getCurrentEnv().getTabletScheduler().tryAddRepairTablet(
                                        tablet, dbId, table, partition, index, 0);
                            }
                            throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, errMsgBuilder.toString());
                        }
                    } catch (ComputeGroupException e) {
                        LOG.warn("failed to get replica backend path for tablet " + tablet.getId(), e);
                        errMsgBuilder.append(", ").append(e.toString());
                        throw new UserException(InternalErrorCode.INTERNAL_ERR, errMsgBuilder.toString());
                    }
                    if (!Config.isCloudMode()) {
                        // debug-point hook: may shrink bePathsMap to a random subset
                        debugWriteRandomChooseSink(tablet, partition.getVisibleVersion(), bePathsMap);
                    }
                    if (bePathsMap.keySet().isEmpty()) {
                        throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
                                "tablet " + tablet.getId() + " no available replica");
                    }

                    if (singleReplicaLoad) {
                        // one random backend becomes the master sink, the rest are slaves.
                        // NOTE(review): slaveBePathsMap aliases bePathsMap, so removeAll()
                        // also drops the master's paths from the map accumulated into
                        // allBePathsMap below — the master's disk is then excluded from the
                        // capacity check. Looks intentional-ish but worth confirming.
                        Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
                        Random random = new SecureRandom();
                        Long masterNode = nodes[random.nextInt(nodes.length)];
                        Multimap<Long, Long> slaveBePathsMap = bePathsMap;
                        slaveBePathsMap.removeAll(masterNode);
                        locationParam.addToTablets(new TTabletLocation(tablet.getId(),
                                Lists.newArrayList(Sets.newHashSet(masterNode))));
                        slaveLocationParam.addToTablets(new TTabletLocation(tablet.getId(),
                                Lists.newArrayList(slaveBePathsMap.keySet())));
                    } else {
                        locationParam.addToTablets(new TTabletLocation(tablet.getId(),
                                Lists.newArrayList(bePathsMap.keySet())));
                    }
                    allBePathsMap.putAll(bePathsMap);
                }
            }
        }

        // for partition by function expr, there is no any partition firstly, But this is required in thrift struct.
        if (partitionIds.isEmpty()) {
            locationParam.setTablets(new ArrayList<TTabletLocation>());
            slaveLocationParam.setTablets(new ArrayList<TTabletLocation>());
        }
        // check if disk capacity reach limit
        // this is for load process, so use high water mark to check
        Status st = Env.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allBePathsMap, true);
        if (!st.ok()) {
            throw new DdlException(st.getErrorMsg());
        }
        return Arrays.asList(locationParam, slaveLocationParam);
    }

    /**
     * Debug-point hook: when "OlapTableSink.write_random_choose_sink" is active,
     * shrinks {@code bePathsMap} in place to a random subset of replica paths so
     * the load exercises only some sinks.
     *
     * <p>Fix: {@code sinkNum} was derived as {@code nextInt() % size + 1}, which
     * can be zero or negative (Java {@code %} keeps the sign of a negative
     * {@code nextInt()}), silently disabling the trimming below. Use the bounded
     * {@code nextInt(size)} instead, giving a uniform value in [1, size].
     *
     * @param tablet     tablet whose replicas are candidate sinks
     * @param version    partition visible version; with needCatchUp, only replicas
     *                   at or above this version stay candidates
     * @param bePathsMap BE id -> path hash map, mutated in place
     */
    private void debugWriteRandomChooseSink(Tablet tablet, long version, Multimap<Long, Long> bePathsMap) {
        DebugPoint debugPoint = DebugPointUtil.getDebugPoint("OlapTableSink.write_random_choose_sink");
        if (debugPoint == null) {
            return;
        }

        boolean needCatchup = debugPoint.param("needCatchUp", false);
        int sinkNum = debugPoint.param("sinkNum", 0);
        if (sinkNum == 0) {
            // no explicit count requested: pick 1..size uniformly
            sinkNum = new SecureRandom().nextInt(bePathsMap.size()) + 1;
        }
        List<Long> candidatePaths = tablet.getReplicas().stream()
                .filter(replica -> !needCatchup || replica.getVersion() >= version)
                .map(Replica::getPathHash)
                .collect(Collectors.toList());
        if (sinkNum > 0 && sinkNum < candidatePaths.size()) {
            Collections.shuffle(candidatePaths);
            while (candidatePaths.size() > sinkNum) {
                candidatePaths.remove(candidatePaths.size() - 1);
            }
        }

        // keep only the entries whose path hash survived the trim;
        // map entries are (backend id, path hash) — see createLocation
        Multimap<Long, Long> result = HashMultimap.create();
        bePathsMap.forEach((backendId, pathHash) -> {
            if (candidatePaths.contains(pathHash)) {
                result.put(backendId, pathHash);
            }
        });

        bePathsMap.clear();
        bePathsMap.putAll(result);
    }

    /**
     * Collects connection info (id, host, brpc port) for every known backend,
     * alive or not, for the thrift nodes-info structure.
     *
     * <p>Robustness: a backend can be dropped between listing ids and the
     * {@code getBackend} lookup; skip nulls instead of risking an NPE.
     */
    public TPaloNodesInfo createPaloNodesInfo() {
        TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
        for (Long id : systemInfoService.getAllBackendIds(false)) {
            Backend backend = systemInfoService.getBackend(id);
            if (backend == null) {
                // backend disappeared concurrently; nothing to report for it
                continue;
            }
            nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
        }
        return nodesInfo;
    }

    /** Thrift type tag for this sink; subclasses may override. */
    protected TDataSinkType getDataSinkType() {
        return TDataSinkType.OLAP_TABLE_SINK;
    }

    /** @return the destination OLAP table this sink writes to */
    public OlapTable getDstTable() {
        return dstTable;
    }

    /** @return the tuple descriptor describing the rows fed into this sink */
    public TupleDescriptor getTupleDescriptor() {
        return tupleDescriptor;
    }

    /** @return the load transaction id associated with this sink */
    public long getTxnId() {
        return txnId;
    }
}