SchemaChangeHandler.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.alter;

import org.apache.doris.analysis.AddColumnClause;
import org.apache.doris.analysis.AddColumnsClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.BuildIndexClause;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.analysis.ColumnPosition;
import org.apache.doris.analysis.CreateIndexClause;
import org.apache.doris.analysis.DropColumnClause;
import org.apache.doris.analysis.DropIndexClause;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.IndexDef;
import org.apache.doris.analysis.ModifyColumnClause;
import org.apache.doris.analysis.ModifyTablePropertiesClause;
import org.apache.doris.analysis.ReorderColumnsClause;
import org.apache.doris.analysis.ShowAlterStmt.AlterType;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ColumnType;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.GeneratedColumnInfo;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaContext;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.IdGeneratorUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.commands.AlterCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelAlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelBuildIndexCommand;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ClearAlterTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;
import java.util.stream.Collectors;

public class SchemaChangeHandler extends AlterHandler {
    private static final Logger LOG = LogManager.getLogger(SchemaChangeHandler.class);

    // all shadow indexes should have this prefix in name
    public static final String SHADOW_NAME_PREFIX = "__doris_shadow_";

    public static final int MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE = 10;

    public static final int CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB = 20;

    public final ThreadPoolExecutor schemaChangeThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(
            MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE, "schema-change-pool", true);

    public final Map<Long, AlterJobV2> activeSchemaChangeJobsV2 = Maps.newConcurrentMap();

    public final Map<Long, AlterJobV2> runnableSchemaChangeJobV2 = Maps.newConcurrentMap();

    // queue of inverted index job
    public ConcurrentMap<Long, IndexChangeJob> indexChangeJobs = Maps.newConcurrentMap();

    public final Map<Long, IndexChangeJob> activeIndexChangeJob = Maps.newConcurrentMap();

    public final Map<Long, IndexChangeJob> runnableIndexChangeJob = Maps.newConcurrentMap();

    public int cycleCount = 0;

    public SchemaChangeHandler() {
        super("schema change", Config.default_schema_change_scheduler_interval_millisecond);
    }

    /**
     * @param alterClause
     * @param olapTable
     * @param indexSchemaMap
     * @param colUniqueIdSupplierMap for multi add columns clause, we need stash middle state of maxColUniqueId
     * @return true: can light schema change, false: cannot light schema change
     * @throws DdlException
     */
    private boolean processAddColumn(AddColumnClause alterClause, OlapTable olapTable,
                                     Map<Long, LinkedList<Column>> indexSchemaMap,
                                     Map<Long, IntSupplier> colUniqueIdSupplierMap)
            throws DdlException {
        Column column = alterClause.getColumn();
        ColumnPosition columnPos = alterClause.getColPos();
        String targetIndexName = alterClause.getRollupName();
        checkIndexExists(olapTable, targetIndexName);

        String baseIndexName = olapTable.getName();
        checkAssignedTargetIndexName(baseIndexName, targetIndexName);

        long baseIndexId = olapTable.getBaseIndexId();
        long targetIndexId = -1L;
        if (targetIndexName != null) {
            targetIndexId = olapTable.getIndexIdByName(targetIndexName);
        }

        Set<String> newColNameSet = Sets.newHashSet(column.getName());

        return addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap,
                newColNameSet, false, colUniqueIdSupplierMap);
    }

    private void processAddColumn(AddColumnClause alterClause, Table externalTable, List<Column> newSchema)
            throws DdlException {
        Column column = alterClause.getColumn();
        ColumnPosition columnPos = alterClause.getColPos();
        Set<String> newColNameSet = Sets.newHashSet(column.getName());

        addColumnInternal(column, columnPos, newSchema, newColNameSet);
    }

    private void processAddColumns(AddColumnsClause alterClause, Table externalTable, List<Column> newSchema)
            throws DdlException {
        List<Column> columns = alterClause.getColumns();
        Set<String> newColNameSet = Sets.newHashSet();
        for (Column column : alterClause.getColumns()) {
            newColNameSet.add(column.getName());
        }

        for (Column newColumn : columns) {
            addColumnInternal(newColumn, null, newSchema, newColNameSet);
        }
    }

    /**
     * @param alterClause
     * @param olapTable
     * @param indexSchemaMap
     * @param ignoreSameColumn
     * @param colUniqueIdSupplierMap for multi add columns clause, we need stash middle state of maxColUniqueId
     * @return true: can light schema change, false: cannot light schema change
     * @throws DdlException
     */
    public boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTable,
                                     Map<Long, LinkedList<Column>> indexSchemaMap, boolean ignoreSameColumn,
                                     Map<Long, IntSupplier> colUniqueIdSupplierMap) throws DdlException {
        List<Column> columns = alterClause.getColumns();
        String targetIndexName = alterClause.getRollupName();
        checkIndexExists(olapTable, targetIndexName);

        Set<String> newColNameSet = Sets.newHashSet();
        for (Column column : columns) {
            newColNameSet.add(column.getName());
        }

        String baseIndexName = olapTable.getName();
        checkAssignedTargetIndexName(baseIndexName, targetIndexName);

        long baseIndexId = olapTable.getBaseIndexId();
        long targetIndexId = -1L;
        if (targetIndexName != null) {
            targetIndexId = olapTable.getIndexIdByName(targetIndexName);
        }

        boolean lightSchemaChange = true;
        for (Column column : columns) {
            boolean result = addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
                    newColNameSet, ignoreSameColumn, colUniqueIdSupplierMap);
            if (!result) {
                lightSchemaChange = false;
            }
        }
        return lightSchemaChange;
    }

    private void processDropColumn(DropColumnClause alterClause, Table externalTable, List<Column> newSchema)
            throws DdlException {
        String dropColName = alterClause.getColName();

        // find column in base index and remove it
        boolean found = false;
        Iterator<Column> baseIter = newSchema.iterator();
        while (baseIter.hasNext()) {
            Column column = baseIter.next();
            if (column.getName().equalsIgnoreCase(dropColName)) {
                if (newSchema.size() > 1) {
                    baseIter.remove();
                    found = true;
                } else {
                    throw new DdlException(
                            "Do not allow remove last column of table: " + externalTable.getName() + " column: "
                                    + dropColName);
                }
                break;
            }
        }

        if (!found) {
            throw new DdlException("Column does not exists: " + dropColName);
        }
    }

    /**
     * @param alterClause
     * @param olapTable
     * @param indexSchemaMap
     * @param indexes
     * @return true: can light schema change, false: cannot
     * @throws DdlException
     */
    private boolean processDropColumn(DropColumnClause alterClause, OlapTable olapTable,
                                      Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes)
            throws DdlException {

        String dropColName = alterClause.getColName();
        String targetIndexName = alterClause.getRollupName();
        checkIndexExists(olapTable, targetIndexName);

        String baseIndexName = olapTable.getName();
        checkAssignedTargetIndexName(baseIndexName, targetIndexName);
        long baseIndexId = olapTable.getBaseIndexId();

        long targetIndexId = -1L;
        if (targetIndexName != null) {
            targetIndexId = olapTable.getIndexIdByName(targetIndexName);
        }

        boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
        /*
         * UNIQUE:
         *      Can not drop any key column, cluster key column
         * AGGREGATION:
         *      Can not drp any key column is has value with REPLACE method
         */
        if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
            List<Column> baseSchema = indexSchemaMap.get(baseIndexId);
            for (Column column : baseSchema) {
                if (column.getName().equalsIgnoreCase(dropColName)) {
                    if (column.isKey()) {
                        throw new DdlException("Can not drop key column in Unique data model table");
                    } else if (column.isClusterKey()) {
                        throw new DdlException("Can not drop cluster key column in Unique data model table");
                    }
                }
            }
            if (olapTable.hasSequenceCol() && dropColName.equalsIgnoreCase(olapTable.getSequenceMapCol())) {
                throw new DdlException("Can not drop sequence mapping column[" + dropColName
                        + "] in Unique data model table[" + olapTable.getName() + "]");
            }
        } else if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
            if (null == targetIndexName) {
                // drop column in base table
                List<Column> baseSchema = indexSchemaMap.get(baseIndexId);
                boolean isKey = false;
                boolean hasReplaceColumn = false;
                for (Column column : baseSchema) {
                    if (column.isKey() && column.getName().equalsIgnoreCase(dropColName)) {
                        isKey = true;
                        lightSchemaChange = false;
                    } else if (AggregateType.REPLACE == column.getAggregationType()
                            || AggregateType.REPLACE_IF_NOT_NULL == column.getAggregationType()) {
                        hasReplaceColumn = true;
                    }
                }
                if (isKey && hasReplaceColumn) {
                    throw new DdlException(
                            "Can not drop key column when table has value column with REPLACE aggregation method");
                }
            } else {
                // drop column in rollup and base index
                // find column
                List<Column> targetIndexSchema = indexSchemaMap.get(targetIndexId);
                boolean isKey = false;
                boolean hasReplaceColumn = false;
                for (Column column : targetIndexSchema) {
                    if (column.isKey() && column.getName().equalsIgnoreCase(dropColName)) {
                        isKey = true;
                        lightSchemaChange = false;
                    } else if (AggregateType.REPLACE == column.getAggregationType()
                            || AggregateType.REPLACE_IF_NOT_NULL == column.getAggregationType()) {
                        hasReplaceColumn = true;
                    }
                }
                if (isKey && hasReplaceColumn) {
                    throw new DdlException(
                            "Can not drop key column when rollup has value column with REPLACE aggregation method");
                }
            }
        } else if (KeysType.DUP_KEYS == olapTable.getKeysType()) {
            List<Column> baseSchema = indexSchemaMap.get(baseIndexId);
            for (Column column : baseSchema) {
                if (column.isKey() && column.getName().equalsIgnoreCase(dropColName)) {
                    lightSchemaChange = false;
                    break;
                }
            }
        }

        // generated column check
        Map<String, Column> nameToColumn = new HashMap<>();
        for (Column c : indexSchemaMap.get(baseIndexId)) {
            nameToColumn.put(c.getName(), c);
        }
        if (null == targetIndexName) {
            if (nameToColumn.containsKey(dropColName)) {
                Column column = nameToColumn.get(dropColName);
                Set<String> generatedColumnsThatReferToThis = column.getGeneratedColumnsThatReferToThis();
                if (!generatedColumnsThatReferToThis.isEmpty()) {
                    throw new DdlException(
                            "Column '" + dropColName + "' has a generated column dependency on :"
                                    + generatedColumnsThatReferToThis);
                }
            }
        }

        Iterator<Index> it = indexes.iterator();
        while (it.hasNext()) {
            Index index = it.next();
            for (String indexCol : index.getColumns()) {
                if (dropColName.equalsIgnoreCase(indexCol)) {
                    it.remove();
                    break;
                }
            }
        }

        if (targetIndexName == null) {
            // if not specify rollup index, column should be dropped from both base and rollup indexes.
            List<Long> indexIds = new ArrayList<Long>();
            indexIds.add(baseIndexId);
            indexIds.addAll(olapTable.getIndexIdListExceptBaseIndex());

            // find column in base index and remove it
            List<Column> baseSchema = indexSchemaMap.get(baseIndexId);
            boolean found = false;
            Iterator<Column> baseIter = baseSchema.iterator();
            while (baseIter.hasNext()) {
                Column column = baseIter.next();
                if (column.getName().equalsIgnoreCase(dropColName)) {
                    baseIter.remove();
                    found = true;
                    // find generated column referred column
                    removeColumnWhenDropGeneratedColumn(column, nameToColumn);
                    break;
                }
            }
            if (!found) {
                throw new DdlException("Column does not exists: " + dropColName);
            }

            // drop bloom filter column
            Set<String> bfCols = olapTable.getCopiedBfColumns();
            if (bfCols != null) {
                Set<String> newBfCols = null;
                for (String bfCol : bfCols) {
                    if (!bfCol.equalsIgnoreCase(dropColName)) {
                        if (newBfCols == null) {
                            newBfCols = Sets.newHashSet();
                        }
                        newBfCols.add(bfCol);
                    }
                }
                olapTable.setBloomFilterInfo(newBfCols, olapTable.getBfFpp());
            }

            for (int i = 1; i < indexIds.size(); i++) {
                List<Column> rollupSchema = indexSchemaMap.get(indexIds.get(i));
                Iterator<Column> iter = rollupSchema.iterator();
                while (iter.hasNext()) {
                    Column column = iter.next();
                    boolean containedByMV = column.getName().equalsIgnoreCase(dropColName);
                    if (!containedByMV && column.getDefineExpr() != null) {
                        List<SlotRef> slots = new ArrayList<>();
                        column.getDefineExpr().collect(SlotRef.class, slots);
                        for (SlotRef slot : slots) {
                            if (slot.getColumnName().equalsIgnoreCase(dropColName)) {
                                containedByMV = true;
                                break;
                            }
                        }
                    }
                    if (containedByMV) {
                        throw new DdlException("Can not drop column contained by mv, mv="
                                + olapTable.getIndexNameById(indexIds.get(i)));
                    }
                }
            }
        } else {
            // if specify rollup index, only drop column from specified rollup index
            // find column
            List<Column> targetIndexSchema = indexSchemaMap.get(targetIndexId);
            boolean found = false;
            Iterator<Column> iter = targetIndexSchema.iterator();
            while (iter.hasNext()) {
                Column column = iter.next();
                if (column.getName().equalsIgnoreCase(dropColName)) {
                    iter.remove();
                    found = true;
                    if (column.isKey()) {
                        lightSchemaChange = false;
                    }
                    break;
                }
            }
            if (!found) {
                throw new DdlException("Column does not exists: " + dropColName);
            }
        }
        return lightSchemaChange;
    }

    // User can modify column type and column position
    private void processModifyColumn(ModifyColumnClause alterClause, Table externalTable, List<Column> newSchema)
            throws DdlException {
        Column modColumn = alterClause.getColumn();
        ColumnPosition columnPos = alterClause.getColPos();

        // find modified column
        String newColName = modColumn.getName();
        boolean hasColPos = (columnPos != null && !columnPos.isFirst());
        boolean found = false;

        int modColIndex = -1;
        int lastColIndex = -1;
        for (int i = 0; i < newSchema.size(); i++) {
            Column col = newSchema.get(i);
            if (col.getName().equalsIgnoreCase(newColName)) {
                modColIndex = i;
                found = true;
            }
            if (hasColPos) {
                if (col.getNonShadowName().equalsIgnoreCase(columnPos.getLastCol())) {
                    lastColIndex = i;
                }
            } else {
                // save the last Key position
                if (col.isKey()) {
                    lastColIndex = i;
                }
            }
        }
        // mod col not find
        if (!found) {
            throw new DdlException("Column[" + newColName + "] does not exists");
        }

        // last col not find
        if (hasColPos && lastColIndex == -1) {
            throw new DdlException("Column[" + columnPos.getLastCol() + "] does not exists");
        }

        // check if add to first
        if (columnPos != null && columnPos.isFirst()) {
            lastColIndex = -1;
            hasColPos = true;
        }

        Column oriColumn = newSchema.get(modColIndex);
        // retain old column name
        modColumn.setName(oriColumn.getName());

        // handle the move operation in 'indexForFindingColumn' if has
        if (hasColPos) {
            // move col
            if (lastColIndex > modColIndex) {
                newSchema.add(lastColIndex + 1, modColumn);
                newSchema.remove(modColIndex);
            } else if (lastColIndex < modColIndex) {
                newSchema.remove(modColIndex);
                newSchema.add(lastColIndex + 1, modColumn);
            } else {
                throw new DdlException("Column[" + columnPos.getLastCol() + "] modify position is invalid");
            }
        } else {
            newSchema.set(modColIndex, modColumn);
        }
    }

    // User can modify column type and column position
    private boolean processModifyColumn(ModifyColumnClause alterClause, OlapTable olapTable,
                                        Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
        Column modColumn = alterClause.getColumn();
        boolean lightSchemaChange = false;

        if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
            if (modColumn.isKey() && null != modColumn.getAggregationType()) {
                throw new DdlException("Can not assign aggregation method on key column: " + modColumn.getName());
            } else if (!modColumn.isKey() && null == modColumn.getAggregationType()) {
                throw new DdlException("Aggregate method must be specified for value column: " + modColumn.getName());
            }
        } else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
            if (null != modColumn.getAggregationType() && modColumn.isKey()) {
                throw new DdlException("Can not assign aggregation method" + " on column in Unique data model table: "
                        + modColumn.getName());
            }
            if (!modColumn.isKey()) {
                if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
                    modColumn.setAggregationType(AggregateType.NONE, true);
                } else {
                    modColumn.setAggregationType(AggregateType.REPLACE, true);
                }
            }
        } else {
            if (null != modColumn.getAggregationType() && modColumn.isKey()) {
                throw new DdlException(
                        "Can not assign aggregation method" + " on column in Duplicate data model table: "
                                + modColumn.getName());
            }
            if (!modColumn.isKey()) {
                modColumn.setAggregationType(AggregateType.NONE, true);
            }
        }
        ColumnPosition columnPos = alterClause.getColPos();
        String targetIndexName = alterClause.getRollupName();
        checkIndexExists(olapTable, targetIndexName);

        String baseIndexName = olapTable.getName();
        checkAssignedTargetIndexName(baseIndexName, targetIndexName);

        if (targetIndexName != null && columnPos == null) {
            throw new DdlException("Do not need to specify index name when just modifying column type");
        }

        String indexNameForFindingColumn = targetIndexName;
        if (indexNameForFindingColumn == null) {
            indexNameForFindingColumn = baseIndexName;
        }

        long indexIdForFindingColumn = olapTable.getIndexIdByName(indexNameForFindingColumn);

        // find modified column
        List<Column> schemaForFinding = indexSchemaMap.get(indexIdForFindingColumn);
        String newColName = modColumn.getName();
        boolean hasColPos = (columnPos != null && !columnPos.isFirst());
        boolean found = false;
        boolean typeChanged = false;
        int modColIndex = -1;
        int lastColIndex = -1;
        for (int i = 0; i < schemaForFinding.size(); i++) {
            Column col = schemaForFinding.get(i);
            if (col.getName().equalsIgnoreCase(newColName)) {
                modColIndex = i;
                found = true;
                if (!col.equals(modColumn)) {
                    typeChanged = true;
                    // TODO:the case where columnPos is not empty has not been considered
                    if (columnPos == null && col.getDataType() == PrimitiveType.VARCHAR
                            && modColumn.getDataType() == PrimitiveType.VARCHAR) {
                        ColumnType.checkForTypeLengthChange(col.getType(), modColumn.getType());
                        lightSchemaChange = olapTable.getEnableLightSchemaChange();
                    }
                    if (columnPos == null && col.getDataType().isComplexType()
                            && modColumn.getDataType().isComplexType()) {
                        ColumnType.checkSupportSchemaChangeForComplexType(col.getType(), modColumn.getType(), true);
                        lightSchemaChange = olapTable.getEnableLightSchemaChange();
                    }
                    if (col.isClusterKey()) {
                        throw new DdlException("Can not modify cluster key column: " + col.getName());
                    }
                }
            }
            if (hasColPos) {
                if (col.getNonShadowName().equalsIgnoreCase(columnPos.getLastCol())) {
                    lastColIndex = i;
                }
            } else {
                // save the last Key position
                if (col.isKey()) {
                    lastColIndex = i;
                }
            }
        }
        // mod col not find
        if (!found) {
            throw new DdlException("Column[" + newColName + "] does not exists");
        }

        // last col not find
        if (hasColPos && lastColIndex == -1) {
            throw new DdlException("Column[" + columnPos.getLastCol() + "] does not exists");
        }

        // sequence col can not change type
        if (KeysType.UNIQUE_KEYS == olapTable.getKeysType() && typeChanged && modColumn.getName()
                .equalsIgnoreCase(olapTable.getSequenceMapCol())) {
            throw new DdlException("Can not alter sequence column[" + modColumn.getName() + "]");
        }

        // check if add to first
        if (columnPos != null && columnPos.isFirst()) {
            lastColIndex = -1;
            hasColPos = true;
        }

        Column oriColumn = schemaForFinding.get(modColIndex);
        // retain old column name
        modColumn.setName(oriColumn.getName());
        modColumn.setUniqueId(oriColumn.getUniqueId());

        if (!modColumn.equals(oriColumn) && oriColumn.isAutoInc() != modColumn.isAutoInc()) {
            throw new DdlException("Can't modify the column["
                    + oriColumn.getName() + "]'s auto-increment attribute.");
        }

        // handle the move operation in 'indexForFindingColumn' if has
        if (hasColPos) {
            // move col
            if (lastColIndex > modColIndex) {
                schemaForFinding.add(lastColIndex + 1, modColumn);
                schemaForFinding.remove(modColIndex);
            } else if (lastColIndex < modColIndex) {
                schemaForFinding.remove(modColIndex);
                schemaForFinding.add(lastColIndex + 1, modColumn);
            } else {
                throw new DdlException("Column[" + columnPos.getLastCol() + "] modify position is invalid");
            }
        } else {
            schemaForFinding.set(modColIndex, modColumn);
        }

        // check if column being mod
        if (!modColumn.equals(oriColumn)) {
            // column is mod. we have to mod this column in all indices

            // handle other indices
            // 1 find other indices which contain this column
            List<Long> otherIndexIds = new ArrayList<Long>();
            for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema().entrySet()) {
                if (entry.getKey() == indexIdForFindingColumn) {
                    // skip the index we used to find column. it has been handled before
                    continue;
                }
                List<Column> schema = entry.getValue();
                for (Column column : schema) {
                    String columnName = column.getName();
                    if (column.isMaterializedViewColumn()) {
                        throw new DdlException("Can not modify column contained by mv, mv="
                                + olapTable.getIndexNameById(entry.getKey()));
                    }
                    if (columnName.equalsIgnoreCase(modColumn.getName())) {
                        otherIndexIds.add(entry.getKey());
                        break;
                    }
                }
            }
            for (Long otherIndexId : otherIndexIds) {
                List<Column> otherIndexSchema = indexSchemaMap.get(otherIndexId);
                for (int i = 0; i < otherIndexSchema.size(); i++) {
                    modColIndex = -1;
                    Column otherCol = null;
                    Column col = otherIndexSchema.get(i);
                    String columnName = col.getName();
                    if (col.isMaterializedViewColumn()) {
                        throw new DdlException("Can not modify column contained by mv, mv="
                                + olapTable.getIndexNameById(otherIndexId));
                    }
                    if (!columnName.equalsIgnoreCase(modColumn.getName())) {
                        continue;
                    }
                    modColIndex = i;
                    otherCol = new Column(modColumn);
                    otherCol.setName(col.getName());
                    otherCol.setDefineExpr(col.getDefineExpr());
                    Preconditions.checkState(modColIndex != -1);
                    Preconditions.checkState(otherCol != null);
                    // replace the old column
                    if (KeysType.AGG_KEYS != olapTable.getKeysType()
                            && KeysType.UNIQUE_KEYS != olapTable.getKeysType()) {
                        Column oldCol = otherIndexSchema.get(modColIndex);
                        otherCol.setIsKey(oldCol.isKey());
                        otherCol.setAggregationType(oldCol.getAggregationType(), oldCol.isAggregationTypeImplicit());
                    }
                    if (typeChanged && !lightSchemaChange) {
                        otherCol.setName(SHADOW_NAME_PREFIX + otherCol.getName());
                    }
                    otherIndexSchema.set(modColIndex, otherCol);
                }
            } //  end for other indices
        } // end for handling other indices

        if (typeChanged && !lightSchemaChange) {
            /*
             * In new alter table process (AlterJobV2), any modified columns are treated as new columns.
             * But the modified columns' name does not changed. So in order to distinguish this, we will add
             * a prefix in the name of these modified columns.
             * This prefix only exist during the schema change process. Once the schema change is finished,
             * it will be removed.
             *
             * After adding this prefix, modify a column is just same as 'add' a column.
             *
             * And if the column type is not changed, the same column name is still to the same column type,
             * so no need to add prefix.
             */
            modColumn.setName(SHADOW_NAME_PREFIX + modColumn.getName());
        }
        LOG.info("modify column {} ", modColumn);
        return lightSchemaChange;
    }

    private void processReorderColumn(ReorderColumnsClause alterClause, Table externalTable, List<Column> newSchema)
            throws DdlException {
        List<String> orderedColNames = alterClause.getColumnsByPos();

        newSchema.clear();
        List<Column> targetIndexSchema = externalTable.getBaseSchema();

        // check and create new ordered column list
        Set<String> colNameSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
        for (String colName : orderedColNames) {
            Column oneCol = null;
            for (Column column : targetIndexSchema) {
                if (column.getName().equalsIgnoreCase(colName) && column.isVisible()) {
                    oneCol = column;
                    break;
                }
            }
            if (oneCol == null) {
                throw new DdlException("Column[" + colName + "] not exists");
            }
            newSchema.add(oneCol);
            if (colNameSet.contains(colName)) {
                throw new DdlException("Reduplicative column[" + colName + "]");
            } else {
                colNameSet.add(colName);
            }
        }

        if (newSchema.size() != targetIndexSchema.size()) {
            throw new DdlException("Reorder stmt should contains all columns");
        }
    }

    private void processReorderColumn(ReorderColumnsClause alterClause, OlapTable olapTable,
                                      Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
        List<String> orderedColNames = alterClause.getColumnsByPos();
        String targetIndexName = alterClause.getRollupName();
        checkIndexExists(olapTable, targetIndexName);

        String baseIndexName = olapTable.getName();
        checkAssignedTargetIndexName(baseIndexName, targetIndexName);

        if (targetIndexName == null) {
            targetIndexName = baseIndexName;
        }
        long targetIndexId = olapTable.getIndexIdByName(targetIndexName);

        LinkedList<Column> newSchema = new LinkedList<Column>();
        List<Column> targetIndexSchema = indexSchemaMap.get(targetIndexId);
        // When rollup is specified, there is no need to check the order of generated columns.
        // When rollup is not specified and the order of baseIndex needs to be modified, the order needs to be checked.
        if (alterClause.getRollupName() == null) {
            checkOrder(targetIndexSchema, orderedColNames);
        }
        // check and create new ordered column list
        Set<String> colNameSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
        for (String colName : orderedColNames) {
            Column oneCol = null;
            for (Column column : targetIndexSchema) {
                if (column.getName().equalsIgnoreCase(colName) && column.isVisible()) {
                    oneCol = column;
                    break;
                }
            }
            if (oneCol == null) {
                throw new DdlException("Column[" + colName + "] not exists");
            }
            newSchema.add(oneCol);
            if (colNameSet.contains(colName)) {
                throw new DdlException("Reduplicative column[" + colName + "]");
            } else {
                colNameSet.add(colName);
            }
        }
        if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
            for (Column column : targetIndexSchema) {
                if (!column.isVisible()) {
                    newSchema.add(column);
                }
            }
        }
        if (newSchema.size() != targetIndexSchema.size()) {
            throw new DdlException("Reorder stmt should contains all columns");
        }
        // replace the old column list
        indexSchemaMap.put(targetIndexId, newSchema);
    }

    /*
     * Add 'newColumn' to specified index.
     * Modified schema will be saved in 'indexSchemaMap'
     */
    private void addColumnInternal(Column newColumn, ColumnPosition columnPos, List<Column> modIndexSchema,
                                   Set<String> newColNameSet) throws DdlException {
        String newColName = newColumn.getName();
        int posIndex = -1;
        boolean hasPos = (columnPos != null && !columnPos.isFirst());

        if (newColumn.isAutoInc()) {
            throw new DdlException("Can not add auto-increment column " + newColumn.getName());
        }

        for (int i = 0; i < modIndexSchema.size(); i++) {
            Column col = modIndexSchema.get(i);
            if (col.getName().equalsIgnoreCase(newColName)) {
                if (!newColNameSet.contains(newColName)) {
                    // if this is not a base index, we should check if user repeatedly add columns
                    throw new DdlException("Repeatedly add column: " + newColName);
                }
                // this is a base index, and the column we check here is added by previous 'add column clause'
                // in same ALTER stmt.
                // so here we will check if the 2 columns is exactly same. if not, throw exception
                if (!col.equals(newColumn)) {
                    throw new DdlException("Repeatedly add same column with different definition: " + newColName);
                }

                // column already exist, return
                return;
            }

            if (hasPos) {
                // after the field
                if (col.getName().equalsIgnoreCase(columnPos.getLastCol())) {
                    posIndex = i;
                }
            }
        }

        // check if lastCol was found
        if (hasPos && posIndex == -1) {
            throw new DdlException("Column[" + columnPos.getLastCol() + "] does not found");
        }

        // check if add to first
        if (columnPos != null && columnPos.isFirst()) {
            posIndex = -1;
            hasPos = true;
        }

        if (hasPos) {
            // key
            modIndexSchema.add(posIndex + 1, newColumn);
        } else {
            modIndexSchema.add(newColumn);
        }
    }

    /**
     * @param olapTable
     * @param newColumn              Add 'newColumn' to specified index.
     * @param columnPos
     * @param targetIndexId
     * @param baseIndexId
     * @param indexSchemaMap         Modified schema will be saved in 'indexSchemaMap'
     * @param newColNameSet
     * @param ignoreSameColumn
     * @param colUniqueIdSupplierMap
     * @return true: can light schema change, false: cannot
     * @throws DdlException
     */
    private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos,
                                      long targetIndexId, long baseIndexId,
                                      Map<Long, LinkedList<Column>> indexSchemaMap,
                                      Set<String> newColNameSet, boolean ignoreSameColumn,
                                      Map<Long, IntSupplier> colUniqueIdSupplierMap)
            throws DdlException {

        //only new table generate ColUniqueId, exist table do not.
        boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
        String newColName = newColumn.getName();

        if (newColumn.isAutoInc()) {
            throw new DdlException("Can not add auto-increment column " + newColumn.getName());
        }

        // check the validation of aggregation method on column.
        // also fill the default aggregation method if not specified.
        if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
            if (newColumn.isKey() && newColumn.getAggregationType() != null) {
                throw new DdlException("Can not assign aggregation method on key column: " + newColName);
            } else if (null == newColumn.getAggregationType()) {
                newColumn.setIsKey(true);
            } else if (newColumn.getAggregationType() == AggregateType.SUM && newColumn.getDefaultValue() != null
                    && !newColumn.getDefaultValue().equals("0")) {
                throw new DdlException(
                        "The default value of '" + newColName + "' with SUM aggregation function must be zero");
            } else if (olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) {
                if (newColumn.getAggregationType() == AggregateType.REPLACE
                        || newColumn.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
                    throw new DdlException(
                            "Can not add value column with aggregation type " + newColumn.getAggregationType()
                                    + " for olap table with random distribution : " + newColName);
                }
            }
        } else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
            if (newColumn.getAggregationType() != null && newColumn.isKey()) {
                throw new DdlException(
                        "Can not assign aggregation method" + " on column in Unique data model table: " + newColName);
            }
            if (!newColumn.isKey()) {
                if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
                    newColumn.setAggregationType(AggregateType.NONE, false);
                } else {
                    newColumn.setAggregationType(AggregateType.REPLACE, true);
                }
            }
        } else {
            if (newColumn.getAggregationType() != null && newColumn.isKey()) {
                throw new DdlException(
                        "Can not assign aggregation method" + " on column in Duplicate data model table: "
                                + newColName);
            }
            if (!newColumn.isKey()) {
                if (targetIndexId != -1L
                        && olapTable.getIndexMetaByIndexId(targetIndexId).getKeysType() == KeysType.AGG_KEYS) {
                    throw new DdlException("Please add non-key column on base table directly");
                }
                newColumn.setAggregationType(AggregateType.NONE, true);
            } else if (olapTable.isDuplicateWithoutKey()) {
                throw new DdlException("Duplicate table without keys do not support add key column!");
            }
        }

        if (newColumn.getType().isTime() || newColumn.getType().isTimeV2()) {
            throw new DdlException("Time type is not supported for olap table");
        }

        // hll must be used in agg_keys
        if (newColumn.getType().isHllType() && KeysType.AGG_KEYS != olapTable.getKeysType()) {
            throw new DdlException("HLL type column can only be in Aggregation data model table: " + newColName);
        }

        if (newColumn.getAggregationType() == AggregateType.BITMAP_UNION
                && KeysType.AGG_KEYS != olapTable.getKeysType()) {
            throw new DdlException("BITMAP_UNION must be used in AGG_KEYS");
        }

        //type key column do not allow light schema change.
        if (newColumn.isKey()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("newColumn: {}, isKey()==true", newColumn);
            }
            lightSchemaChange = false;
        }

        // check if the new column already exist in base schema.
        // do not support adding new column which already exist in base schema.
        List<Column> baseSchema = olapTable.getBaseSchema(true);
        boolean found = false;
        Column foundColumn = null;
        for (Column column : baseSchema) {
            if (column.getName().equalsIgnoreCase(newColName)) {
                found = true;
                foundColumn = column;
                break;
            }
        }
        if (found) {
            if (newColName.equalsIgnoreCase(Column.DELETE_SIGN)) {
                throw new DdlException("Can not enable batch delete support, already supported batch delete.");
            } else if (newColName.equalsIgnoreCase(Column.SEQUENCE_COL)) {
                throw new DdlException("Can not enable sequence column support, already supported sequence column.");
            } else if (newColName.equalsIgnoreCase(Column.VERSION_COL)) {
                throw new DdlException("Can not enable version column support, already supported version column.");
            } else {
                if (ignoreSameColumn && newColumn.equals(foundColumn)) {
                    //for add columns rpc, allow add same type column.
                } else {
                    throw new DdlException("Can not add column which already exists in base table: " + newColName);
                }
            }
        }

        if (newColumn.getGeneratedColumnInfo() != null) {
            throw new DdlException("Not supporting alter table add generated columns.");
        }

        /*
         * add new column to indexes.
         * UNIQUE:
         *      1. If new column is key, it should be added to all indexes.
         *      2. Else, add the new column to base index and specified rollup index.
         * DUPLICATE:
         *      1. If not specify rollup index, just add it to base index.
         *      2. Else, first add it to specify rollup index. Then if the new column is key, add it to base
         *          index, at the end of all other existing key columns. If new new column is value, add it to
         *          base index by user specified position.
         * AGGREGATION:
         *      1. Add it to base index, as well as specified rollup index.
         */
        if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
            List<Column> modIndexSchema;
            if (newColumn.isKey()) {
                // add key column to unique key table
                // add to all indexes including base and rollup
                for (Map.Entry<Long, LinkedList<Column>> entry : indexSchemaMap.entrySet()) {
                    modIndexSchema = entry.getValue();
                    boolean isBaseIdex = entry.getKey() == baseIndexId;
                    IntSupplier colUniqueIdSupplier = colUniqueIdSupplierMap.get(entry.getKey());
                    int newColumnUniqueId = olapTable.getEnableLightSchemaChange() ? colUniqueIdSupplier.getAsInt()
                            : Column.COLUMN_UNIQUE_ID_INIT_VALUE;
                    checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, isBaseIdex,
                            newColumnUniqueId);
                }
            } else {
                // 1. add to base table
                modIndexSchema = indexSchemaMap.get(baseIndexId);
                IntSupplier baseIndexColUniqueIdSupplier = colUniqueIdSupplierMap.get(baseIndexId);
                int baseIndexNewColumnUniqueId = olapTable.getEnableLightSchemaChange()
                        ? baseIndexColUniqueIdSupplier.getAsInt() : Column.COLUMN_UNIQUE_ID_INIT_VALUE;
                checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
                        baseIndexNewColumnUniqueId);
                if (targetIndexId == -1L) {
                    return lightSchemaChange;
                }
                // 2. add to rollup
                modIndexSchema = indexSchemaMap.get(targetIndexId);
                IntSupplier targetIndexColUniqueIdSupplier = colUniqueIdSupplierMap.get(targetIndexId);
                int rollUpNewColumnUniqueId = olapTable.getEnableLightSchemaChange()
                        ? targetIndexColUniqueIdSupplier.getAsInt() : Column.COLUMN_UNIQUE_ID_INIT_VALUE;
                checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false, rollUpNewColumnUniqueId);
            }
        } else if (KeysType.DUP_KEYS == olapTable.getKeysType()) {
            //get baseIndexColUniqueIdSupplier
            IntSupplier baseIndexColUniqueIdSupplier = colUniqueIdSupplierMap.get(baseIndexId);
            int baseIndexNewColumnUniqueId = olapTable.getEnableLightSchemaChange()
                    ? baseIndexColUniqueIdSupplier.getAsInt() : Column.COLUMN_UNIQUE_ID_INIT_VALUE;

            if (targetIndexId == -1L) {
                // add to base index
                List<Column> modIndexSchema = indexSchemaMap.get(baseIndexId);
                checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
                        baseIndexNewColumnUniqueId);
                // no specified target index. return
                return lightSchemaChange;
            } else {
                // add to rollup index
                List<Column> modIndexSchema = indexSchemaMap.get(targetIndexId);
                IntSupplier targetIndexColUniqueIdSupplier = colUniqueIdSupplierMap.get(targetIndexId);
                int rollUpNewColumnUniqueId = olapTable.getEnableLightSchemaChange()
                        ? targetIndexColUniqueIdSupplier.getAsInt() : Column.COLUMN_UNIQUE_ID_INIT_VALUE;
                checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false, rollUpNewColumnUniqueId);

                if (newColumn.isKey()) {
                    /*
                     * if add column in rollup is key,
                     * then put the column in base table as the last key column
                     */

                    modIndexSchema = indexSchemaMap.get(baseIndexId);
                    checkAndAddColumn(modIndexSchema, newColumn, null, newColNameSet, true, baseIndexNewColumnUniqueId);
                } else {
                    modIndexSchema = indexSchemaMap.get(baseIndexId);
                    checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
                            baseIndexNewColumnUniqueId);
                }
            }
        } else {
            // check if has default value. this should be done in Analyze phase
            // 1. add to base index first
            IntSupplier baseIndexColUniqueIdSupplier = colUniqueIdSupplierMap.get(baseIndexId);
            int baseIndexNewColumnUniqueId = olapTable.getEnableLightSchemaChange()
                    ? baseIndexColUniqueIdSupplier.getAsInt() : Column.COLUMN_UNIQUE_ID_INIT_VALUE;
            List<Column> modIndexSchema = indexSchemaMap.get(baseIndexId);
            checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true, baseIndexNewColumnUniqueId);

            if (targetIndexId == -1L) {
                // no specified target index. return
                return lightSchemaChange;
            }

            // 2. add to rollup index
            IntSupplier targetIndexColUniqueIdSupplier = colUniqueIdSupplierMap.get(targetIndexId);
            int rollUpNewColumnUniqueId = olapTable.getEnableLightSchemaChange()
                    ? targetIndexColUniqueIdSupplier.getAsInt() : Column.COLUMN_UNIQUE_ID_INIT_VALUE;
            modIndexSchema = indexSchemaMap.get(targetIndexId);
            checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false, rollUpNewColumnUniqueId);
        }
        return lightSchemaChange;
    }

    /*
     * add new column to specified index schema('modIndexSchema').
     * if 'isBaseIndex' is true, which means 'modIndexSchema' is base index's schema.
     * so we will not check repeat adding of column.
     * For example, user want to add column k1 to both rollup1 and rollup2 in one alter stmt:
     *      ADD COLUMN k1 int to rollup1,
     *      ADD COLUMN k1 int to rollup2
     * So that k1 will be added to base index 'twice', and we just ignore this repeat adding.
     */
    private void checkAndAddColumn(List<Column> modIndexSchema, Column newColumn, ColumnPosition columnPos,
                                   Set<String> newColNameSet, boolean isBaseIndex, int newColumnUniqueId)
            throws DdlException {
        int posIndex = -1;
        int lastVisibleIdx = -1;
        String newColName = newColumn.getName();
        boolean hasPos = (columnPos != null && !columnPos.isFirst());
        for (int i = 0; i < modIndexSchema.size(); i++) {
            Column col = modIndexSchema.get(i);
            if (col.getName().equalsIgnoreCase(newColName)) {
                if (!isBaseIndex || !newColNameSet.contains(newColName)) {
                    // if this is not a base index, we should check if user repeatedly add columns
                    throw new DdlException("Repeatedly add column: " + newColName);
                }
                // this is a base index, and the column we check here is added by previous 'add column clause'
                // in same ALTER stmt.
                // so here we will check if the 2 columns is exactly same. if not, throw exception
                if (!col.equals(newColumn)) {
                    throw new DdlException("Repeatedly add same column with different definition: " + newColName);
                }

                // column already exist, return
                return;
            }
            if (col.isVisible()) {
                lastVisibleIdx = i;
            }

            if (hasPos) {
                // after the field
                if (col.getName().equalsIgnoreCase(columnPos.getLastCol())) {
                    posIndex = i;
                }
            } else {
                // save the last Key position
                if (col.isKey()) {
                    posIndex = i;
                }
            }
        }

        // check if lastCol was found
        if (hasPos && posIndex == -1) {
            throw new DdlException("Column[" + columnPos.getLastCol() + "] does not found");
        }

        // check if add to first
        if (columnPos != null && columnPos.isFirst()) {
            posIndex = -1;
            hasPos = true;
        }

        // newColumn may add to baseIndex or rollups, so we need copy before change UniqueId
        Column toAddColumn = new Column(newColumn);
        toAddColumn.setUniqueId(newColumnUniqueId);
        if (hasPos) {
            modIndexSchema.add(posIndex + 1, toAddColumn);
        } else if (toAddColumn.isKey()) {
            // key
            modIndexSchema.add(posIndex + 1, toAddColumn);
        } else if (lastVisibleIdx != -1 && lastVisibleIdx < modIndexSchema.size() - 1) {
            // has hidden columns
            modIndexSchema.add(lastVisibleIdx + 1, toAddColumn);
        } else {
            // value
            modIndexSchema.add(toAddColumn);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("newColumn setUniqueId({}), modIndexSchema:{}", newColumnUniqueId, modIndexSchema);
        }
    }

    private void checkIndexExists(OlapTable olapTable, String targetIndexName) throws DdlException {
        if (targetIndexName != null && !olapTable.hasMaterializedIndex(targetIndexName)) {
            throw new DdlException(
                    "Index[" + targetIndexName + "] does not exist in table[" + olapTable.getName() + "]");
        }
    }

    private void checkAssignedTargetIndexName(String baseIndexName, String targetIndexName) throws DdlException {
        // user cannot assign base index to do schema change
        if (targetIndexName != null) {
            if (targetIndexName.equals(baseIndexName)) {
                throw new DdlException("Do not need to assign base index[" + baseIndexName + "] to do schema change");
            }
        }
    }

    private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
                           Map<String, String> propertyMap, List<Index> indexes,
                           boolean isBuildIndex) throws UserException {
        if (isBuildIndex) {
            // remove the index which is not the base index, only base index can be built index
            indexSchemaMap.entrySet().removeIf(entry -> !entry.getKey().equals(olapTable.getBaseIndexId()));
        }
        checkReplicaCount(olapTable);

        // process properties first
        // for now. properties has 3 options
        // property 1. to specify short key column count.
        // eg.
        //     "indexname1#short_key" = "3"
        //     "indexname2#short_key" = "4"
        Map<Long, Map<String, String>> indexIdToProperties = new HashMap<Long, Map<String, String>>();
        if (propertyMap.size() > 0) {
            for (String key : propertyMap.keySet()) {
                if (key.endsWith(PropertyAnalyzer.PROPERTIES_SHORT_KEY)) {
                    // short key
                    String[] keyArray = key.split("#");
                    if (keyArray.length != 2 || keyArray[0].isEmpty() || !keyArray[1].equals(
                            PropertyAnalyzer.PROPERTIES_SHORT_KEY)) {
                        throw new DdlException("Invalid alter table property: " + key);
                    }

                    HashMap<String, String> prop = new HashMap<String, String>();

                    if (!olapTable.hasMaterializedIndex(keyArray[0])) {
                        throw new DdlException("Index[" + keyArray[0] + "] does not exist");
                    }

                    prop.put(PropertyAnalyzer.PROPERTIES_SHORT_KEY, propertyMap.get(key));
                    indexIdToProperties.put(olapTable.getIndexIdByName(keyArray[0]), prop);
                }
            } // end for property keys
        }

        // for bitmapIndex
        boolean hasIndexChange = false;
        Set<Index> newSet = new HashSet<>(indexes);
        Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
        if (!newSet.equals(oriSet) || isBuildIndex) {
            hasIndexChange = true;
        }

        // property 2. bloom filter
        // eg. "bloom_filter_columns" = "k1,k2", "bloom_filter_fpp" = "0.05"
        Set<String> bfColumns = null;
        double bfFpp = 0;
        try {
            bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(propertyMap,
                    indexSchemaMap.get(olapTable.getBaseIndexId()), olapTable.getKeysType());
            bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(propertyMap);
        } catch (AnalysisException e) {
            throw new DdlException(e.getMessage());
        }

        // check bloom filter has been changed
        boolean hasBfChange = false;
        Set<String> oriBfColumns = olapTable.getCopiedBfColumns();
        double oriBfFpp = olapTable.getBfFpp();
        if (bfColumns != null) {
            if (bfFpp == 0) {
                // columns: yes, fpp: no
                if (bfColumns.equals(oriBfColumns)) {
                    throw new DdlException("Bloom filter index has no change");
                }

                if (oriBfColumns == null) {
                    bfFpp = FeConstants.default_bloom_filter_fpp;
                } else {
                    bfFpp = oriBfFpp;
                }
            } else {
                // columns: yes, fpp: yes
                if (bfColumns.equals(oriBfColumns) && bfFpp == oriBfFpp) {
                    throw new DdlException("Bloom filter index has no change");
                }
            }

            hasBfChange = true;
        } else {
            if (bfFpp == 0) {
                // columns: no, fpp: no
                bfFpp = oriBfFpp;
            } else {
                // columns: no, fpp: yes
                if (bfFpp == oriBfFpp) {
                    throw new DdlException("Bloom filter index has no change");
                }
                if (oriBfColumns == null) {
                    throw new DdlException("Bloom filter index has no change");
                }

                hasBfChange = true;
            }

            bfColumns = oriBfColumns;
        }

        if (bfColumns != null && bfColumns.isEmpty()) {
            bfColumns = null;
        }
        if (bfColumns == null) {
            bfFpp = 0;
        }

        Index.checkConflict(newSet, bfColumns);

        // property 3: timeout
        long timeoutSecond = PropertyAnalyzer.analyzeTimeout(propertyMap, Config.alter_table_timeout_second);

        TStorageFormat storageFormat = PropertyAnalyzer.analyzeStorageFormat(propertyMap);

        // property store_row_column && row_store_columns
        // eg. "store_row_column" = "true"
        // eg. "row_store_columns" = "k1, k2"
        List<String> rsColumns = Lists.newArrayList();
        boolean storeRowColumn = false;
        try {
            storeRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(propertyMap);
            rsColumns = PropertyAnalyzer.analyzeRowStoreColumns(propertyMap,
                        olapTable.getColumns().stream().map(Column::getName).collect(Collectors.toList()));
        } catch (AnalysisException e) {
            throw new DdlException(e.getMessage());
        }

        boolean hasRowStoreChanged = false;
        if (storeRowColumn || rsColumns != null) {
            List<String> oriRowStoreColumns = olapTable.getTableProperty().getCopiedRowStoreColumns();
            // correct the comparison logic for null and empty list
            boolean columnsChanged = false;
            if (rsColumns == null) {
                columnsChanged = oriRowStoreColumns != null;
            } else if (oriRowStoreColumns == null) {
                columnsChanged = true;
            } else {
                columnsChanged = !oriRowStoreColumns.equals(rsColumns);
            }
            // partial row store columns changed, or store_row_column enabled, not supported to disable at present
            if (columnsChanged || (!olapTable.storeRowColumn() && storeRowColumn)) {
                // only support mow and duplicate model
                if (!(olapTable.getKeysType() == KeysType.DUP_KEYS
                        || olapTable.getEnableUniqueKeyMergeOnWrite())) {
                    throw new DdlException("`store_row_column` only support duplicate model or mow model");
                }
                hasRowStoreChanged = true;
            }
        }

        // begin checking each table
        // ATTN: DO NOT change any meta in this loop
        long tableId = olapTable.getId();
        Map<Long, Short> indexIdToShortKeyColumnCount = Maps.newHashMap();
        Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
        for (Long alterIndexId : indexSchemaMap.keySet()) {
            // Must get all columns including invisible columns.
            // Because in alter process, all columns must be considered.
            List<Column> originSchema = olapTable.getSchemaByIndexId(alterIndexId, true);
            List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
            Set<Column> needAlterColumns = Sets.newHashSet();

            // 0. check if unchanged
            boolean hasColumnChange = false;
            if (alterSchema.size() != originSchema.size()) {
                hasColumnChange = true;
            } else {
                for (int i = 0; i < alterSchema.size(); i++) {
                    Column alterColumn = alterSchema.get(i);
                    if (!alterColumn.equals(originSchema.get(i))) {
                        needAlterColumns.add(alterColumn);
                        hasColumnChange = true;
                    } else {
                        Column oriColumn = originSchema.get(i);
                        if ((oriColumn.getGeneratedColumnInfo() != null
                                || alterColumn.getGeneratedColumnInfo() != null)
                                        && !oriColumn.getGeneratedColumnInfo().getExprSql()
                                                .equals(alterColumn.getGeneratedColumnInfo().getExprSql())) {
                            throw new DdlException("Not supporting alter table modify generated columns.");
                        }
                    }
                }
            }

            // if has column changed, alter it.
            // else:
            //     if no bf change, no alter
            //     if has bf change, should check
            boolean needAlter = false;
            if (hasColumnChange) {
                needAlter = true;
            } else if (hasBfChange) {
                for (Column alterColumn : alterSchema) {
                    String columnName = alterColumn.getName();

                    boolean isOldBfColumn = false;
                    if (oriBfColumns != null && oriBfColumns.contains(columnName)) {
                        isOldBfColumn = true;
                    }

                    boolean isNewBfColumn = false;
                    if (bfColumns != null && bfColumns.contains(columnName)) {
                        isNewBfColumn = true;
                    }

                    if (isOldBfColumn != isNewBfColumn) {
                        // bf column change
                        needAlter = true;
                    } else if (isOldBfColumn && isNewBfColumn && oriBfFpp != bfFpp) {
                        // bf fpp change
                        needAlter = true;
                    }

                    if (needAlter) {
                        break;
                    }
                }
            } else if (hasIndexChange) {
                needAlter = true;
            } else if (hasRowStoreChanged) {
                needAlter = true;
            } else if (storageFormat == TStorageFormat.V2) {
                if (olapTable.getStorageFormat() != TStorageFormat.V2) {
                    needAlter = true;
                }
            }

            if (!needAlter) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("index[{}] is not changed. ignore", alterIndexId);
                }
                continue;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("index[{}] is changed. start checking...", alterIndexId);
            }
            // 1. check order: a) has key; b) value after key
            boolean meetValue = false;
            boolean hasKey = false;
            for (Column column : alterSchema) {
                if (column.isKey() && meetValue) {
                    throw new DdlException(
                            "Invalid column order. value should be after key. index[" + olapTable.getIndexNameById(
                                    alterIndexId) + "]");
                }
                if (!column.isKey()) {
                    meetValue = true;
                } else {
                    hasKey = true;
                }
            }
            if (!hasKey && !olapTable.isDuplicateWithoutKey()) {
                throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
            }

            // 2. check compatible
            for (Column alterColumn : alterSchema) {
                for (Column oriColumn : originSchema) {
                    if (alterColumn.nameEquals(oriColumn.getName(), true /* ignore prefix */)) {
                        if (!alterColumn.equals(oriColumn)) {
                            // 3.1 check type
                            oriColumn.checkSchemaChangeAllowed(alterColumn);
                        }
                    }
                } // end for ori
            } // end for alter

            // 3. check partition key
            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
            if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
                List<Column> partitionColumns = partitionInfo.getPartitionColumns();
                for (Column partitionCol : partitionColumns) {
                    boolean found = false;
                    for (Column alterColumn : alterSchema) {
                        if (alterColumn.nameEquals(partitionCol.getName(), true)) {
                            // 2.1 partition column cannot be modified
                            if (needAlterColumns.contains(alterColumn) && !alterColumn.equals(partitionCol)) {
                                throw new DdlException(
                                        "Can not modify partition column[" + partitionCol.getName() + "]. index["
                                                + olapTable.getIndexNameById(alterIndexId) + "]");
                            }
                            found = true;
                            break;
                        }
                    } // end for alterColumns

                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 2.1 partition column cannot be deleted.
                        throw new DdlException(
                                "Partition column[" + partitionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                        // ATTN. partition columns' order also need remaining unchanged.
                        // for now, we only allow one partition column, so no need to check order.
                    }
                } // end for partitionColumns
            }

            // 4. check distribution key:
            DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
            if (distributionInfo.getType() == DistributionInfoType.HASH) {
                List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
                for (Column distributionCol : distributionColumns) {
                    boolean found = false;
                    for (Column alterColumn : alterSchema) {
                        if (alterColumn.nameEquals(distributionCol.getName(), true)) {
                            // 3.1 distribution column cannot be modified
                            if (needAlterColumns.contains(alterColumn) && !alterColumn.equals(distributionCol)) {
                                throw new DdlException(
                                        "Can not modify distribution column[" + distributionCol.getName() + "]. index["
                                                + olapTable.getIndexNameById(alterIndexId) + "]");
                            }
                            found = true;
                            break;
                        }
                    } // end for alterColumns

                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 2.2 distribution column cannot be deleted.
                        throw new DdlException(
                                "Distribution column[" + distributionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                    }
                } // end for distributionCols
            }

            // 5. calc short key
            short newShortKeyColumnCount = Env.calcShortKeyColumnCount(alterSchema,
                    indexIdToProperties.get(alterIndexId), !olapTable.isDuplicateWithoutKey());
            if (LOG.isDebugEnabled()) {
                LOG.debug("alter index[{}] short key column count: {}", alterIndexId, newShortKeyColumnCount);
            }
            indexIdToShortKeyColumnCount.put(alterIndexId, newShortKeyColumnCount);

            // 6. store the changed columns for edit log
            changedIndexIdToSchema.put(alterIndexId, alterSchema);

            if (LOG.isDebugEnabled()) {
                LOG.debug("schema change[{}-{}-{}] check pass.", dbId, tableId, alterIndexId);
            }
        } // end for indices

        if (changedIndexIdToSchema.isEmpty() && !hasIndexChange && !hasRowStoreChanged) {
            throw new DdlException("Nothing is changed. please check your alter stmt.");
        }

        // create job
        long bufferSize = IdGeneratorUtil.getBufferSizeForAlterTable(olapTable, changedIndexIdToSchema.keySet());
        IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
        long jobId = idGeneratorBuffer.getNextId();
        SchemaChangeJobV2 schemaChangeJob =
                AlterJobV2Factory.createSchemaChangeJobV2(rawSql, jobId, dbId, olapTable.getId(), olapTable.getName(),
                        timeoutSecond * 1000);
        schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp);
        schemaChangeJob.setAlterIndexInfo(hasIndexChange, indexes);
        schemaChangeJob.setStoreRowColumnInfo(hasRowStoreChanged, storeRowColumn, rsColumns);

        // If StorageFormat is set to TStorageFormat.V2
        // which will create tablet with preferred_rowset_type set to BETA
        // for both base table and rollup index
        if (hasIndexChange) {
            // only V2 support index, so if there is index changed, storage format must be V2
            storageFormat = TStorageFormat.V2;
        }
        schemaChangeJob.setStorageFormat(storageFormat);

        // the following operations are done outside the 'for indices' loop
        // to avoid partial check success

        /*
         * Create schema change job
         * 1. For each index which has been changed, create a SHADOW index,
         *    and save the mapping of origin index to SHADOW index.
         * 2. Create all tablets and replicas of all SHADOW index, add them to tablet inverted index.
         * 3. Change table's state as SCHEMA_CHANGE
         */
        for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
            long originIndexId = entry.getKey();
            MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
            // 1. get new schema version/schema version hash, short key column count
            int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
            int newSchemaVersion = currentSchemaVersion + 1;
            // generate schema hash for new index has to generate a new schema hash not equal to current schema hash
            int currentSchemaHash = currentIndexMeta.getSchemaHash();
            int newSchemaHash = Util.generateSchemaHash();
            while (currentSchemaHash == newSchemaHash) {
                newSchemaHash = Util.generateSchemaHash();
            }
            String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
            short newShortKeyColumnCount = indexIdToShortKeyColumnCount.get(originIndexId);
            long shadowIndexId = idGeneratorBuffer.getNextId();

            // create SHADOW index for each partition
            List<Tablet> addedTablets = Lists.newArrayList();
            for (Partition partition : olapTable.getPartitions()) {
                long partitionId = partition.getId();
                TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId).getStorageMedium();
                // index state is SHADOW
                MaterializedIndex shadowIndex = new MaterializedIndex(shadowIndexId, IndexState.SHADOW);
                MaterializedIndex originIndex = partition.getIndex(originIndexId);
                ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
                Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
                for (Tablet originTablet : originIndex.getTablets()) {
                    TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
                            newSchemaHash, medium);
                    long originTabletId = originTablet.getId();
                    long shadowTabletId = idGeneratorBuffer.getNextId();

                    Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId);
                    shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
                    addedTablets.add(shadowTablet);

                    schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId);
                    List<Replica> originReplicas = originTablet.getReplicas();

                    int healthyReplicaNum = 0;
                    for (Replica originReplica : originReplicas) {
                        long shadowReplicaId = idGeneratorBuffer.getNextId();
                        long backendId = originReplica.getBackendId();

                        if (originReplica.getState() == Replica.ReplicaState.CLONE
                                || originReplica.getState() == Replica.ReplicaState.DECOMMISSION
                                || originReplica.getState() == ReplicaState.COMPACTION_TOO_SLOW
                                || originReplica.getLastFailedVersion() > 0) {
                            LOG.info("origin replica {} of tablet {} state is {},"
                                            + " and last failed version is {}, skip creating shadow replica",
                                    originReplica.getId(), originReplica, originReplica.getState(),
                                    originReplica.getLastFailedVersion());
                            continue;
                        }
                        Preconditions.checkState(originReplica.getState() == ReplicaState.NORMAL,
                                originReplica.getState());
                        ReplicaContext context = new ReplicaContext();
                        context.replicaId = shadowReplicaId;
                        context.backendId = backendId;
                        context.state = ReplicaState.ALTER;
                        context.version = Partition.PARTITION_INIT_VERSION;
                        context.schemaHash = newSchemaHash;
                        context.dbId = dbId;
                        context.tableId = tableId;
                        context.partitionId = partitionId;
                        context.indexId = shadowIndexId;
                        context.originReplica = originReplica;
                        // replica's init state is ALTER, so that tablet report process will ignore its report
                        Replica shadowReplica = EnvFactory.getInstance().createReplica(context);
                        shadowTablet.addReplica(shadowReplica);
                        healthyReplicaNum++;
                    }

                    if (healthyReplicaNum < totalReplicaNum / 2 + 1) {
                        /*
                         * TODO(cmy): This is a bad design.
                         * Because in the schema change job, we will only send tasks to the shadow replicas
                         * that have been created, without checking whether the quorum of replica number are satisfied.
                         * This will cause the job to fail until we find that the quorum of replica number
                         * is not satisfied until the entire job is done.
                         * So here we check the replica number strictly and do not allow to submit the job
                         * if the quorum of replica number is not satisfied.
                         */
                        for (Tablet tablet : addedTablets) {
                            Env.getCurrentInvertedIndex().deleteTablet(tablet.getId());
                        }
                        throw new DdlException(
                                "tablet " + originTabletId + " has few healthy replica: " + healthyReplicaNum);
                    }
                }

                schemaChangeJob.addPartitionShadowIndex(partitionId, shadowIndexId, shadowIndex);
            } // end for partition
            schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId, newIndexName, newSchemaVersion, newSchemaHash,
                    newShortKeyColumnCount, entry.getValue());
        } // end for index

        // set table state
        olapTable.setState(OlapTableState.SCHEMA_CHANGE);

        // 2. add schemaChangeJob
        addAlterJobV2(schemaChangeJob);

        // 3. write edit log
        Env.getCurrentEnv().getEditLog().logAlterJob(schemaChangeJob);
        LOG.info("finished to create schema change job: {}", schemaChangeJob.getJobId());
    }

    @Override
    protected void runAfterCatalogReady() {
        if (cycleCount >= CYCLE_COUNT_TO_CHECK_EXPIRE_SCHEMA_CHANGE_JOB) {
            clearFinishedOrCancelledSchemaChangeJobV2();
            clearExpireFinishedOrCancelledIndexChangeJobs();
            super.runAfterCatalogReady();
            cycleCount = 0;
        }
        runAlterJobV2();
        runIndexChangeJob();
        cycleCount++;
    }

    private void runAlterJobV2() {
        if (Config.forbid_running_alter_job) {
            return;
        }
        runnableSchemaChangeJobV2.values().forEach(alterJobsV2 -> {
            if (!alterJobsV2.isDone() && !activeSchemaChangeJobsV2.containsKey(alterJobsV2.getJobId())
                    && activeSchemaChangeJobsV2.size() < MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE) {
                if (FeConstants.runningUnitTest) {
                    alterJobsV2.run();
                } else {
                    schemaChangeThreadPool.submit(() -> {
                        if (activeSchemaChangeJobsV2.putIfAbsent(alterJobsV2.getJobId(), alterJobsV2) == null) {
                            try {
                                alterJobsV2.run();
                            } finally {
                                activeSchemaChangeJobsV2.remove(alterJobsV2.getJobId());
                            }
                        }
                    });
                }
            }
        });
    }

    private void runIndexChangeJob() {
        runnableIndexChangeJob.values().forEach(indexChangeJob -> {
            if (!indexChangeJob.isDone() && !activeIndexChangeJob.containsKey(indexChangeJob.getJobId())
                    && activeIndexChangeJob.size() < MAX_ACTIVE_SCHEMA_CHANGE_JOB_V2_SIZE) {
                if (FeConstants.runningUnitTest) {
                    indexChangeJob.run();
                } else {
                    schemaChangeThreadPool.submit(() -> {
                        if (activeIndexChangeJob.putIfAbsent(
                                    indexChangeJob.getJobId(), indexChangeJob) == null) {
                            try {
                                indexChangeJob.run();
                            } finally {
                                activeIndexChangeJob.remove(indexChangeJob.getJobId());
                            }
                        }
                    });
                }
            }

            if (indexChangeJob.isDone()) {
                runnableIndexChangeJob.remove(indexChangeJob.getJobId());
            }
        });
    }

    private void changeTableState(long dbId, long tableId, OlapTableState olapTableState) {
        try {
            Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
            OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
            olapTable.writeLockOrMetaException();
            try {
                if (olapTable.getState() == olapTableState) {
                    return;
                } else if (olapTable.getState() == OlapTableState.SCHEMA_CHANGE) {
                    olapTable.setState(olapTableState);
                }
            } finally {
                olapTable.writeUnlock();
            }
        } catch (MetaNotFoundException e) {
            LOG.warn("[INCONSISTENT META] changing table status failed after schema change job done", e);
        }
    }

    public Map<Long, IndexChangeJob> getIndexChangeJobs() {
        return indexChangeJobs;
    }

    public List<List<Comparable>> getAllIndexChangeJobInfos() {
        List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
        for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) {
            // no need to check priv here. This method is only called in show proc stmt,
            // which already check the ADMIN priv.
            indexChangeJob.getInfo(indexChangeJobInfos);
        }

        // sort by "JobId", "TableName", "PartitionName", "CreateTime", "FinishTime"
        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4);
        indexChangeJobInfos.sort(comparator);
        return indexChangeJobInfos;
    }

    public List<List<Comparable>> getAllIndexChangeJobInfos(Database db) {
        List<List<Comparable>> indexChangeJobInfos = new LinkedList<>();
        for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) {
            if (indexChangeJob.getDbId() != db.getId()) {
                continue;
            }
            indexChangeJob.getInfo(indexChangeJobInfos);
        }


        // sort by "JobId", "TableName", "PartitionName", "CreateTime", "FinishTime"
        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4);
        indexChangeJobInfos.sort(comparator);
        return indexChangeJobInfos;
    }

    public List<List<Comparable>> getAllAlterJobInfos() {
        List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();
        for (AlterJobV2 alterJob : ImmutableList.copyOf(alterJobsV2.values())) {
            // no need to check priv here. This method is only called in show proc stmt,
            // which already check the ADMIN priv.
            alterJob.getInfo(schemaChangeJobInfos);
        }

        // sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState"
        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4, 5);
        schemaChangeJobInfos.sort(comparator);
        return schemaChangeJobInfos;
    }

    @Override
    public List<List<Comparable>> getAlterJobInfosByDb(Database db) {
        List<List<Comparable>> schemaChangeJobInfos = new LinkedList<>();
        getAlterJobV2Infos(db, schemaChangeJobInfos);

        // sort by "JobId", "PartitionName", "CreateTime", "FinishTime", "IndexName", "IndexState"
        ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(0, 1, 2, 3, 4, 5);
        schemaChangeJobInfos.sort(comparator);
        return schemaChangeJobInfos;
    }

    private void getAlterJobV2Infos(Database db, List<AlterJobV2> alterJobsV2,
                                    List<List<Comparable>> schemaChangeJobInfos) {
        ConnectContext ctx = ConnectContext.get();
        for (AlterJobV2 alterJob : alterJobsV2) {
            if (alterJob.getDbId() != db.getId()) {
                continue;
            }
            if (ctx != null) {
                if (!Env.getCurrentEnv().getAccessManager()
                        .checkTblPriv(ctx, InternalCatalog.INTERNAL_CATALOG_NAME, db.getFullName(),
                                alterJob.getTableName(), PrivPredicate.ALTER)) {
                    continue;
                }
            }
            alterJob.getInfo(schemaChangeJobInfos);
        }
    }

    private void getAlterJobV2Infos(Database db, List<List<Comparable>> schemaChangeJobInfos) {
        getAlterJobV2Infos(db, ImmutableList.copyOf(alterJobsV2.values()), schemaChangeJobInfos);
    }

    @Override
    public void process(String rawSql, List<AlterClause> alterClauses, Database db,
                        OlapTable olapTable)
            throws UserException {
        olapTable.writeLockOrDdlException();
        try {
            olapTable.checkNormalStateForAlter();
            //alterClauses can or cannot light schema change
            boolean lightSchemaChange = true;
            boolean lightIndexChange = false;
            boolean buildIndexChange = false;

            // index id -> index schema
            Map<Long, LinkedList<Column>> indexSchemaMap = new HashMap<>();

            //for multi add columns clauses
            //index id -> index col_unique_id supplier
            Map<Long, IntSupplier> colUniqueIdSupplierMap = new HashMap<>();
            for (Map.Entry<Long, List<Column>> entry : olapTable.getIndexIdToSchema(true).entrySet()) {
                indexSchemaMap.put(entry.getKey(), new LinkedList<>(entry.getValue()));

                IntSupplier colUniqueIdSupplier = null;
                if (olapTable.getEnableLightSchemaChange()) {
                    colUniqueIdSupplier = new IntSupplier() {
                        public int pendingMaxColUniqueId = olapTable.getIndexMetaByIndexId(entry.getKey())
                                .getMaxColUniqueId();
                        public long indexId = entry.getKey();

                        @Override
                        public int getAsInt() {
                            pendingMaxColUniqueId++;
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("index id:{}, pendingMaxColUniqueId:{}", indexId, pendingMaxColUniqueId);
                            }
                            return pendingMaxColUniqueId;
                        }
                    };
                }
                colUniqueIdSupplierMap.put(entry.getKey(), colUniqueIdSupplier);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("in process indexSchemaMap:{}", indexSchemaMap);
            }

            List<Index> newIndexes = olapTable.getCopiedIndexes();
            List<Index> alterIndexes = new ArrayList<>();
            Map<Long, Set<String>> indexOnPartitions = new HashMap<>();
            boolean isDropIndex = false;
            Map<String, String> propertyMap = new HashMap<>();
            for (AlterClause alterClause : alterClauses) {
                Map<String, String> properties = alterClause.getProperties();
                if (properties != null) {
                    if (propertyMap.isEmpty()) {
                        propertyMap.putAll(properties);
                    } else {
                        throw new DdlException("reduplicated PROPERTIES");
                    }

                    // modification of colocate property is handle alone.
                    // And because there should be only one colocate property modification clause in stmt,
                    // so just return after finished handling.
                    if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
                        String colocateGroup = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
                        Env.getCurrentEnv().modifyTableColocate(db, olapTable, colocateGroup, false, null);
                        return;
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
                        String distributionType = properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE);
                        if (!distributionType.equalsIgnoreCase("random")) {
                            throw new DdlException(
                                    "Only support modifying distribution type of table from" + " hash to random");
                        }
                        Env.getCurrentEnv().convertDistributionType(db, olapTable);
                        return;
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) {
                        /*
                         * This is only for fixing bug when upgrading Doris from 0.9.x to 0.10.x.
                         */
                        sendClearAlterTask(db, olapTable);
                        return;
                    } else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
                        DynamicPartitionUtil.checkDynamicPartitionPropertyKeysValid(properties);
                        if (!olapTable.dynamicPartitionExists()) {
                            try {
                                DynamicPartitionUtil.checkInputDynamicPartitionProperties(properties,
                                        olapTable);
                            } catch (DdlException e) {
                                // This table is not a dynamic partition table
                                // and didn't supply all dynamic partition properties
                                throw new DdlException("Table " + db.getFullName() + "." + olapTable.getName()
                                        + " is not a dynamic partition table." + " Use command `HELP ALTER TABLE` "
                                        + "to see how to change a normal table to a dynamic partition table.");
                            }
                        }
                        Env.getCurrentEnv().modifyTableDynamicPartition(db, olapTable, properties);
                        return;
                    } else if (properties.containsKey(
                            "default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
                        Preconditions.checkNotNull(
                                properties.get("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION));
                        Env.getCurrentEnv().modifyTableDefaultReplicaAllocation(db, olapTable, properties);
                        return;
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION)) {
                        Env.getCurrentEnv().modifyTableReplicaAllocation(db, olapTable, properties);
                        return;
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) {
                        olapTable.setStoragePolicy(properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY));
                        return;
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE)) {
                        lightSchemaChange = Boolean.parseBoolean(
                                properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE));
                        if (Objects.equals(olapTable.getEnableLightSchemaChange(), lightSchemaChange)) {
                            return;
                        }
                        if (!lightSchemaChange) {
                            throw new DdlException("Can not alter light_schema_change from true to false currently");
                        }
                        enableLightSchemaChange(db, olapTable);
                        return;
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN)
                                || properties.containsKey(PropertyAnalyzer.PROPERTIES_ROW_STORE_COLUMNS)) {
                        String value = properties.get(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN);
                        if (value != null && value.equalsIgnoreCase("false")) {
                            throw new DdlException("Can not alter store_row_column from true to false currently");
                        }
                        if (!olapTable.storeRowColumn()) {
                            Column rowColumn = ColumnDefinition
                                    .newRowStoreColumnDefinition(null).translateToCatalogStyle();
                            int maxColUniqueId = olapTable
                                    .getIndexMetaByIndexId(olapTable.getBaseIndexId()).getMaxColUniqueId();
                            rowColumn.setUniqueId(maxColUniqueId + 1);
                            indexSchemaMap.get(olapTable.getBaseIndexId()).add(rowColumn);
                        }
                    } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM)) {
                        Env.getCurrentEnv().modifyTableProperties(db, olapTable, properties);
                        return;
                    }
                }

                // the following operations can not be done when there are temp partitions exist.
                if (olapTable.existTempPartitions()) {
                    throw new DdlException("Can not alter table when there are temp partitions in table");
                }

                if (alterClause instanceof AddColumnClause) {
                    // add column
                    boolean clauseCanLightSchemaChange = processAddColumn((AddColumnClause) alterClause, olapTable,
                            indexSchemaMap, colUniqueIdSupplierMap);
                    if (!clauseCanLightSchemaChange) {
                        lightSchemaChange = false;
                    }
                } else if (alterClause instanceof AddColumnsClause) {
                    // add columns
                    boolean clauseCanLightSchemaChange = processAddColumns((AddColumnsClause) alterClause, olapTable,
                            indexSchemaMap, false, colUniqueIdSupplierMap);
                    if (!clauseCanLightSchemaChange) {
                        lightSchemaChange = false;
                    }
                } else if (alterClause instanceof DropColumnClause) {
                    // drop column and drop indexes on this column
                    boolean clauseCanLightSchemaChange = processDropColumn((DropColumnClause) alterClause, olapTable,
                            indexSchemaMap, newIndexes);
                    if (!clauseCanLightSchemaChange) {
                        lightSchemaChange = false;
                    }
                } else if (alterClause instanceof ModifyColumnClause) {
                    // modify column
                    boolean clauseCanLightSchemaChange = processModifyColumn((ModifyColumnClause) alterClause,
                            olapTable, indexSchemaMap);
                    if (!clauseCanLightSchemaChange) {
                        lightSchemaChange = false;
                    }
                } else if (alterClause instanceof ReorderColumnsClause) {
                    // reorder column
                    processReorderColumn((ReorderColumnsClause) alterClause, olapTable, indexSchemaMap);
                    lightSchemaChange = false;
                } else if (alterClause instanceof ModifyTablePropertiesClause) {
                    // modify table properties
                    // do nothing, properties are already in propertyMap
                    lightSchemaChange = false;
                } else if (alterClause instanceof CreateIndexClause) {
                    CreateIndexClause createIndexClause = (CreateIndexClause) alterClause;
                    Index index = createIndexClause.getIndex();
                    if (processAddIndex(createIndexClause, olapTable, newIndexes)) {
                        return;
                    }
                    lightSchemaChange = false;

                    // ngram_bf index can do light_schema_change in both local and cloud mode
                    // inverted index can only do light_schema_change in local mode
                    if (index.isLightIndexChangeSupported()) {
                        alterIndexes.add(index);
                        isDropIndex = false;
                        lightIndexChange = true;
                    }
                } else if (alterClause instanceof BuildIndexClause) {
                    BuildIndexClause buildIndexClause = (BuildIndexClause) alterClause;
                    IndexDef indexDef = buildIndexClause.getIndexDef();
                    Index index = buildIndexClause.getIndex();
                    if (Config.isCloudMode() && index.getIndexType() == IndexDef.IndexType.INVERTED) {
                        throw new DdlException("BUILD INDEX operation failed: No need to do it in cloud mode.");
                    }

                    if (indexDef.getPartitionNames().isEmpty()) {
                        indexOnPartitions.put(index.getIndexId(), olapTable.getPartitionNames());
                    } else {
                        indexOnPartitions.put(
                                index.getIndexId(), new HashSet<>(indexDef.getPartitionNames()));
                    }

                    alterIndexes.add(index);
                    buildIndexChange = true;
                    lightSchemaChange = false;
                } else if (alterClause instanceof DropIndexClause) {
                    if (processDropIndex((DropIndexClause) alterClause, olapTable, newIndexes)) {
                        return;
                    }
                    lightSchemaChange = false;

                    DropIndexClause dropIndexClause = (DropIndexClause) alterClause;
                    List<Index> existedIndexes = olapTable.getIndexes();
                    Index found = null;
                    for (Index existedIdx : existedIndexes) {
                        if (existedIdx.getIndexName().equalsIgnoreCase(dropIndexClause.getIndexName())) {
                            found = existedIdx;
                            break;
                        }
                    }
                    // only inverted index with local mode can do light drop index change
                    if (found != null && found.getIndexType() == IndexDef.IndexType.INVERTED
                            && Config.isNotCloudMode()) {
                        alterIndexes.add(found);
                        isDropIndex = true;
                        lightIndexChange = true;
                    }
                } else {
                    Preconditions.checkState(false);
                }
            } // end for alter clauses

            if (LOG.isDebugEnabled()) {
                LOG.debug("table: {}({}), lightSchemaChange: {}, lightIndexChange: {},"
                        + " buildIndexChange: {}, indexSchemaMap:{}",
                        olapTable.getName(), olapTable.getId(), lightSchemaChange,
                        lightIndexChange, buildIndexChange, indexSchemaMap);
            }

            if (lightSchemaChange) {
                long jobId = Env.getCurrentEnv().getNextId();
                //for schema change add/drop value column optimize, direct modify table meta.
                modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
                                             null, isDropIndex, jobId, false, propertyMap);
            } else if (Config.enable_light_index_change && lightIndexChange) {
                long jobId = Env.getCurrentEnv().getNextId();
                //for schema change add/drop inverted index and ngram_bf optimize, direct modify table meta firstly.
                modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
                                             alterIndexes, isDropIndex, jobId, false, propertyMap);
            } else if (buildIndexChange) {
                if (alterIndexes.isEmpty()) {
                    throw new DdlException("Altered index is empty. please check your alter stmt.");
                }
                IndexDef.IndexType indexType = alterIndexes.get(0).getIndexType();
                if (Config.enable_light_index_change) {
                    if (indexType == IndexDef.IndexType.INVERTED) {
                        buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap,
                                alterIndexes, indexOnPartitions, false);
                    } else {
                        createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes, true);
                    }
                }
            } else {
                createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes, false);
            }
        } finally {
            olapTable.writeUnlock();
        }
    }

    @Override
    public void processForNereids(String rawSql, List<AlterCommand> alterCommands, Database db,
                               OlapTable olapTable) throws UserException {
        // TODO: convert alterClauses to alterSystemCommands for schema change
    }


    private void enableLightSchemaChange(Database db, OlapTable olapTable) throws DdlException {
        final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable);
        try {
            alterLightSchChangeHelper.enableLightSchemaChange();
        } catch (IllegalStateException e) {
            throw new DdlException(String.format("failed to enable light schema change for table %s.%s",
                    db.getFullName(), olapTable.getName()), e);
        }
    }

    public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info) throws MetaNotFoundException {
        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(info.getDbId());
        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
        olapTable.writeLock();
        final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable);
        try {
            alterLightSchChangeHelper.updateTableMeta(info);
        } catch (IllegalStateException e) {
            LOG.warn("failed to replay alter light schema change", e);
        } finally {
            olapTable.writeUnlock();
        }
    }

    @Override
    public void processExternalTable(List<AlterClause> alterClauses, Database db, Table externalTable)
            throws UserException {
        externalTable.writeLockOrDdlException();
        try {
            // copy the external table schema columns
            List<Column> newSchema = Lists.newArrayList();
            newSchema.addAll(externalTable.getBaseSchema(true));

            for (AlterClause alterClause : alterClauses) {
                if (alterClause instanceof AddColumnClause) {
                    // add column
                    processAddColumn((AddColumnClause) alterClause, externalTable, newSchema);
                } else if (alterClause instanceof AddColumnsClause) {
                    // add columns
                    processAddColumns((AddColumnsClause) alterClause, externalTable, newSchema);
                } else if (alterClause instanceof DropColumnClause) {
                    // drop column and drop indexes on this column
                    processDropColumn((DropColumnClause) alterClause, externalTable, newSchema);
                } else if (alterClause instanceof ModifyColumnClause) {
                    // modify column
                    processModifyColumn((ModifyColumnClause) alterClause, externalTable, newSchema);
                } else if (alterClause instanceof ReorderColumnsClause) {
                    // reorder column
                    processReorderColumn((ReorderColumnsClause) alterClause, externalTable, newSchema);
                } else {
                    Preconditions.checkState(false);
                }
            } // end for alter clauses
            // replace the old column list
            externalTable.setNewFullSchema(newSchema);
            // refresh external table column in edit log
            Env.getCurrentEnv().refreshExternalTableSchema(db, externalTable, newSchema);
        } finally {
            externalTable.writeUnlock();
        }
    }

    private void sendClearAlterTask(Database db, OlapTable olapTable) {
        AgentBatchTask batchTask = new AgentBatchTask();
        olapTable.readLock();
        try {
            for (Partition partition : olapTable.getPartitions()) {
                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                    int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
                    for (Tablet tablet : index.getTablets()) {
                        for (Replica replica : tablet.getReplicas()) {
                            ClearAlterTask alterTask = new ClearAlterTask(replica.getBackendIdWithoutException(),
                                    db.getId(), olapTable.getId(), partition.getId(),
                                    index.getId(), tablet.getId(), schemaHash);
                            batchTask.addTask(alterTask);
                        }
                    }
                }
            }
        } finally {
            olapTable.readUnlock();
        }

        AgentTaskExecutor.submit(batchTask);
        LOG.info("send clear alter task for table {}, number: {}", olapTable.getName(), batchTask.getTaskNum());
    }

    static long storagePolicyNameToId(String storagePolicy) throws DdlException {
        if (storagePolicy == null) {
            return -1; // don't update storage policy
        }
        // if storagePolicy is "", means to reset the storage policy of this table
        if (storagePolicy.isEmpty()) {
            return 0; // reset storage policy
        } else {
            Optional<Policy> policy = Env.getCurrentEnv().getPolicyMgr()
                    .findPolicy(storagePolicy, PolicyTypeEnum.STORAGE);
            if (!policy.isPresent()) {
                throw new DdlException("StoragePolicy[" + storagePolicy + "] not exist");
            }
            return policy.get().getId();
        }
    }

    /**
     * Update all partitions' properties of table
     */
    public void updateTableProperties(Database db, String tableName, Map<String, String> properties)
            throws UserException {
        final Set<String> allowedProps = new HashSet<String>() {
            {
                add(PropertyAnalyzer.PROPERTIES_INMEMORY);
                add(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY);
                add(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED);
                add(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
                add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES);
                add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
                add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS);
                add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS);
                add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES);
                add(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE);
                add(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION);
                add(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION);
                add(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD);
                add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD);
                add(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD);
                add(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY);
                add(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM);
            }
        };
        List<String> notAllowedProps = properties.keySet().stream().filter(s -> !allowedProps.contains(s))
                .collect(Collectors.toList());
        if (!notAllowedProps.isEmpty()) {
            throw new UserException("modifying property " + notAllowedProps + " is forbidden");
        }

        Env.getCurrentEnv().getAlterInstance().checkNoForceProperty(properties);
        List<Partition> partitions = Lists.newArrayList();
        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP);
        boolean enableUniqueKeyMergeOnWrite = false;
        olapTable.readLock();
        try {
            enableUniqueKeyMergeOnWrite = olapTable.getEnableUniqueKeyMergeOnWrite();
            partitions.addAll(olapTable.getPartitions());
        } finally {
            olapTable.readUnlock();
        }

        String inMemory = properties.get(PropertyAnalyzer.PROPERTIES_INMEMORY);
        int isInMemory = -1; // < 0 means don't update inMemory properties
        if (inMemory != null) {
            isInMemory = Boolean.parseBoolean(inMemory) ? 1 : 0;
            if ((isInMemory > 0) == olapTable.isInMemory()) {
                // already up-to-date
                isInMemory = -1;
            }
        }
        String storagePolicy = properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY);
        long storagePolicyId = storagePolicyNameToId(storagePolicy);

        String compactionPolicy = properties.get(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY);
        if (compactionPolicy != null
                                    && !compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
                                    && !compactionPolicy.equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
            throw new UserException("Table compaction policy only support for "
                                                + PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY
                                                + " or " + PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY);
        }

        if (compactionPolicy != null && compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
                && olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
            throw new UserException("Time series compaction policy is not supported for unique key table");
        }

        Map<String, Long> timeSeriesCompactionConfig = new HashMap<>();
        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)) {
            timeSeriesCompactionConfig
                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES,
                        Long.parseLong(properties
                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)));
        }
        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
            timeSeriesCompactionConfig
                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD,
                        Long.parseLong(properties
                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)));
        }
        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)) {
            timeSeriesCompactionConfig
                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS,
                        Long.parseLong(properties
                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)));
        }

        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)) {
            timeSeriesCompactionConfig
                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD,
                        Long.parseLong(properties
                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)));
        }

        if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) {
            timeSeriesCompactionConfig
                        .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD,
                        Long.parseLong(properties
                        .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)));
        }


        if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty()
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY)
                && !properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM)) {
            LOG.info("Properties already up-to-date");
            return;
        }

        String singleCompaction = properties.get(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION);
        int enableSingleCompaction = -1; // < 0 means don't update
        if (singleCompaction != null) {
            enableSingleCompaction = Boolean.parseBoolean(singleCompaction) ? 1 : 0;
        }

        if (enableUniqueKeyMergeOnWrite && Boolean.parseBoolean(singleCompaction)) {
            throw new UserException(
                    "enable_single_replica_compaction property is not supported for merge-on-write table");
        }

        String enableMowLightDelete = properties.get(
                PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE);
        if (enableMowLightDelete != null && !enableUniqueKeyMergeOnWrite) {
            throw new UserException(
                    "enable_mow_light_delete property is only supported for unique merge-on-write table");
        }

        String disableAutoCompactionBoolean = properties.get(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION);
        int disableAutoCompaction = -1; // < 0 means don't update
        if (disableAutoCompactionBoolean != null) {
            disableAutoCompaction = Boolean.parseBoolean(disableAutoCompactionBoolean) ? 1 : 0;
        }

        String skipWriteIndexOnLoad = properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD);
        int skip = -1; // < 0 means don't update
        if (skipWriteIndexOnLoad != null) {
            skip = Boolean.parseBoolean(skipWriteIndexOnLoad) ? 1 : 0;
        }

        for (Partition partition : partitions) {
            updatePartitionProperties(db, olapTable.getName(), partition.getName(), storagePolicyId, isInMemory,
                                    null, compactionPolicy, timeSeriesCompactionConfig, enableSingleCompaction, skip,
                                    disableAutoCompaction);
        }

        olapTable.writeLockOrDdlException();
        try {
            Env.getCurrentEnv().modifyTableProperties(db, olapTable, properties);
        } finally {
            olapTable.writeUnlock();
        }
    }

    /**
     * Update some specified partitions' properties of table
     */
    public void updatePartitionsProperties(Database db, String tableName, List<String> partitionNames,
                                           Map<String, String> properties) throws DdlException, MetaNotFoundException {
        Preconditions.checkState(properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)
                || properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY));

        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
        String inMemory = properties.get(PropertyAnalyzer.PROPERTIES_INMEMORY);
        int isInMemory = -1; // < 0 means don't update inMemory properties
        if (inMemory != null) {
            isInMemory = Boolean.parseBoolean(inMemory) ? 1 : 0;
            if ((isInMemory > 0) == olapTable.isInMemory()) {
                // already up-to-date
                isInMemory = -1;
            }
        }
        String storagePolicy = properties.get(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY);
        boolean enableUniqueKeyMergeOnWrite = olapTable.getEnableUniqueKeyMergeOnWrite();
        if (enableUniqueKeyMergeOnWrite && !Strings.isNullOrEmpty(storagePolicy)) {
            throw new DdlException(
                    "Can not set UNIQUE KEY table that enables Merge-On-write" + " with storage policy(" + storagePolicy
                            + ")");
        }
        long storagePolicyId = storagePolicyNameToId(storagePolicy);

        if (isInMemory < 0 && storagePolicyId < 0) {
            LOG.info("Properties already up-to-date");
            return;
        }

        for (String partitionName : partitionNames) {
            try {
                updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId,
                                                                            isInMemory, null, null, null, -1, -1, -1);
            } catch (Exception e) {
                String errMsg = "Failed to update partition[" + partitionName + "]'s 'in_memory' property. "
                        + "The reason is [" + e.getMessage() + "]";
                throw new DdlException(errMsg);
            }
        }
    }

    /**
     * Update one specified partition's properties by partition name of table
     * This operation may return partial successfully, with an exception to inform user to retry
     */
    public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
                                          int isInMemory, BinlogConfig binlogConfig, String compactionPolicy,
                                          Map<String, Long> timeSeriesCompactionConfig,
                                          int enableSingleCompaction, int skipWriteIndexOnLoad,
                                          int disableAutoCompaction) throws UserException {
        // be id -> <tablet id,schemaHash>
        Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
        olapTable.readLock();
        try {
            Partition partition = olapTable.getPartition(partitionName);
            if (partition == null) {
                throw new DdlException(
                        "Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]");
            }

            for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
                for (Tablet tablet : index.getTablets()) {
                    for (Replica replica : tablet.getReplicas()) {
                        Set<Pair<Long, Integer>> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent(
                                replica.getBackendId(), k -> Sets.newHashSet());
                        tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash));
                    }
                }
            }
        } finally {
            olapTable.readUnlock();
        }

        int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
        MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum);
        AgentBatchTask batchTask = new AgentBatchTask();
        for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
            countDownLatch.addMark(kv.getKey(), kv.getValue());
            UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
                                            storagePolicyId, binlogConfig, countDownLatch, compactionPolicy,
                                            timeSeriesCompactionConfig, enableSingleCompaction, skipWriteIndexOnLoad,
                                            disableAutoCompaction);
            batchTask.addTask(task);
        }
        if (!FeConstants.runningUnitTest) {
            // send all tasks and wait them finished
            AgentTaskQueue.addBatchTask(batchTask);
            AgentTaskExecutor.submit(batchTask);
            LOG.info("send update tablet meta task for table {}, partitions {}, number: {}", tableName, partitionName,
                    batchTask.getTaskNum());

            // estimate timeout
            long timeout = DbUtil.getCreateReplicasTimeoutMs(totalTaskNum);
            boolean ok = false;
            try {
                ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOG.warn("InterruptedException: ", e);
            }

            if (!ok || !countDownLatch.getStatus().ok()) {
                String errMsg = "Failed to update partition[" + partitionName + "]. tablet meta.";
                // clear tasks
                AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO);

                if (!countDownLatch.getStatus().ok()) {
                    errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
                } else {
                    // only show at most 3 results
                    List<String> subList = countDownLatch.getLeftMarks().stream().limit(3)
                            .map(item -> "(backendId = " + item.getKey() + ", tabletsWithHash = "
                                    + item.getValue() + ")")
                            .collect(Collectors.toList());
                    if (!subList.isEmpty()) {
                        errMsg += " Unfinished: " + Joiner.on(", ").join(subList);
                    }
                }
                errMsg += ". This operation maybe partial successfully, You should retry until success.";
                LOG.warn(errMsg);
                throw new DdlException(errMsg);
            }
        }
    }

    public void cancel(CancelAlterTableCommand command) throws DdlException {
        cancelColumnJob(command);
    }

    @Override
    public void cancel(CancelStmt stmt) throws DdlException {
        CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt) stmt;
        if (cancelAlterTableStmt.getAlterType() == AlterType.INDEX) {
            cancelIndexJob(cancelAlterTableStmt);
        } else {
            cancelColumnJob(cancelAlterTableStmt);
        }
    }

    private void cancelColumnJob(CancelAlterTableCommand command) throws DdlException {
        String dbName = command.getDbName();
        String tableName = command.getTableName();
        Preconditions.checkState(!Strings.isNullOrEmpty(dbName));
        Preconditions.checkState(!Strings.isNullOrEmpty(tableName));

        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        AlterJobV2 schemaChangeJobV2 = null;

        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
        olapTable.writeLockOrDdlException();
        try {
            if (olapTable.getState() != OlapTableState.SCHEMA_CHANGE
                    && olapTable.getState() != OlapTableState.WAITING_STABLE) {
                throw new DdlException("Table[" + tableName + "] is not under SCHEMA_CHANGE.");
            }

            // find from new alter jobs first
            List<AlterJobV2> schemaChangeJobV2List = getUnfinishedAlterJobV2ByTableId(olapTable.getId());
            // current schemaChangeJob job doesn't support batch operation,so just need to get one job
            schemaChangeJobV2 = schemaChangeJobV2List.size() == 0 ? null
                : Iterables.getOnlyElement(schemaChangeJobV2List);
            if (schemaChangeJobV2 == null) {
                throw new DdlException(
                    "Table[" + tableName + "] is under schema change state" + " but could not find related job");
            }
        } finally {
            olapTable.writeUnlock();
        }

        // alter job v2's cancel must be called outside the database lock
        if (schemaChangeJobV2 != null) {
            if (!schemaChangeJobV2.cancel("user cancelled")) {
                throw new DdlException("Job can not be cancelled. State: " + schemaChangeJobV2.getJobState());
            }
        }
    }

    private void cancelColumnJob(CancelAlterTableStmt cancelAlterTableStmt) throws DdlException {
        String dbName = cancelAlterTableStmt.getDbName();
        String tableName = cancelAlterTableStmt.getTableName();
        Preconditions.checkState(!Strings.isNullOrEmpty(dbName));
        Preconditions.checkState(!Strings.isNullOrEmpty(tableName));

        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
        AlterJobV2 schemaChangeJobV2 = null;

        OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
        olapTable.writeLockOrDdlException();
        try {
            if (olapTable.getState() != OlapTableState.SCHEMA_CHANGE
                    && olapTable.getState() != OlapTableState.WAITING_STABLE) {
                throw new DdlException("Table[" + tableName + "] is not under SCHEMA_CHANGE.");
            }

            // find from new alter jobs first
            List<AlterJobV2> schemaChangeJobV2List = getUnfinishedAlterJobV2ByTableId(olapTable.getId());
            // current schemaChangeJob job doesn't support batch operation,so just need to get one job
            schemaChangeJobV2 = schemaChangeJobV2List.size() == 0 ? null
                    : Iterables.getOnlyElement(schemaChangeJobV2List);
            if (schemaChangeJobV2 == null) {
                throw new DdlException(
                        "Table[" + tableName + "] is under schema change state" + " but could not find related job");
            }
        } finally {
            olapTable.writeUnlock();
        }

        // alter job v2's cancel must be called outside the database lock
        if (schemaChangeJobV2 != null) {
            if (!schemaChangeJobV2.cancel("user cancelled")) {
                throw new DdlException("Job can not be cancelled. State: " + schemaChangeJobV2.getJobState());
            }
            return;
        }
    }

    public void cancelIndexJob(CancelBuildIndexCommand command) throws DdlException {
        String dbName = command.getDbName();
        String tableName = command.getTableName();
        Preconditions.checkState(!Strings.isNullOrEmpty(dbName));
        Preconditions.checkState(!Strings.isNullOrEmpty(tableName));

        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);

        List<IndexChangeJob> jobList = new ArrayList<>();

        Table olapTable = db.getTableOrDdlException(tableName, Table.TableType.OLAP);
        olapTable.writeLock();
        try {
            // find from index change jobs first
            if (command.getAlterJobIdList() != null
                    && command.getAlterJobIdList().size() > 0) {
                for (Long jobId : command.getAlterJobIdList()) {
                    IndexChangeJob job = indexChangeJobs.get(jobId);
                    if (job == null) {
                        continue;
                    }
                    jobList.add(job);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("add build index job {} on table {} for specific id", jobId, tableName);
                    }
                }
            } else {
                for (IndexChangeJob job : indexChangeJobs.values()) {
                    if (!job.isDone() && job.getTableId() == olapTable.getId()) {
                        jobList.add(job);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("add build index job {} on table {} for all", job.getJobId(), tableName);
                        }
                    }
                }
            }
        } finally {
            olapTable.writeUnlock();
        }

        // alter job v2's cancel must be called outside the table lock
        if (jobList.size() > 0) {
            for (IndexChangeJob job : jobList) {
                long jobId = job.getJobId();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("cancel build index job {} on table {}", jobId, tableName);
                }
                if (!job.cancel("user cancelled")) {
                    LOG.warn("cancel build index job {} on table {} failed", jobId, tableName);
                    throw new DdlException("Job can not be cancelled. State: " + job.getJobState());
                } else {
                    LOG.info("cancel build index job {} on table {} success", jobId, tableName);
                }
            }
        } else {
            throw new DdlException("No job to cancel for Table[" + tableName + "]");
        }
    }

    private void cancelIndexJob(CancelAlterTableStmt cancelAlterTableStmt) throws DdlException {
        String dbName = cancelAlterTableStmt.getDbName();
        String tableName = cancelAlterTableStmt.getTableName();
        Preconditions.checkState(!Strings.isNullOrEmpty(dbName));
        Preconditions.checkState(!Strings.isNullOrEmpty(tableName));

        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);

        List<IndexChangeJob> jobList = new ArrayList<>();

        Table olapTable = db.getTableOrDdlException(tableName, Table.TableType.OLAP);
        olapTable.writeLock();
        try {
            // find from index change jobs first
            if (cancelAlterTableStmt.getAlterJobIdList() != null
                    && cancelAlterTableStmt.getAlterJobIdList().size() > 0) {
                for (Long jobId : cancelAlterTableStmt.getAlterJobIdList()) {
                    IndexChangeJob job = indexChangeJobs.get(jobId);
                    if (job == null) {
                        continue;
                    }
                    jobList.add(job);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("add build index job {} on table {} for specific id", jobId, tableName);
                    }
                }
            } else {
                for (IndexChangeJob job : indexChangeJobs.values()) {
                    if (!job.isDone() && job.getTableId() == olapTable.getId()) {
                        jobList.add(job);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("add build index job {} on table {} for all", job.getJobId(), tableName);
                        }
                    }
                }
            }
        } finally {
            olapTable.writeUnlock();
        }

        // if this table has ngram_bf index, we must run cancel for schema change job
        boolean hasNGramBFIndex = ((OlapTable) olapTable).hasIndexOfType(IndexDef.IndexType.NGRAM_BF);
        // alter job v2's cancel must be called outside the table lock
        if (jobList.size() > 0) {
            for (IndexChangeJob job : jobList) {
                long jobId = job.getJobId();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("cancel build index job {} on table {}", jobId, tableName);
                }
                if (!job.cancel("user cancelled")) {
                    LOG.warn("cancel build index job {} on table {} failed", jobId, tableName);
                    throw new DdlException("Job can not be cancelled. State: " + job.getJobState());
                } else {
                    LOG.info("cancel build index job {} on table {} success", jobId, tableName);
                }
            }
        } else if (hasNGramBFIndex) {
            cancelColumnJob(cancelAlterTableStmt);
        } else {
            throw new DdlException("No job to cancel for Table[" + tableName + "]");
        }
    }

    /**
     * Returns true if the index already exists, there is no need to create the job to add the index.
     * Otherwise, return false, there is need to create a job to add the index.
     */
    private boolean processAddIndex(CreateIndexClause alterClause, OlapTable olapTable, List<Index> newIndexes)
            throws UserException {
        Index alterIndex = alterClause.getIndex();
        if (alterIndex == null) {
            return false;
        }

        List<Index> existedIndexes = olapTable.getIndexes();
        IndexDef indexDef = alterClause.getIndexDef();
        Set<String> newColset = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
        newColset.addAll(indexDef.getColumns());
        Set<Long> existedIndexIdSet = Sets.newHashSet();
        if (checkDuplicateIndexes(existedIndexes, indexDef, newColset, existedIndexIdSet, olapTable)) {
            return true;
        }
        if (checkDuplicateIndexes(newIndexes, indexDef, newColset, existedIndexIdSet, olapTable)) {
            return true;
        }

        // The restored olap table may not reset the index id, which comes from the upstream,
        // so we need to check and reset the index id here, to avoid confliction.
        // See OlapTable.resetIdsForRestore for details.
        while (existedIndexIdSet.contains(alterIndex.getIndexId())) {
            alterIndex.setIndexId(Env.getCurrentEnv().getNextId());
        }

        for (String col : indexDef.getColumns()) {
            Column column = olapTable.getColumn(col);
            if (column != null) {
                indexDef.checkColumn(column, olapTable.getKeysType(),
                        olapTable.getEnableUniqueKeyMergeOnWrite(),
                        olapTable.getInvertedIndexFileStorageFormat());
            } else {
                throw new DdlException("index column does not exist in table. invalid column: " + col);
            }
        }

        // the column name in CreateIndexClause is not check case sensitivity,
        // when send index description to BE, there maybe cannot find column by name,
        // so here update column name in CreateIndexClause after checkColumn for indexDef,
        // there will use the column name in olapTable instead of the column name in CreateIndexClause.
        alterIndex.setColumns(indexDef.getColumns());
        newIndexes.add(alterIndex);
        return false;
    }

    private boolean checkDuplicateIndexes(List<Index> indexes, IndexDef indexDef, Set<String> newColset,
                                          Set<Long> existedIndexIdSet, OlapTable olapTable) throws DdlException {
        for (Index index : indexes) {
            if (index.getIndexName().equalsIgnoreCase(indexDef.getIndexName())) {
                if (indexDef.isSetIfNotExists()) {
                    LOG.info("create index[{}] which already exists on table[{}]", indexDef.getIndexName(),
                            olapTable.getName());
                    return true;
                }
                throw new DdlException("index `" + indexDef.getIndexName() + "` already exist.");
            }
            Set<String> existedIdxColSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
            existedIdxColSet.addAll(index.getColumns());
            if (index.getIndexType() == indexDef.getIndexType() && newColset.equals(existedIdxColSet)) {
                throw new DdlException(
                        indexDef.getIndexType() + " index for columns (" + String.join(",", indexDef.getColumns())
                                + ") already exist.");
            }
            existedIndexIdSet.add(index.getIndexId());
        }
        return false;
    }

    /**
     * Returns true if the index does not exist, there is no need to create the job to drop the index.
     * Otherwise return false, there is need to create a job to drop the index.
     */
    private boolean processDropIndex(DropIndexClause alterClause, OlapTable olapTable, List<Index> indexes)
            throws DdlException {
        String indexName = alterClause.getIndexName();
        List<Index> existedIndexes = olapTable.getIndexes();
        Index found = null;
        for (Index existedIdx : existedIndexes) {
            if (existedIdx.getIndexName().equalsIgnoreCase(indexName)) {
                found = existedIdx;
                break;
            }
        }
        if (found == null) {
            if (alterClause.isSetIfExists()) {
                LOG.info("drop index[{}] which does not exist on table[{}]", indexName, olapTable.getName());
                return true;
            }
            throw new DdlException("index " + indexName + " does not exist");
        }

        Iterator<Index> itr = indexes.iterator();
        while (itr.hasNext()) {
            Index idx = itr.next();
            if (idx.getIndexName().equalsIgnoreCase(alterClause.getIndexName())) {
                itr.remove();
                break;
            }
        }
        return false;
    }

    @Override
    public void addAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
        super.addAlterJobV2(alterJob);
        runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
    }

    public void addIndexChangeJob(IndexChangeJob indexChangeJob) {
        indexChangeJobs.put(indexChangeJob.getJobId(), indexChangeJob);
        runnableIndexChangeJob.put(indexChangeJob.getJobId(), indexChangeJob);
        LOG.info("add inverted index job {}", indexChangeJob.getJobId());
    }

    private void clearFinishedOrCancelledSchemaChangeJobV2() {
        Iterator<Map.Entry<Long, AlterJobV2>> iterator = runnableSchemaChangeJobV2.entrySet().iterator();
        while (iterator.hasNext()) {
            AlterJobV2 alterJobV2 = iterator.next().getValue();
            if (alterJobV2.isDone()) {
                iterator.remove();
            }
        }
    }

    private void clearExpireFinishedOrCancelledIndexChangeJobs() {
        Iterator<Map.Entry<Long, IndexChangeJob>> iterator = indexChangeJobs.entrySet().iterator();
        while (iterator.hasNext()) {
            IndexChangeJob indexChangeJob = iterator.next().getValue();
            if (indexChangeJob.isExpire()) {
                iterator.remove();
                LOG.info("remove expired inverted index job {}. finish at {}",
                        indexChangeJob.getJobId(), TimeUtils.longToTimeString(indexChangeJob.getFinishedTimeMs()));
            }
        }

        Iterator<Map.Entry<Long, IndexChangeJob>> runnableIterator = runnableIndexChangeJob.entrySet().iterator();
        while (runnableIterator.hasNext()) {
            IndexChangeJob indexChangeJob = runnableIterator.next().getValue();
            if (indexChangeJob.isDone()) {
                runnableIterator.remove();
            }
        }
    }

    @Override
    public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) {
        if (runnableSchemaChangeJobV2.containsKey(log.getJobId())) {
            runnableSchemaChangeJobV2.remove(log.getJobId());
        }
        super.replayRemoveAlterJobV2(log);
    }

    @Override
    public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
        if (!alterJob.isDone() && !runnableSchemaChangeJobV2.containsKey(alterJob.getJobId())) {
            runnableSchemaChangeJobV2.put(alterJob.getJobId(), alterJob);
        }
        super.replayAlterJobV2(alterJob);
    }

    // the invoker should keep table's write lock
    public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable olapTable,
                                             Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes,
                                             List<Index> alterIndexes, boolean isDropIndex,
                                             long jobId, boolean isReplay, Map<String, String> propertyMap)
            throws DdlException, AnalysisException {

        if (LOG.isDebugEnabled()) {
            LOG.debug("indexSchemaMap:{}, indexes:{}", indexSchemaMap, indexes);
        }
        // for bitmapIndex
        boolean hasIndexChange = false;
        Set<Index> newSet = new HashSet<>(indexes);
        Set<Index> oriSet = new HashSet<>(olapTable.getIndexes());
        if (!newSet.equals(oriSet)) {
            hasIndexChange = true;
        }

        // begin checking each table
        Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
        try {
            changedIndexIdToSchema = checkTable(db, olapTable, indexSchemaMap);
        } catch (DdlException e) {
            throw new DdlException("Table " + db.getFullName() + "." + olapTable.getName() + " check failed", e);
        }

        if (changedIndexIdToSchema.isEmpty() && !hasIndexChange) {
            throw new DdlException("Nothing is changed. please check your alter stmt.");
        }

        //for compatibility, we need create a finished state schema change job v2

        SchemaChangeJobV2 schemaChangeJob = AlterJobV2Factory.createSchemaChangeJobV2(
                rawSql, jobId, db.getId(), olapTable.getId(),
                olapTable.getName(), 1000);

        for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
            long originIndexId = entry.getKey();
            String newIndexName = SHADOW_NAME_PREFIX + olapTable.getIndexNameById(originIndexId);
            MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(originIndexId);
            // 1. get new schema version/schema version hash, short key column count
            int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
            int newSchemaVersion = currentSchemaVersion + 1;
            // generate schema hash for new index has to generate a new schema hash not equal to current schema hash
            schemaChangeJob.addIndexSchema(originIndexId, originIndexId, newIndexName, newSchemaVersion,
                    currentIndexMeta.getSchemaHash(), currentIndexMeta.getShortKeyColumnCount(), entry.getValue());
        }

        //update base index schema
        Map<Long, List<Column>> oldIndexSchemaMap = olapTable.getCopiedIndexIdToSchema(true);
        try {
            updateBaseIndexSchema(olapTable, indexSchemaMap, indexes);
        } catch (Exception e) {
            throw new DdlException(e.getMessage());
        }

        schemaChangeJob.stateWait("FE.LIGHT_SCHEMA_CHANGE");
        // set Job state then add job
        schemaChangeJob.setJobState(AlterJobV2.JobState.FINISHED);
        schemaChangeJob.setFinishedTimeMs(System.currentTimeMillis());
        this.addAlterJobV2(schemaChangeJob);

        if (alterIndexes != null) {
            if (!isReplay) {
                TableAddOrDropInvertedIndicesInfo info = new TableAddOrDropInvertedIndicesInfo(rawSql, db.getId(),
                        olapTable.getId(), indexSchemaMap, indexes, alterIndexes, isDropIndex, jobId);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
                }
                if (!FeConstants.runningUnitTest) {
                    Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
                }
                // Drop table column stats after light schema change finished.
                Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, null);

                if (isDropIndex) {
                    // send drop rpc to be
                    Map<Long, Set<String>> invertedIndexOnPartitions = new HashMap<>();
                    for (Index index : alterIndexes) {
                        invertedIndexOnPartitions.put(index.getIndexId(), olapTable.getPartitionNames());
                    }
                    try {
                        buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap,
                                alterIndexes, invertedIndexOnPartitions, true);
                    } catch (Exception e) {
                        throw new DdlException(e.getMessage());
                    }
                }
            }

            LOG.info("finished modify table's meta for add or drop inverted index. table: {}, job: {}, is replay: {}",
                    olapTable.getName(), jobId, isReplay);
        } else {
            if (!isReplay) {
                Map<String, Long> indexNameToId = new HashMap<>(olapTable.getIndexNameToId());
                TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(
                        rawSql, db.getId(), olapTable.getId(), olapTable.getBaseIndexId(),
                        indexSchemaMap, oldIndexSchemaMap, indexNameToId, indexes, jobId);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("logModifyTableAddOrDropColumns info:{}", info);
                }
                Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropColumns(info);
                // Drop table column stats after light schema change finished.
                Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, null);
            }
            LOG.info("finished modify table's add or drop or modify columns. table: {}, job: {}, is replay: {}",
                    olapTable.getName(), jobId, isReplay);
        }
        // for bloom filter, rebuild bloom filter info by table schema in replay
        if (isReplay) {
            Set<String> bfCols = olapTable.getCopiedBfColumns();
            if (bfCols != null) {
                List<Column> columns = olapTable.getBaseSchema();
                Set<String> newBfCols = null;
                for (String bfCol : bfCols) {
                    for (Column column : columns) {
                        if (column.getName().equalsIgnoreCase(bfCol)) {
                            if (newBfCols == null) {
                                newBfCols = Sets.newHashSet();
                            }
                            newBfCols.add(column.getName());
                        }
                    }
                }
                olapTable.setBloomFilterInfo(newBfCols, olapTable.getBfFpp());
            }
        }
    }

    public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info)
            throws MetaNotFoundException, AnalysisException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("info:{}", info);
        }
        long dbId = info.getDbId();
        long tableId = info.getTableId();
        Map<Long, LinkedList<Column>> indexSchemaMap = info.getIndexSchemaMap();
        List<Index> indexes = info.getIndexes();
        long jobId = info.getJobId();

        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
        olapTable.writeLock();
        try {
            modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, indexes, null, false, jobId,
                    true, new HashMap<>());
        } catch (DdlException e) {
            // should not happen
            LOG.warn("failed to replay modify table add or drop or modify columns", e);
        } finally {
            olapTable.writeUnlock();
        }
    }

    public Map<Long, List<Column>> checkTable(Database db, OlapTable olapTable,
                                              Map<Long, LinkedList<Column>> indexSchemaMap) throws DdlException {
        Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
        // ATTN: DO NOT change any meta in this loop
        for (Long alterIndexId : indexSchemaMap.keySet()) {
            // Must get all columns including invisible columns.
            // Because in alter process, all columns must be considered.
            List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
            if (LOG.isDebugEnabled()) {
                LOG.debug("index[{}] is changed. start checking...", alterIndexId);
            }
            // 1. check order: a) has key; b) value after key
            boolean meetValue = false;
            boolean hasKey = false;
            for (Column column : alterSchema) {
                if (column.isKey() && meetValue) {
                    throw new DdlException(
                            "Invalid column order. value should be after key. index[" + olapTable.getIndexNameById(
                                    alterIndexId) + "]");
                }
                if (!column.isKey()) {
                    meetValue = true;
                } else {
                    hasKey = true;
                }
            }
            if (!hasKey && !olapTable.isDuplicateWithoutKey()) {
                throw new DdlException("No key column left. index[" + olapTable.getIndexNameById(alterIndexId) + "]");
            }

            // 2. check partition key
            PartitionInfo partitionInfo = olapTable.getPartitionInfo();
            if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
                List<Column> partitionColumns = partitionInfo.getPartitionColumns();
                for (Column partitionCol : partitionColumns) {
                    boolean found = false;
                    for (Column alterColumn : alterSchema) {
                        if (alterColumn.nameEquals(partitionCol.getName(), true)) {
                            found = true;
                            break;
                        }
                    } // end for alterColum

                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 2.1 partition column cannot be deleted.
                        throw new DdlException(
                                "Partition column[" + partitionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                    }
                } // end for partitionColumns
            }

            // 3. check distribution key:
            DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
            if (distributionInfo.getType() == DistributionInfoType.HASH) {
                List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
                for (Column distributionCol : distributionColumns) {
                    boolean found = false;
                    for (Column alterColumn : alterSchema) {
                        if (alterColumn.nameEquals(distributionCol.getName(), true)) {
                            found = true;
                            break;
                        }
                    } // end for alterColumns
                    if (!found && alterIndexId == olapTable.getBaseIndexId()) {
                        // 2.2 distribution column cannot be deleted.
                        throw new DdlException(
                                "Distribution column[" + distributionCol.getName() + "] cannot be dropped. index["
                                        + olapTable.getIndexNameById(alterIndexId) + "]");
                    }
                } // end for distributionCols
            }

            // 5. store the changed columns for edit log
            changedIndexIdToSchema.put(alterIndexId, alterSchema);

            if (LOG.isDebugEnabled()) {
                LOG.debug("schema change[{}-{}-{}] check pass.", db.getId(), olapTable.getId(), alterIndexId);
            }
        } // end for indices
        return changedIndexIdToSchema;
    }

    public void updateBaseIndexSchema(OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
                                      List<Index> indexes) throws IOException {
        long baseIndexId = olapTable.getBaseIndexId();
        List<Long> indexIds = new ArrayList<Long>();
        indexIds.add(baseIndexId);
        indexIds.addAll(olapTable.getIndexIdListExceptBaseIndex());
        for (int i = 0; i < indexIds.size(); i++) {
            List<Column> indexSchema = indexSchemaMap.get(indexIds.get(i));
            MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(indexIds.get(i));
            currentIndexMeta.setSchema(indexSchema);

            int currentSchemaVersion = currentIndexMeta.getSchemaVersion();
            int newSchemaVersion = currentSchemaVersion + 1;
            currentIndexMeta.setSchemaVersion(newSchemaVersion);

            //update max column unique id
            int maxColUniqueId = currentIndexMeta.getMaxColUniqueId();
            for (Column column : indexSchema) {
                if (column.getUniqueId() > maxColUniqueId) {
                    maxColUniqueId = column.getUniqueId();
                }
            }
            currentIndexMeta.setMaxColUniqueId(maxColUniqueId);
            currentIndexMeta.setIndexes(indexes);
        }
        olapTable.setIndexes(indexes);
        olapTable.rebuildFullSchema();
        olapTable.rebuildDistributionInfo();
    }

    public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info)
            throws MetaNotFoundException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("info:{}", info);
        }
        long dbId = info.getDbId();
        long tableId = info.getTableId();
        Map<Long, LinkedList<Column>> indexSchemaMap = info.getIndexSchemaMap();
        List<Index> newIndexes = info.getIndexes();
        List<Index> alterIndexes = info.getAlterInvertedIndexes();
        boolean isDropIndex = info.getIsDropInvertedIndex();
        long jobId = info.getJobId();

        Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
        olapTable.writeLock();
        try {
            modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, newIndexes,
                                             alterIndexes, isDropIndex, jobId, true, new HashMap<>());
        } catch (UserException e) {
            // should not happen
            LOG.warn("failed to replay modify table add or drop indexes", e);
        } finally {
            olapTable.writeUnlock();
        }
    }

    public void buildOrDeleteTableInvertedIndices(Database db, OlapTable olapTable,
            Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> alterIndexes,
            Map<Long, Set<String>> invertedIndexOnPartitions, boolean isDropOp) throws UserException {
        LOG.info("begin to build table's inverted index. table: {}", olapTable.getName());

        // for now table's state can only be NORMAL
        Preconditions.checkState(olapTable.getState() == OlapTableState.NORMAL, olapTable.getState().name());

        // remove the index which is not the base index, only base index can be built inverted index
        indexSchemaMap.entrySet().removeIf(entry -> !entry.getKey().equals(olapTable.getBaseIndexId()));
        // begin checking each table
        Map<Long, List<Column>> changedIndexIdToSchema = Maps.newHashMap();
        try {
            changedIndexIdToSchema = checkTable(db, olapTable, indexSchemaMap);
        } catch (DdlException e) {
            throw new DdlException("Table " + db.getFullName() + "." + olapTable.getName()
                                   + " check failed");
        }
        if (changedIndexIdToSchema.isEmpty() && alterIndexes.isEmpty()) {
            throw new DdlException("Nothing is changed. please check your alter stmt.");
        }

        try {
            long timeoutSecond = Config.alter_table_timeout_second;
            for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
                long originIndexId = entry.getKey();
                for (Partition partition : olapTable.getPartitions()) {
                    // create job
                    long jobId = Env.getCurrentEnv().getNextId();
                    IndexChangeJob indexChangeJob = new IndexChangeJob(
                            jobId, db.getId(), olapTable.getId(), olapTable.getName(), timeoutSecond * 1000);
                    indexChangeJob.setOriginIndexId(originIndexId);
                    indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes);
                    long partitionId = partition.getId();
                    String partitionName = partition.getName();
                    boolean found = false;
                    for (Set<String> partitions : invertedIndexOnPartitions.values()) {
                        if (partitions.contains(partitionName)) {
                            found = true;
                            break;
                        }
                    }
                    if (!found) {
                        continue;
                    }

                    if (hasIndexChangeJobOnPartition(originIndexId, db.getId(), olapTable.getId(),
                            partitionName, alterIndexes, isDropOp)) {
                        throw new DdlException("partition " + partitionName + " has been built specified index."
                                            + " please check your build stmt.");
                    }

                    indexChangeJob.setPartitionId(partitionId);
                    indexChangeJob.setPartitionName(partitionName);

                    addIndexChangeJob(indexChangeJob);

                    // write edit log
                    if (!FeConstants.runningUnitTest) {
                        Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
                    }
                    LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}",
                            olapTable.getName(), partitionName, jobId);
                } // end for partition
            } // end for index
        } catch (Exception e) {
            LOG.warn("Exception:", e);
            throw new UserException(e.getMessage());
        }
    }

    public boolean hasIndexChangeJobOnPartition(
            long originIndexId, long dbId, long tableId, String partitionName,
            List<Index> alterIndexes, boolean isDrop) {
        // TODO: this is temporary methods
        for (IndexChangeJob indexChangeJob : ImmutableList.copyOf(indexChangeJobs.values())) {
            if (indexChangeJob.getOriginIndexId() == originIndexId
                    && indexChangeJob.getDbId() == dbId
                    && indexChangeJob.getTableId() == tableId
                    && indexChangeJob.getPartitionName().equals(partitionName)
                    && indexChangeJob.hasSameAlterInvertedIndex(isDrop, alterIndexes)
                    && !indexChangeJob.isDone()) {
                // if JobState is done (CANCELLED or FINISHED), also allow user to create job again
                return true;
            }
        }
        return false;
    }

    public void replayIndexChangeJob(IndexChangeJob indexChangeJob) throws MetaNotFoundException {
        if (!indexChangeJob.isDone() && !runnableSchemaChangeJobV2.containsKey(indexChangeJob.getJobId())) {
            runnableIndexChangeJob.put(indexChangeJob.getJobId(), indexChangeJob);
        }
        indexChangeJobs.put(indexChangeJob.getJobId(), indexChangeJob);
        indexChangeJob.replay(indexChangeJob);
        if (indexChangeJob.isDone()) {
            runnableIndexChangeJob.remove(indexChangeJob.getJobId());
        }
    }

    public boolean updateBinlogConfig(Database db, OlapTable olapTable, List<AlterClause> alterClauses)
            throws DdlException, UserException {
        List<Partition> partitions = Lists.newArrayList();
        BinlogConfig oldBinlogConfig;
        BinlogConfig newBinlogConfig;

        olapTable.readLock();
        try {
            oldBinlogConfig = new BinlogConfig(olapTable.getBinlogConfig());
            newBinlogConfig = new BinlogConfig(oldBinlogConfig);
            partitions.addAll(olapTable.getPartitions());
        } catch (Exception e) {
            throw new DdlException(e.getMessage());
        } finally {
            olapTable.readUnlock();
        }

        for (AlterClause alterClause : alterClauses) {
            Map<String, String> properties = alterClause.getProperties();
            if (properties == null) {
                continue;
            }

            if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE)) {
                boolean binlogEnable = Boolean.parseBoolean(properties.get(
                        PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE));
                if (binlogEnable != oldBinlogConfig.isEnable()) {
                    newBinlogConfig.setEnable(binlogEnable);
                }
            }
            if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL_SECONDS)) {
                Long binlogTtlSeconds = Long.parseLong(properties.get(
                        PropertyAnalyzer.PROPERTIES_BINLOG_TTL_SECONDS));
                if (binlogTtlSeconds != oldBinlogConfig.getTtlSeconds()) {
                    newBinlogConfig.setTtlSeconds(binlogTtlSeconds);
                }
            }
            if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES)) {
                Long binlogMaxBytes = Long.parseLong(properties.get(
                        PropertyAnalyzer.PROPERTIES_BINLOG_MAX_BYTES));
                if (binlogMaxBytes != oldBinlogConfig.getMaxBytes()) {
                    newBinlogConfig.setMaxBytes(binlogMaxBytes);
                }
            }
            if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS)) {
                Long binlogMaxHistoryNums = Long.parseLong(properties.get(
                        PropertyAnalyzer.PROPERTIES_BINLOG_MAX_HISTORY_NUMS));
                if (binlogMaxHistoryNums != oldBinlogConfig.getMaxHistoryNums()) {
                    newBinlogConfig.setMaxHistoryNums(binlogMaxHistoryNums);
                }
            }
        }

        boolean hasChanged = !newBinlogConfig.equals(oldBinlogConfig);
        if (!hasChanged) {
            LOG.info("table {} binlog config is same as the previous version, so nothing need to do",
                    olapTable.getName());
            return true;
        }

        // check db binlog config, if db binlog config is not same as table binlog config, throw exception
        BinlogConfig dbBinlogConfig;
        db.readLock();
        try {
            dbBinlogConfig = new BinlogConfig(db.getBinlogConfig());
        } finally {
            db.readUnlock();
        }
        boolean dbBinlogEnable = (dbBinlogConfig != null && dbBinlogConfig.isEnable());
        if (dbBinlogEnable && !newBinlogConfig.isEnable()) {
            throw new DdlException("db binlog is enable, but table binlog is disable");
        }

        LOG.info("begin to update table's binlog config. table: {}, old binlog: {}, new binlog: {}",
                olapTable.getName(), oldBinlogConfig, newBinlogConfig);


        for (Partition partition : partitions) {
            updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1,
                                                newBinlogConfig, null, null, -1, -1, -1);
        }

        olapTable.writeLockOrDdlException();
        try {
            Env.getCurrentEnv().updateBinlogConfig(db, olapTable, newBinlogConfig);
        } finally {
            olapTable.writeUnlock();
        }

        return false;
    }

    private void removeColumnWhenDropGeneratedColumn(Column dropColumn, Map<String, Column> nameToColumn) {
        GeneratedColumnInfo generatedColumnInfo = dropColumn.getGeneratedColumnInfo();
        if (generatedColumnInfo == null) {
            return;
        }
        String dropColName = dropColumn.getName();
        Expr expr = generatedColumnInfo.getExpr();
        Set<Expr> slotRefsInGeneratedExpr = new HashSet<>();
        expr.collect(e -> e instanceof SlotRef, slotRefsInGeneratedExpr);
        for (Expr slotRef : slotRefsInGeneratedExpr) {
            String name = ((SlotRef) slotRef).getColumnName();
            if (!nameToColumn.containsKey(name)) {
                continue;
            }
            Column c = nameToColumn.get(name);
            Set<String> sets = c.getGeneratedColumnsThatReferToThis();
            sets.remove(dropColName);
        }
    }

    private void checkOrder(List<Column> targetIndexSchema, List<String> orderedColNames) throws DdlException {
        Set<String> nameSet = new HashSet<>();
        for (Column column : targetIndexSchema) {
            if (column.isVisible() && null == column.getGeneratedColumnInfo()) {
                nameSet.add(column.getName());
            }
        }
        for (String colName : orderedColNames) {
            Column oneCol = null;
            for (Column column : targetIndexSchema) {
                if (column.getName().equalsIgnoreCase(colName) && column.isVisible()) {
                    oneCol = column;
                    break;
                }
            }
            if (oneCol == null) {
                throw new DdlException("Column[" + colName + "] not exists");
            }
            if (null == oneCol.getGeneratedColumnInfo()) {
                continue;
            }
            Expr expr = oneCol.getGeneratedColumnInfo().getExpr();
            Set<Expr> slotRefsInGeneratedExpr = new HashSet<>();
            expr.collect(e -> e instanceof SlotRef, slotRefsInGeneratedExpr);
            for (Expr slotRef : slotRefsInGeneratedExpr) {
                String slotName = ((SlotRef) slotRef).getColumnName();
                if (!nameSet.contains(slotName)) {
                    throw new DdlException("The specified column order is incorrect, `" + colName
                            + "` should come after `" + slotName
                            + "`, because both of them are generated columns, and `"
                            + colName + "` refers to `" + slotName + "`.");
                }
            }
            nameSet.add(colName);
        }
    }
}