IngestionLoadJob.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.FailMsg;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.sparkdpp.DppResult;
import org.apache.doris.sparkdpp.EtlJobConfig;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TabletQuorumFailedException;
import org.apache.doris.transaction.TransactionState;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.google.gson.reflect.TypeToken;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Ingestion Load
 * <p>
 * Loads data files that have been pre-processed by an external system.
 * <p>
 * There are 4 steps in an IngestionLoadJob:
 * Step 1: The outside system executes the ingestion ETL job.
 * Step 2: LoadEtlChecker checks the ingestion ETL job status periodically
 * and sends push tasks to BE when the ingestion ETL job is finished.
 * Step 3: LoadLoadingChecker checks the loading status periodically and commits the transaction
 * when push tasks are finished.
 * Step 4: PublishVersionDaemon sends publish version tasks to BE and finishes the transaction.
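 * <p>
 * A rough sketch of how a caller might drive the job through these steps (the checker daemons
 * normally trigger the later calls; variable names below are illustrative only):
 * <pre>{@code
 * IngestionLoadJob job = new IngestionLoadJob(dbId, label, tableNames, userInfo);
 * long txnId = job.beginTransaction();                     // open a txn for this load
 * Map<String, Object> meta = job.getLoadMeta(tblToParts);  // hand table/partition meta to the ETL system
 * job.startEtlJob();                                       // job enters ETL state
 * job.updateJobStatus(statusInfo);                         // external system reports ETL progress/result
 * job.updateEtlStatus();                                   // on ETL success, switch to LOADING and send push tasks
 * job.updateLoadingStatus();                               // commit the txn once push tasks reach quorum
 * }</pre>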
 */
public class IngestionLoadJob extends LoadJob {

    public static final Logger LOG = LogManager.getLogger(IngestionLoadJob.class);

    @Setter
    @SerializedName("ests")
    private EtlStatus etlStatus;

    // members below are updated when the job state changes to LOADING
    // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
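    // e.g. (illustrative) "1001.1002.1003.0.123456" -> ("hdfs://<etl-output>/<file>.parquet", 1048576)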
    @SerializedName(value = "tm2fi")
    private final Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();

    @SerializedName(value = "hp")
    private final Map<String, String> hadoopProperties = new HashMap<>();

    @SerializedName(value = "i2sv")
    private final Map<Long, Integer> indexToSchemaVersion = new HashMap<>();

    private final Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();

    private final Map<String, Long> filePathToSize = new HashMap<>();

    private final Set<Long> finishedReplicas = Sets.newHashSet();
    private final Set<Long> quorumTablets = Sets.newHashSet();
    private final Set<Long> fullTablets = Sets.newHashSet();

    private final List<TabletCommitInfo> commitInfos = Lists.newArrayList();

    private final Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();

    private final Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();

    private long etlStartTimestamp = -1;

    private long quorumFinishTimestamp = -1;

    private List<Long> loadTableIds = new ArrayList<>();

    public IngestionLoadJob() {
        super(EtlJobType.INGESTION);
    }

    public IngestionLoadJob(long dbId, String label, List<String> tableNames, UserIdentity userInfo)
            throws LoadException {
        super(EtlJobType.INGESTION, dbId, label);
        this.loadTableIds = getLoadTableIds(tableNames);
        this.userInfo = userInfo;
    }

    @Override
    public Set<String> getTableNamesForShow() {
        return Collections.emptySet();
    }

    @Override
    public Set<String> getTableNames() throws MetaNotFoundException {
        Set<String> result = Sets.newHashSet();
        Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
        for (long tableId : loadTableIds) {
            Table table = database.getTableOrMetaException(tableId);
            result.add(table.getName());
        }
        return result;
    }

    @Override
    public void afterVisible(TransactionState txnState, boolean txnOperated) {
        super.afterVisible(txnState, txnOperated);
        clearJob();
    }

    @Override
    public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
            throws UserException {
        super.afterAborted(txnState, txnOperated, txnStatusChangeReason);
        clearJob();
    }

    @Override
    public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
        super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
        clearJob();
    }

    @Override
    public void cancelJob(FailMsg failMsg) throws DdlException {
        super.cancelJob(failMsg);
        clearJob();
    }

    private List<Long> getLoadTableIds(List<String> tableNames) throws LoadException {
        Database db = Env.getCurrentInternalCatalog()
                .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
        List<Long> list = new ArrayList<>(tableNames.size());
        for (String tableName : tableNames) {
            OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
                    s -> new LoadException("table does not exist. id: " + s));
            list.add(olapTable.getId());
        }
        return list;
    }

    @Override
    protected long getEtlStartTimestamp() {
        return etlStartTimestamp;
    }

    public long beginTransaction()
            throws BeginTransactionException, MetaNotFoundException, AnalysisException, QuotaExceedException,
            LabelAlreadyUsedException, DuplicatedRequestException {
        this.transactionId = Env.getCurrentGlobalTransactionMgr()
                .beginTransaction(dbId, loadTableIds, label, null,
                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
                                FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()),
                        TransactionState.LoadJobSourceType.FRONTEND, id, getTimeout());
        return transactionId;
    }

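    /**
     * Builds the load meta handed to the external ETL system. The returned map contains
     * "dbId", "signature" and "tableMeta"; each entry under "tableMeta" carries the table "id",
     * its "indexes" (EtlIndex list) and its "partitionInfo" (EtlPartitionInfo).
     */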
    public Map<String, Object> getLoadMeta(Map<String, List<String>> tableToPartitionMap)
            throws LoadException {

        if (tableToPartitionMap == null || tableToPartitionMap.isEmpty()) {
            throw new IllegalArgumentException("tableToPartitionMap is empty");
        }

        Database db = Env.getCurrentInternalCatalog()
                .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
        Map<String, Object> loadMeta = new HashMap<>();
        loadMeta.put("dbId", db.getId());
        Long signature = Env.getCurrentEnv().getNextId();
        loadMeta.put("signature", signature);

        List<Table> tables;
        try {
            tables = db.getTablesOnIdOrderOrThrowException(loadTableIds);
        } catch (MetaNotFoundException e) {
            throw new LoadException(e.getMessage());
        }

        MetaLockUtils.readLockTables(tables);
        try {
            Map<String, Map<String, Object>> tableMeta = new HashMap<>(tableToPartitionMap.size());
            for (Map.Entry<String, List<String>> entry : tableToPartitionMap.entrySet()) {
                String tableName = entry.getKey();
                Map<String, Object> meta = tableMeta.getOrDefault(tableName, new HashMap<>());
                OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
                        s -> new LoadException("table does not exist. id: " + s));
                meta.put("id", olapTable.getId());
                List<EtlJobConfig.EtlIndex> indices = createEtlIndexes(olapTable);
                meta.put("indexes", indices);
                List<String> partitionNames = entry.getValue();
                Set<Long> partitionIds;
                if (partitionNames != null && !partitionNames.isEmpty()) {
                    partitionIds = new HashSet<>(partitionNames.size());
                    for (String partitionName : partitionNames) {
                        Partition partition = olapTable.getPartition(partitionName);
                        if (partition == null) {
                            throw new LoadException(String.format("partition %s does not exist", partitionName));
                        }
                        partitionIds.add(partition.getId());
                    }
                } else {
                    partitionIds =
                            olapTable.getAllPartitions().stream().map(Partition::getId).collect(Collectors.toSet());
                }
                EtlJobConfig.EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(olapTable, partitionIds);
                meta.put("partitionInfo", etlPartitionInfo);
                tableMeta.put(tableName, meta);

                if (tableToLoadPartitions.containsKey(olapTable.getId())) {
                    tableToLoadPartitions.get(olapTable.getId()).addAll(partitionIds);
                } else {
                    tableToLoadPartitions.put(olapTable.getId(), partitionIds);
                }

            }
            loadMeta.put("tableMeta", tableMeta);
        } finally {
            MetaLockUtils.readUnlockTables(tables);
        }
        return loadMeta;

    }

    private List<EtlJobConfig.EtlIndex> createEtlIndexes(OlapTable table) throws LoadException {
        List<EtlJobConfig.EtlIndex> etlIndexes = Lists.newArrayList();

        for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
            long indexId = entry.getKey();
            // todo(liheng): get schema hash and version from materialized index meta directly
            MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(indexId);
            int schemaHash = indexMeta.getSchemaHash();
            int schemaVersion = indexMeta.getSchemaVersion();

            boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
                    && table.getTableProperty().getEnableUniqueKeyMergeOnWrite();

            // columns
            List<EtlJobConfig.EtlColumn> etlColumns = Lists.newArrayList();
            for (Column column : entry.getValue()) {
                etlColumns.add(createEtlColumn(column, changeAggType));
            }

            // check distribution type
            DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
            if (distributionInfo.getType() != DistributionInfo.DistributionInfoType.HASH) {
                // RANDOM not supported
                String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name();
                LOG.warn(errMsg);
                throw new LoadException(errMsg);
            }

            // index type
            String indexType;
            KeysType keysType = table.getKeysTypeByIndexId(indexId);
            switch (keysType) {
                case DUP_KEYS:
                    indexType = "DUPLICATE";
                    break;
                case AGG_KEYS:
                    indexType = "AGGREGATE";
                    break;
                case UNIQUE_KEYS:
                    indexType = "UNIQUE";
                    break;
                default:
                    String errMsg = "unknown keys type. type: " + keysType.name();
                    LOG.warn(errMsg);
                    throw new LoadException(errMsg);
            }

            indexToSchemaVersion.put(indexId, schemaVersion);

            etlIndexes.add(new EtlJobConfig.EtlIndex(indexId, etlColumns, schemaHash, indexType,
                    indexId == table.getBaseIndexId(), schemaVersion));
        }

        return etlIndexes;
    }

    private EtlJobConfig.EtlColumn createEtlColumn(Column column, boolean changeAggType) {
        // column name
        String name = column.getName().toLowerCase(Locale.ROOT);
        // column type
        PrimitiveType type = column.getDataType();
        String columnType = column.getDataType().toString();
        // is allow null
        boolean isAllowNull = column.isAllowNull();
        // is key
        boolean isKey = column.isKey();

        // aggregation type
        String aggregationType = null;
        if (column.getAggregationType() != null) {
            if (changeAggType && !column.isKey()) {
                aggregationType = AggregateType.REPLACE.toSql();
            } else {
                aggregationType = column.getAggregationType().toString();
            }
        }

        // default value
        String defaultValue = null;
        if (column.getDefaultValue() != null) {
            defaultValue = column.getDefaultValue();
        }
        if (column.isAllowNull() && column.getDefaultValue() == null) {
            defaultValue = "\\N";
        }

        // string length
        int stringLength = 0;
        if (type.isStringType()) {
            stringLength = column.getStrLen();
        }

        // decimal precision scale
        int precision = 0;
        int scale = 0;
        if (type.isDecimalV2Type() || type.isDecimalV3Type()) {
            precision = column.getPrecision();
            scale = column.getScale();
        }

        return new EtlJobConfig.EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
                stringLength, precision, scale);
    }

    private EtlJobConfig.EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds)
            throws LoadException {
        PartitionType type = table.getPartitionInfo().getType();

        List<String> partitionColumnRefs = Lists.newArrayList();
        List<EtlJobConfig.EtlPartition> etlPartitions = Lists.newArrayList();
        if (type == PartitionType.RANGE) {
            RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
            for (Column column : rangePartitionInfo.getPartitionColumns()) {
                partitionColumnRefs.add(column.getName());
            }

            for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getAllPartitionItemEntryList(true)) {
                long partitionId = entry.getKey();
                if (!partitionIds.contains(partitionId)) {
                    continue;
                }

                Partition partition = table.getPartition(partitionId);
                if (partition == null) {
                    throw new LoadException("partition does not exist. id: " + partitionId);
                }

                // bucket num
                int bucketNum = partition.getDistributionInfo().getBucketNum();

                // is max partition
                Range<PartitionKey> range = entry.getValue().getItems();
                boolean isMaxPartition = range.upperEndpoint().isMaxValue();

                // start keys
                List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys();
                List<Object> startKeys = Lists.newArrayList();
                for (LiteralExpr literalExpr : rangeKeyExprs) {
                    Object keyValue = literalExpr.getRealValue();
                    startKeys.add(keyValue);
                }

                // end keys
                // is empty list when max partition
                List<Object> endKeys = Lists.newArrayList();
                if (!isMaxPartition) {
                    rangeKeyExprs = range.upperEndpoint().getKeys();
                    for (LiteralExpr literalExpr : rangeKeyExprs) {
                        Object keyValue = literalExpr.getRealValue();
                        endKeys.add(keyValue);
                    }
                }

                etlPartitions.add(
                        new EtlJobConfig.EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum));
            }
        } else if (type == PartitionType.UNPARTITIONED) {
            Preconditions.checkState(partitionIds.size() == 1, "partition size must be equal to 1");

            for (Long partitionId : partitionIds) {
                Partition partition = table.getPartition(partitionId);
                if (partition == null) {
                    throw new LoadException("partition does not exist. id: " + partitionId);
                }

                // bucket num
                int bucketNum = partition.getDistributionInfo().getBucketNum();

                etlPartitions.add(new EtlJobConfig.EtlPartition(partitionId, Lists.newArrayList(), Lists.newArrayList(),
                        true, bucketNum));
            }
        } else {
            throw new LoadException("Spark Load does not support list partition yet");
        }

        // distribution column refs
        List<String> distributionColumnRefs = Lists.newArrayList();
        DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
        Preconditions.checkState(distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH);
        for (Column column : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) {
            distributionColumnRefs.add(column.getName());
        }

        return new EtlJobConfig.EtlPartitionInfo(type.typeString, partitionColumnRefs, distributionColumnRefs,
                etlPartitions);
    }

    public void updateEtlStatus() throws Exception {

        if (!checkState(JobState.ETL) || etlStatus == null) {
            return;
        }

        writeLock();
        try {
            switch (etlStatus.getState()) {
                case FINISHED:
                    unprotectedProcessEtlFinish();
                    break;
                case CANCELLED:
                    throw new LoadException("spark etl job failed. msg: " + etlStatus.getFailMsg());
                default:
                    break;
            }
        } finally {
            writeUnlock();
        }

        if (checkState(JobState.LOADING)) {
            submitPushTasks();
        }

    }

    private boolean checkState(JobState expectState) {
        readLock();
        try {
            return state == expectState;
        } finally {
            readUnlock();
        }
    }

    private Set<Long> submitPushTasks() throws UserException {

        // check that the db exists
        Database db = null;
        try {
            db = getDb();
        } catch (MetaNotFoundException e) {
            String errMsg = new LogBuilder(LogKey.LOAD_JOB, id).add("database_id", dbId).add("label", label)
                    .add("error_msg", "db has been deleted when job is loading").build();
            throw new MetaNotFoundException(errMsg);
        }

        AgentBatchTask batchTask = new AgentBatchTask();
        boolean hasLoadPartitions = false;
        Set<Long> totalTablets = Sets.newHashSet();
        List<? extends TableIf> tableList = db.getTablesOnIdOrderOrThrowException(
                Lists.newArrayList(tableToLoadPartitions.keySet()));
        MetaLockUtils.readLockTables(tableList);
        try {
            writeLock();
            try {
                // check that the state is still LOADING. If the state is cancelled or finished, return;
                // otherwise this would throw an "all partitions have no load data" exception,
                // because tableToLoadPartitions has already been cleaned up.
                if (state != JobState.LOADING) {
                    LOG.warn("job state is not loading. job id: {}, state: {}", id, state);
                    return totalTablets;
                }

                for (TableIf table : tableList) {
                    Set<Long> partitionIds = tableToLoadPartitions.get(table.getId());
                    OlapTable olapTable = (OlapTable) table;
                    for (long partitionId : partitionIds) {
                        Partition partition = olapTable.getPartition(partitionId);
                        if (partition == null) {
                            throw new LoadException("partition does not exist. id: " + partitionId);
                        }

                        hasLoadPartitions = true;
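                        // quorum is a simple majority of replicas, e.g. 3 replicas -> 2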
                        int quorumReplicaNum =
                                olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum() / 2
                                        + 1;

                        List<MaterializedIndex> indexes = partition.getMaterializedIndices(
                                MaterializedIndex.IndexExtState.ALL);
                        for (MaterializedIndex index : indexes) {
                            long indexId = index.getId();
                            MaterializedIndexMeta indexMeta = olapTable.getIndexMetaByIndexId(indexId);
                            int schemaVersion = indexMeta.getSchemaVersion();
                            int schemaHash = indexMeta.getSchemaHash();

                            // check whether schemaHash and schemaVersion have changed
                            checkIndexSchema(indexId, schemaHash, schemaVersion);

                            int bucket = 0;
                            for (Tablet tablet : index.getTablets()) {
                                long tabletId = tablet.getId();
                                totalTablets.add(tabletId);
                                Set<Long> tabletAllReplicas = Sets.newHashSet();
                                Set<Long> tabletFinishedReplicas = Sets.newHashSet();
                                for (Replica replica : tablet.getReplicas()) {
                                    long replicaId = replica.getId();
                                    tabletAllReplicas.add(replicaId);
                                    if (!tabletToSentReplicaPushTask.containsKey(tabletId)
                                            || !tabletToSentReplicaPushTask.get(tabletId).containsKey(replicaId)) {
                                        long backendId = replica.getBackendId();
                                        long taskSignature = Env.getCurrentGlobalTransactionMgr()
                                                .getNextTransactionId();

                                        PushTask pushTask =
                                                buildPushTask(backendId, olapTable, taskSignature, partitionId, indexId,
                                                        tabletId, replicaId, schemaHash, schemaVersion, bucket++);
                                        if (AgentTaskQueue.addTask(pushTask)) {
                                            batchTask.addTask(pushTask);
                                            if (!tabletToSentReplicaPushTask.containsKey(tabletId)) {
                                                tabletToSentReplicaPushTask.put(tabletId, Maps.newHashMap());
                                            }
                                            tabletToSentReplicaPushTask.get(tabletId).put(replicaId, pushTask);
                                        }
                                    }

                                    if (finishedReplicas.contains(replicaId) && replica.getLastFailedVersion() < 0) {
                                        tabletFinishedReplicas.add(replicaId);
                                    }
                                }

                                if (tabletAllReplicas.isEmpty()) {
                                    LOG.error("invalid situation. tablet is empty. id: {}", tabletId);
                                }

                                // check tablet push states
                                if (tabletFinishedReplicas.size() >= quorumReplicaNum) {
                                    quorumTablets.add(tabletId);
                                    if (tabletFinishedReplicas.size() == tabletAllReplicas.size()) {
                                        fullTablets.add(tabletId);
                                    }
                                }
                            }
                        }
                    }
                }

                if (batchTask.getTaskNum() > 0) {
                    AgentTaskExecutor.submit(batchTask);
                }

                if (!hasLoadPartitions) {
                    String errMsg = new LogBuilder(LogKey.LOAD_JOB, id).add("database_id", dbId).add("label", label)
                            .add("error_msg", "all partitions have no load data").build();
                    throw new LoadException(errMsg);
                }

                return totalTablets;
            } finally {
                writeUnlock();
            }
        } finally {
            MetaLockUtils.readUnlockTables(tableList);
        }

    }

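    /**
     * Updates this job with the ETL status reported by the external system. Expected keys in
     * statusInfo are "status" (running/success/failed), "msg" and "appId", plus "dppResult",
     * "filePathToSize" and "hadoopProperties" (all JSON encoded) once the ETL job succeeds.
     */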
    public void updateJobStatus(Map<String, String> statusInfo) {

        updateState(statusInfo.get("status"), statusInfo.get("msg"));

        etlStatus.setTrackingUrl(statusInfo.get("appId"));
        etlStatus.setProgress(progress);

        if (etlStatus.getState() == TEtlState.FINISHED) {
            Gson gson = new Gson();
            DppResult dppResult = gson.fromJson(statusInfo.get("dppResult"), DppResult.class);
            loadStatistic.fileNum = (int) dppResult.fileNumber;
            loadStatistic.totalFileSizeB = dppResult.fileSize;
            TUniqueId dummyId = new TUniqueId(0, 0);
            long dummyBackendId = -1L;
            loadStatistic.initLoad(dummyId, Sets.newHashSet(dummyId), Lists.newArrayList(dummyBackendId));
            loadStatistic.updateLoadProgress(dummyBackendId, dummyId, dummyId, dppResult.scannedRows,
                    dppResult.scannedBytes, true);
            loadingStatus.setDppResult(dppResult);
            Map<String, String> counters = loadingStatus.getCounters();
            counters.put(DPP_NORMAL_ALL, String.valueOf(dppResult.normalRows));
            counters.put(DPP_ABNORMAL_ALL, String.valueOf(dppResult.abnormalRows));
            counters.put(UNSELECTED_ROWS, String.valueOf(dppResult.unselectRows));
            filePathToSize.putAll(
                    gson.fromJson(statusInfo.get("filePathToSize"), new TypeToken<HashMap<String, Long>>() {
                    }));
            hadoopProperties.putAll(
                    gson.fromJson(statusInfo.get("hadoopProperties"), new TypeToken<HashMap<String, String>>() {
                    }));
        }

    }

    private void updateState(String stateStr, String msg) {

        switch (stateStr.toLowerCase()) {
            case "running":
                etlStatus.setState(TEtlState.RUNNING);
                break;
            case "success":
                etlStatus.setState(TEtlState.FINISHED);
                break;
            case "failed":
                boolean res = etlStatus.setState(TEtlState.CANCELLED);
                if (!res) {
                    etlStatus = new EtlStatus();
                    etlStatus.setState(TEtlState.CANCELLED);
                }
                etlStatus.setFailMsg(msg);
                break;
            default:
                etlStatus.setState(TEtlState.UNKNOWN);
                break;
        }

    }

    public void startEtlJob() {
        etlStartTimestamp = System.currentTimeMillis();
        state = JobState.ETL;
        etlStatus = new EtlStatus();
        unprotectedLogUpdateStateInfo();
    }

    private void unprotectedUpdateToLoadingState(EtlStatus etlStatus, Map<String, Long> filePathToSize)
            throws LoadException {
        try {
            for (Map.Entry<String, Long> entry : filePathToSize.entrySet()) {
                String filePath = entry.getKey();
                if (!filePath.endsWith(EtlJobConfig.ETL_OUTPUT_FILE_FORMAT)) {
                    continue;
                }
                String tabletMetaStr = EtlJobConfig.getTabletMetaStr(filePath);
                tabletMetaToFileInfo.put(tabletMetaStr, Pair.of(filePath, entry.getValue()));
            }

            loadingStatus = etlStatus;
            progress = 0;
            Env.getCurrentProgressManager().registerProgressSimple(String.valueOf(id));
            unprotectedUpdateState(JobState.LOADING);
            LOG.info("update to {} state success. job id: {}", state, id);
        } catch (Exception e) {
            LOG.warn("update to {} state failed. job id: {}", state, id, e);
            throw new LoadException(e.getMessage(), e);
        }
    }

    private void unprotectedPrepareLoadingInfos() {
        for (String tabletMetaStr : tabletMetaToFileInfo.keySet()) {
            String[] fileNameArr = tabletMetaStr.split("\\.");
            // tableId.partitionId.indexId.bucket.schemaHash
            Preconditions.checkState(fileNameArr.length == 5);
            long tableId = Long.parseLong(fileNameArr[0]);
            long partitionId = Long.parseLong(fileNameArr[1]);
            long indexId = Long.parseLong(fileNameArr[2]);
            int schemaHash = Integer.parseInt(fileNameArr[4]);

            if (!tableToLoadPartitions.containsKey(tableId)) {
                tableToLoadPartitions.put(tableId, Sets.newHashSet());
            }
            tableToLoadPartitions.get(tableId).add(partitionId);

            indexToSchemaHash.put(indexId, schemaHash);
        }
    }

    private void unprotectedProcessEtlFinish() throws Exception {
        // checkDataQuality
        if (!checkDataQuality()) {
            throw new DataQualityException(DataQualityException.QUALITY_FAIL_MSG);
        }

        // get etl output files and update loading state
        unprotectedUpdateToLoadingState(etlStatus, filePathToSize);
        // log loading state
        unprotectedLogUpdateStateInfo();
        // prepare loading infos
        unprotectedPrepareLoadingInfos();
    }

    private TBrokerScanRange getTBrokerScanRange(DescriptorTable descTable, TupleDescriptor destTupleDesc,
                                                 List<Column> columns, Map<String, String> properties)
            throws AnalysisException {

        TBrokerScanRange brokerScanRange = new TBrokerScanRange();

        TBrokerScanRangeParams params = new TBrokerScanRangeParams();
        params.setStrictMode(false);
        params.setProperties(properties);
        TupleDescriptor srcTupleDesc = descTable.createTupleDescriptor();
        Map<String, SlotDescriptor> srcSlotDescByName = Maps.newHashMap();
        for (Column column : columns) {
            SlotDescriptor srcSlotDesc = descTable.addSlotDescriptor(srcTupleDesc);
            srcSlotDesc.setIsMaterialized(true);
            srcSlotDesc.setIsNullable(true);

            if (column.getDataType() == PrimitiveType.BITMAP) {
                // cast to bitmap when the target column type is bitmap
                srcSlotDesc.setType(ScalarType.createType(PrimitiveType.BITMAP));
                srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.BITMAP));
            } else {
                srcSlotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR));
                srcSlotDesc.setColumn(new Column(column.getName(), PrimitiveType.VARCHAR));
            }

            params.addToSrcSlotIds(srcSlotDesc.getId().asInt());
            srcSlotDescByName.put(column.getName(), srcSlotDesc);
        }

        Map<Integer, Integer> destSidToSrcSidWithoutTrans = Maps.newHashMap();
        for (SlotDescriptor destSlotDesc : destTupleDesc.getSlots()) {
            if (!destSlotDesc.isMaterialized()) {
                continue;
            }

            SlotDescriptor srcSlotDesc = srcSlotDescByName.get(destSlotDesc.getColumn().getName());
            destSidToSrcSidWithoutTrans.put(destSlotDesc.getId().asInt(), srcSlotDesc.getId().asInt());
            Expr expr = new SlotRef(srcSlotDesc);
            expr = castToSlot(destSlotDesc, expr);
            params.putToExprOfDestSlot(destSlotDesc.getId().asInt(), expr.treeToThrift());
        }
        params.setDestSidToSrcSidWithoutTrans(destSidToSrcSidWithoutTrans);
        params.setSrcTupleId(srcTupleDesc.getId().asInt());
        params.setDestTupleId(destTupleDesc.getId().asInt());
        brokerScanRange.setParams(params);

        // broker address updated for each replica
        brokerScanRange.setBrokerAddresses(Lists.newArrayList());

        // broker range desc
        TBrokerRangeDesc tBrokerRangeDesc = new TBrokerRangeDesc();
        tBrokerRangeDesc.setFileType(TFileType.FILE_HDFS);
        tBrokerRangeDesc.setFormatType(TFileFormatType.FORMAT_PARQUET);
        tBrokerRangeDesc.setSplittable(false);
        tBrokerRangeDesc.setStartOffset(0);
        tBrokerRangeDesc.setSize(-1);
        // path and file size updated for each replica
        brokerScanRange.setRanges(Collections.singletonList(tBrokerRangeDesc));

        return brokerScanRange;

    }

    private Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws AnalysisException {
        PrimitiveType dstType = slotDesc.getType().getPrimitiveType();
        PrimitiveType srcType = expr.getType().getPrimitiveType();
        if (dstType == PrimitiveType.BOOLEAN && srcType == PrimitiveType.VARCHAR) {
            // there is no cast VARCHAR to BOOLEAN function,
            // so we cast VARCHAR to TINYINT first, then cast TINYINT to BOOLEAN
            return new CastExpr(Type.BOOLEAN, new CastExpr(Type.TINYINT, expr));
        }
        if (dstType != srcType) {
            return expr.castTo(slotDesc.getType());
        }
        return expr;
    }

    private TDescriptorTable getTDescriptorTable(DescriptorTable descTable) {
        descTable.computeStatAndMemLayout();
        return descTable.toThrift();
    }

    private PushTask buildPushTask(long backendId, OlapTable olapTable, long taskSignature, long partitionId,
                                   long indexId, long tabletId, long replicaId, int schemaHash, int schemaVersion,
                                   long bucket)
            throws AnalysisException {

        DescriptorTable descTable = new DescriptorTable();
        TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();

        List<TColumn> columnsDesc = new ArrayList<>();
        List<Column> columns = new ArrayList<>();
        for (Column column : olapTable.getSchemaByIndexId(indexId)) {
            Column col = new Column(column);
            col.setName(column.getName().toLowerCase(Locale.ROOT));
            columns.add(col);
            columnsDesc.add(col.toThrift());
            // use index schema to fill the descriptor table
            SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
            destSlotDesc.setIsMaterialized(true);
            destSlotDesc.setColumn(col);
            destSlotDesc.setIsNullable(col.isAllowNull());
        }

        // build a fresh TBrokerScanRange for each push task because filePath and fileSize
        // differ between tablets
        TBrokerScanRange tBrokerScanRange =
                getTBrokerScanRange(descTable, destTupleDesc, columns, hadoopProperties);
        // update filePath fileSize
        TBrokerRangeDesc tBrokerRangeDesc = tBrokerScanRange.getRanges().get(0);
        tBrokerRangeDesc.setFileType(TFileType.FILE_HDFS);
        tBrokerRangeDesc.setPath("");
        tBrokerRangeDesc.setFileSize(-1);
        String tabletMetaStr = String.format("%d.%d.%d.%d.%d", olapTable.getId(), partitionId,
                indexId, bucket, schemaHash);
        if (tabletMetaToFileInfo.containsKey(tabletMetaStr)) {
            Pair<String, Long> fileInfo = tabletMetaToFileInfo.get(tabletMetaStr);
            tBrokerRangeDesc.setPath(fileInfo.first);
            tBrokerRangeDesc.setFileSize(fileInfo.second);
        }

        TDescriptorTable tDescriptorTable = getTDescriptorTable(descTable);

        return new PushTask(backendId, dbId, olapTable.getId(),
                partitionId, indexId, tabletId, replicaId, schemaHash, 0, id,
                TPushType.LOAD_V2, TPriority.NORMAL, transactionId, taskSignature,
                tBrokerScanRange, tDescriptorTable, columnsDesc,
                olapTable.getStorageVaultId(), schemaVersion);
    }

    public void updateLoadingStatus() throws UserException {
        if (!checkState(JobState.LOADING)) {
            return;
        }

        if (etlStatus.getState() == TEtlState.CANCELLED) {
            throw new LoadException(etlStatus.getFailMsg());
        }

        // submit push tasks
        Set<Long> totalTablets = submitPushTasks();
        if (totalTablets.isEmpty()) {
            LOG.warn("total tablets set is empty. job id: {}, state: {}", id, state);
            return;
        }

        // update status
        boolean canCommitJob = false;
        writeLock();
        try {
            // loading progress is capped at 99; it only reaches 100 once the txn is visible
            // and the load has finished
            progress = fullTablets.size() * 100 / totalTablets.size();
            if (progress == 100) {
                progress = 99;
            }

            // quorum finish ts
            if (quorumFinishTimestamp < 0 && quorumTablets.containsAll(totalTablets)) {
                quorumFinishTimestamp = System.currentTimeMillis();
            }

            // if all replicas have finished, or the job has stayed quorum-finished for a long time, try to commit
            long stragglerTimeout = 300 * 1000; // 5 minutes
            if ((quorumFinishTimestamp > 0 && System.currentTimeMillis() - quorumFinishTimestamp > stragglerTimeout)
                    || fullTablets.containsAll(totalTablets)) {
                canCommitJob = true;
            }
        } finally {
            writeUnlock();
        }

        // try commit transaction
        if (canCommitJob) {
            tryCommitJob();
        }
    }

    private void tryCommitJob() throws UserException {
        LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
                .add("msg", "Load job try to commit txn").build());
        Database db = getDb();
        List<Table> tableList = db.getTablesOnIdOrderOrThrowException(
                Lists.newArrayList(tableToLoadPartitions.keySet()));
        MetaLockUtils.writeLockTablesOrMetaException(tableList);
        try {
            Env.getCurrentGlobalTransactionMgr().commitTransactionWithoutLock(
                    dbId, tableList, transactionId, commitInfos,
                    new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
                            finishTimestamp, state, failMsg));
        } catch (TabletQuorumFailedException e) {
            // retry in next loop
        } finally {
            MetaLockUtils.writeUnlockTables(tableList);
        }
    }

    public void addFinishedReplica(long replicaId, long tabletId, long backendId) {
        writeLock();
        try {
            if (finishedReplicas.add(replicaId)) {
                commitInfos.add(new TabletCommitInfo(tabletId, backendId));
                // set replica push task null
                Map<Long, PushTask> sentReplicaPushTask = tabletToSentReplicaPushTask.get(tabletId);
                if (sentReplicaPushTask != null) {
                    if (sentReplicaPushTask.containsKey(replicaId)) {
                        sentReplicaPushTask.put(replicaId, null);
                    }
                }
            }
        } finally {
            writeUnlock();
        }
    }

    private void clearJob() {
        Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED);

        if (LOG.isDebugEnabled()) {
            LOG.debug("clear push tasks and infos that not persist. id: {}, state: {}", id, state);
        }
        writeLock();
        try {
            // clear push task first
            for (Map<Long, PushTask> sentReplicaPushTask : tabletToSentReplicaPushTask.values()) {
                for (PushTask pushTask : sentReplicaPushTask.values()) {
                    if (pushTask == null) {
                        continue;
                    }
                    AgentTaskQueue.removeTask(pushTask.getBackendId(), pushTask.getTaskType(), pushTask.getSignature());
                }
            }
            tableToLoadPartitions.clear();
            indexToSchemaHash.clear();
            tabletToSentReplicaPushTask.clear();
            finishedReplicas.clear();
            quorumTablets.clear();
            fullTablets.clear();

            Env.getCurrentProgressManager().removeProgress(String.valueOf(id));
        } finally {
            writeUnlock();
        }
    }

    private void unprotectedLogUpdateStateInfo() {
        IngestionLoadJobStateUpdateInfo info =
                new IngestionLoadJobStateUpdateInfo(id, state, transactionId, etlStartTimestamp, loadStartTimestamp,
                        etlStatus, tabletMetaToFileInfo, hadoopProperties, indexToSchemaVersion);
        Env.getCurrentEnv().getEditLog().logUpdateLoadJob(info);
    }

    public static class IngestionLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo {

        @SerializedName(value = "etlStartTimestamp")
        private long etlStartTimestamp;
        @SerializedName(value = "etlStatus")
        private EtlStatus etlStatus;
        @SerializedName(value = "tabletMetaToFileInfo")
        private Map<String, Pair<String, Long>> tabletMetaToFileInfo;
        @SerializedName(value = "hadoopProperties")
        private Map<String, String> hadoopProperties;
        @SerializedName(value = "indexToSchemaVersion")
        private Map<Long, Integer> indexToSchemaVersion;

        public IngestionLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId,
                                               long etlStartTimestamp, long loadStartTimestamp, EtlStatus etlStatus,
                                               Map<String, Pair<String, Long>> tabletMetaToFileInfo,
                                               Map<String, String> hadoopProperties,
                                               Map<Long, Integer> indexToSchemaVersion) {
            super(jobId, state, transactionId, loadStartTimestamp);
            this.etlStartTimestamp = etlStartTimestamp;
            this.etlStatus = etlStatus;
            this.tabletMetaToFileInfo = tabletMetaToFileInfo;
            this.hadoopProperties = hadoopProperties;
            this.indexToSchemaVersion = indexToSchemaVersion;
        }

        public long getEtlStartTimestamp() {
            return etlStartTimestamp;
        }

        public EtlStatus getEtlStatus() {
            return etlStatus;
        }

        public Map<String, Pair<String, Long>> getTabletMetaToFileInfo() {
            return tabletMetaToFileInfo;
        }

        public Map<String, String> getHadoopProperties() {
            return hadoopProperties;
        }

        public Map<Long, Integer> getIndexToSchemaVersion() {
            return indexToSchemaVersion;
        }
    }

    @Override
    public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
        super.replayUpdateStateInfo(info);
        IngestionLoadJobStateUpdateInfo stateUpdateInfo = (IngestionLoadJobStateUpdateInfo) info;
        this.etlStartTimestamp = stateUpdateInfo.getEtlStartTimestamp();
        this.etlStatus = stateUpdateInfo.getEtlStatus();
        if (stateUpdateInfo.getTabletMetaToFileInfo() != null) {
            this.tabletMetaToFileInfo.putAll(stateUpdateInfo.getTabletMetaToFileInfo());
        }
        if (stateUpdateInfo.getHadoopProperties() != null) {
            this.hadoopProperties.putAll(stateUpdateInfo.getHadoopProperties());
        }
        if (stateUpdateInfo.getIndexToSchemaVersion() != null) {
            this.indexToSchemaVersion.putAll(stateUpdateInfo.getIndexToSchemaVersion());
        }
        switch (state) {
            case ETL:
                break;
            case LOADING:
                unprotectedPrepareLoadingInfos();
                break;
            default:
                LOG.warn("replay update load job state info failed. error: wrong state. job id: {}, state: {}", id,
                        state);
                break;
        }
    }

    @Override
    protected void readFields(DataInput in) throws IOException {
        super.readFields(in);
        this.etlStartTimestamp = in.readLong();
        this.etlStatus = new EtlStatus();
        this.etlStatus.readFields(in);
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String tabletMetaStr = Text.readString(in);
            Pair<String, Long> fileInfo = Pair.of(Text.readString(in), in.readLong());
            tabletMetaToFileInfo.put(tabletMetaStr, fileInfo);
        }
        size = in.readInt();
        for (int i = 0; i < size; i++) {
            String propKey = Text.readString(in);
            String propValue = Text.readString(in);
            hadoopProperties.put(propKey, propValue);
        }
        size = in.readInt();
        for (int i = 0; i < size; i++) {
            indexToSchemaVersion.put(in.readLong(), in.readInt());
        }
    }

    private void checkIndexSchema(long indexId, int schemaHash, int schemaVersion) throws LoadException {
        if (indexToSchemaHash.containsKey(indexId) && indexToSchemaHash.get(indexId) == schemaHash
                && indexToSchemaVersion.containsKey(indexId) && indexToSchemaVersion.get(indexId) == schemaVersion) {
            return;
        }
        throw new LoadException(
                "schema of index [" + indexId + "] has changed, old schemaHash: " + indexToSchemaHash.get(indexId)
                        + ", current schemaHash: " + schemaHash + ", old schemaVersion: "
                        + indexToSchemaVersion.get(indexId) + ", current schemaVersion: " + schemaVersion);
    }

}