LoadingTaskPlanner.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TPartialUpdateNewRowPolicy;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

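/**
 * Plans a loading task (load v2): builds a single-fragment plan consisting of a
 * FileLoadScanNode that reads the broker file groups and an OlapTableSink that writes
 * into the target OLAP table.
 *
 * A rough usage sketch (the variable names below are illustrative placeholders):
 * <pre>{@code
 * LoadingTaskPlanner planner = new LoadingTaskPlanner(jobId, txnId, dbId, table,
 *         brokerDesc, fileGroups, strictMode, isPartialUpdate, newRowPolicy, timezone,
 *         timeoutS, loadParallelism, sendBatchParallelism, useNewLoadScanNode, userInfo,
 *         singleTabletLoadPerSink, enableMemtableOnSinkNode);
 * planner.plan(loadId, fileStatusesList, filesAdded);
 * List<PlanFragment> fragments = planner.getFragments();
 * List<ScanNode> scanNodes = planner.getScanNodes();
 * }</pre>
 */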
public class LoadingTaskPlanner {
    private static final Logger LOG = LogManager.getLogger(LoadingTaskPlanner.class);

    // Input params
    private final long loadJobId;
    private final long txnId;
    private final long dbId;
    private final OlapTable table;
    private final BrokerDesc brokerDesc;
    private final List<BrokerFileGroup> fileGroups;
    private final boolean strictMode;
    private final boolean isPartialUpdate;
    private final TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy;
    private final long timeoutS; // timeout of load job, in seconds
    private final int loadParallelism;
    private final int sendBatchParallelism;
    private final boolean useNewLoadScanNode;
    private final boolean singleTabletLoadPerSink;
    private final boolean enableMemtableOnSinkNode;
    private UserIdentity userInfo;
    // Helper objects.
    // The ConnectContext here is just a dummy object used to avoid NPEs, e.g. in ctx.getDatabase().
    private Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), new ConnectContext());
    private DescriptorTable descTable = analyzer.getDescTbl();

    // Output params
    private List<PlanFragment> fragments = Lists.newArrayList();
    private List<ScanNode> scanNodes = Lists.newArrayList();

    private int nextNodeId = 0;

    public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
            BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
            boolean strictMode, boolean isPartialUpdate, TPartialUpdateNewRowPolicy partialUpdateNewKeyPolicy,
            String timezone, long timeoutS, int loadParallelism,
            int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo,
            boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) {
        this.loadJobId = loadJobId;
        this.txnId = txnId;
        this.dbId = dbId;
        this.table = table;
        this.brokerDesc = brokerDesc;
        this.fileGroups = brokerFileGroups;
        this.strictMode = strictMode;
        this.isPartialUpdate = isPartialUpdate;
        this.analyzer.setTimezone(timezone);
        this.partialUpdateNewKeyPolicy = partialUpdateNewKeyPolicy;
        this.timeoutS = timeoutS;
        this.loadParallelism = loadParallelism;
        this.sendBatchParallelism = sendBatchParallelism;
        this.useNewLoadScanNode = useNewLoadScanNode;
        this.userInfo = userInfo;
        this.singleTabletLoadPerSink = singleTabletLoadPerSink;
        this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
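        // Users holding SELECT privilege on the target database may always use UDFs during
        // the load; otherwise UDF usage is controlled by Config.enable_udf_in_load.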
        if (Env.getCurrentEnv().getAccessManager()
                .checkDbPriv(userInfo, InternalCatalog.INTERNAL_CATALOG_NAME,
                        Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
                        PrivPredicate.SELECT)) {
            this.analyzer.setUDFAllowed(true);
        } else {
            this.analyzer.setUDFAllowed(Config.enable_udf_in_load);
        }
    }

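    /**
     * Builds the plan for this loading task:
     * 1. create the destination and scan tuple descriptors (restricted to the supplied
     *    columns when doing a partial update);
     * 2. create a FileLoadScanNode over the given file statuses;
     * 3. create an OlapTableSink on the target table;
     * 4. wrap scan and sink into a single RANDOM-partitioned PlanFragment and finalize it.
     */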
    public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
            throws UserException {
        // Generate tuple descriptors: one for the destination (sink) side and one for the scan side
        TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
        TupleDescriptor scanTupleDesc = descTable.createTupleDescriptor("ScanTuple");
        if (isPartialUpdate && !table.getEnableUniqueKeyMergeOnWrite()) {
            throw new UserException("Only unique key merge on write support partial update");
        }
        if (isPartialUpdate && table.isUniqKeyMergeOnWriteWithClusterKeys()) {
            throw new UserException("Only unique key merge on write without cluster keys support partial update");
        }

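        // For a partial update, collect the columns actually supplied by the load: every key
        // column must be present, and invisible columns other than the delete sign are rejected.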
        HashSet<String> partialUpdateInputColumns = new HashSet<>();
        if (isPartialUpdate) {
            for (Column col : table.getFullSchema()) {
                boolean existInExpr = false;
                for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) {
                    if (importColumnDesc.getColumnName() != null
                            && importColumnDesc.getColumnName().equals(col.getName())) {
                        if (!col.isVisible() && !Column.DELETE_SIGN.equals(col.getName())) {
                            throw new UserException("Partial update should not include invisible column except"
                                    + " delete sign column: " + col.getName());
                        }
                        partialUpdateInputColumns.add(col.getName());
                        existInExpr = true;
                        break;
                    }
                }
                if (col.isKey() && !existInExpr) {
                    throw new UserException("Partial update should include all key columns, missing: " + col.getName());
                }
            }
        }

        // use full schema to fill the descriptor table
        for (Column col : table.getFullSchema()) {
            if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {
                continue;
            }
            SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc);
            slotDesc.setIsMaterialized(true);
            slotDesc.setColumn(col);
            slotDesc.setIsNullable(col.isAllowNull());
            slotDesc.setAutoInc(col.isAutoInc());
            SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
            scanSlotDesc.setIsMaterialized(true);
            scanSlotDesc.setColumn(col);
            scanSlotDesc.setIsNullable(col.isAllowNull());
            scanSlotDesc.setAutoInc(col.isAutoInc());
            if (col.isAutoInc()) {
                // An auto-increment column itself is non-nullable, but the scan slot is marked
                // nullable so that a `NullLiteral` cell can indicate the value should be
                // filled with a generated one in `VOlapTableSink::_fill_auto_inc_cols()`
                scanSlotDesc.setIsNullable(true);
            }
            if (fileGroups.size() > 0) {
                for (ImportColumnDesc importColumnDesc : fileGroups.get(0).getColumnExprList()) {
                    try {
                        if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
                                && importColumnDesc.getColumnName().equals(col.getName())) {
                            scanSlotDesc.setIsNullable(importColumnDesc.getExpr().isNullable());
                            break;
                        }
                    } catch (Exception e) {
                        // `importColumnDesc.getExpr()` may not have been analyzed yet, so this call
                        // can throw; in that case just keep the column's own nullability.
                    }
                }
            }
        }

        // Analyze each file group's WHERE expression before rewrite
        scanTupleDesc.setTable(table);
        analyzer.registerTupleDescriptor(scanTupleDesc);
        for (BrokerFileGroup fileGroup : fileGroups) {
            if (fileGroup.getWhereExpr() != null) {
                fileGroup.getWhereExpr().analyze(analyzer);
            }
        }

        // Generate plan trees
        // 1. Broker scan node
        FileLoadScanNode scanNode = new FileLoadScanNode(new PlanNodeId(nextNodeId++), scanTupleDesc);
        scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups,
                fileStatusesList, filesAdded, strictMode, loadParallelism, userInfo);
        scanNode.init(analyzer);
        scanNode.finalize(analyzer);
        scanNodes.add(scanNode);
        descTable.computeStatAndMemLayout();

        // 2. Olap table sink
        List<Long> partitionIds = getAllPartitionIds();
        final boolean enableSingleReplicaLoad =
                !this.enableMemtableOnSinkNode && Config.enable_single_replica_load;
        OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
                enableSingleReplicaLoad);
        long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeout() : timeoutS;
        olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode,
                txnTimeout);
        olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
        if (isPartialUpdate) {
            olapTableSink.setPartialUpdateNewRowPolicy(partialUpdateNewKeyPolicy);
        }
        olapTableSink.complete(analyzer);

        // 3. Plan fragment
        PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.RANDOM);
        sinkFragment.setParallelExecNum(loadParallelism);
        sinkFragment.setSink(olapTableSink);

        fragments.add(sinkFragment);

        // 4. finalize
        for (PlanFragment fragment : fragments) {
            fragment.finalize(null);
        }
        Collections.reverse(fragments);
    }

    public DescriptorTable getDescTable() {
        return descTable;
    }

    public List<PlanFragment> getFragments() {
        return fragments;
    }

    public List<ScanNode> getScanNodes() {
        return scanNodes;
    }

    public String getTimezone() {
        return analyzer.getTimezone();
    }

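    /**
     * Returns the partition ids explicitly specified by the file groups, after verifying that
     * none of them is immutable. Returns null when the file groups specify no partitions.
     */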
    private List<Long> getAllPartitionIds() throws LoadException, MetaNotFoundException {
        Set<Long> specifiedPartitionIds = Sets.newHashSet();
        for (BrokerFileGroup brokerFileGroup : fileGroups) {
            if (brokerFileGroup.getPartitionIds() != null) {
                for (long partitionId : brokerFileGroup.getPartitionIds()) {
                    if (!table.getPartitionInfo().getIsMutable(partitionId)) {
                        throw new LoadException("Can't load data to immutable partition, table: "
                            + table.getName() + ", partition: " + table.getPartition(partitionId));
                    }
                }
                specifiedPartitionIds.addAll(brokerFileGroup.getPartitionIds());
            }
            // All file groups in fileGroups should have the same partitions, so we only need
            // to get partition ids from the first one.
            break;
        }
        if (specifiedPartitionIds.isEmpty()) {
            return null;
        }
        return Lists.newArrayList(specifiedPartitionIds);
    }

    // When retrying a load by reusing this plan, the load id should be changed.
    public void updateLoadId(TUniqueId loadId) {
        for (PlanFragment planFragment : fragments) {
            if (!(planFragment.getSink() instanceof OlapTableSink)) {
                continue;
            }
            OlapTableSink olapTableSink = (OlapTableSink) planFragment.getSink();
            olapTableSink.updateLoadId(loadId);
        }

        LOG.info("update olap table sink's load id to {}, job: {}", DebugUtil.printId(loadId), loadJobId);
    }
}