MultiLoadMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportWhereStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;

import java.io.StringReader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

// Class used to record state of multi-load operation
public class MultiLoadMgr {
    private static final Logger LOG = LogManager.getLogger(MultiLoadMgr.class);

    // In-progress multi-loads keyed by their (database, label) name. Entries are added by startMulti and
    // removed by commit/abort; every access in this class happens while holding 'lock'.
    private Map<LabelName, MultiLoadDesc> infoMap = Maps.newHashMap();
    // Guards infoMap and the MultiLoadDesc instances stored in it. Created in fair mode (fair = true).
    private ReadWriteLock lock = new ReentrantReadWriteLock(true);

    /**
     * Starts a multi-load transaction identified by (fullDbName, label).
     *
     * <p>A load-available backend is selected and recorded in the new descriptor, the descriptor is stored
     * in {@code infoMap}, and only then is the label registered with the load manager. If that registration
     * fails, the descriptor is removed again so that no stale entry keeps the label occupied.
     *
     * @param fullDbName full name of the database, must not be null or empty
     * @param label label of the multi-load, must not be null or empty
     * @param properties load properties, validated with {@code LoadStmt.checkProperties}
     * @throws DdlException if an argument is empty, the label is already in use by an in-progress
     *     multi-load, or no backend is available for loading
     */
    public void startMulti(String fullDbName, String label, Map<String, String> properties) throws DdlException {
        if (Strings.isNullOrEmpty(fullDbName)) {
            throw new DdlException("Database is empty");
        }

        if (Strings.isNullOrEmpty(label)) {
            throw new DdlException("Label is empty");
        }

        LoadStmt.checkProperties(properties);
        LabelName multiLabel = new LabelName(fullDbName, label);
        MultiLoadDesc multiLoadDesc = new MultiLoadDesc(multiLabel, properties);
        lock.writeLock().lock();
        try {
            if (infoMap.containsKey(multiLabel)) {
                throw new LabelAlreadyUsedException(label);
            }
            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().build();
            List<Long> backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
            if (backendIds.isEmpty()) {
                throw new DdlException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + " policy: " + policy);
            }
            multiLoadDesc.setBackendId(backendIds.get(0));
            infoMap.put(multiLabel, multiLoadDesc);
        } finally {
            lock.writeLock().unlock();
        }

        // Register to Load after put into map.
        boolean registered = false;
        try {
            Env.getCurrentEnv().getLoadManager().createLoadJobV1FromMultiStart(fullDbName, label);
            registered = true;
        } finally {
            if (!registered) {
                // Registration failed: undo the put above, otherwise the label would stay occupied in
                // infoMap although the multi-load never started. The two-argument remove only drops the
                // entry if it is still the descriptor created by this call.
                lock.writeLock().lock();
                try {
                    infoMap.remove(multiLabel, multiLoadDesc);
                } finally {
                    lock.writeLock().unlock();
                }
            }
        }
    }

    /**
     * Adds the files of one sub label to the in-progress multi-load (fullDbName, label).
     *
     * @throws DdlException if no multi-load with that label is in progress, or if the descriptor rejects
     *     the files
     */
    private void load(String fullDbName, String label,
            String subLabel, String table,
            List<Pair<String, Long>> files,
            TNetworkAddress fileAddr,
            Map<String, String> properties,
            long timestamp) throws DdlException {
        final LabelName key = new LabelName(fullDbName, label);
        lock.writeLock().lock();
        try {
            final MultiLoadDesc target = infoMap.get(key);
            if (target != null) {
                target.addFile(subLabel, table, files, fileAddr, properties, timestamp);
                return;
            }
            throw new DdlException("Unknown label(" + key + ")");
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Removes the files registered under subLabel from the in-progress multi-load (fullDbName, label).
     *
     * @throws DdlException if the multi-load label or the sub label is unknown
     */
    public void unload(String fullDbName, String label, String subLabel) throws DdlException {
        final LabelName key = new LabelName(fullDbName, label);
        lock.writeLock().lock();
        try {
            final MultiLoadDesc owner = infoMap.get(key);
            if (owner != null) {
                owner.delFile(subLabel);
                return;
            }
            throw new DdlException("Unknown label(" + key + ")");
        } finally {
            lock.writeLock().unlock();
        }
    }

    /**
     * Commits the multi-load (fullDbName, label): submits everything collected so far as one load job and
     * waits for that job to finish.
     *
     * <p>The load statement is built and submitted while holding the write lock. The entry is removed from
     * {@code infoMap} only after the submission succeeded, so a failed submission leaves the multi-load in
     * place. Afterwards the mini label is deregistered and the job state is polled for at most
     * {@code Config.broker_load_default_timeout_second} seconds.
     *
     * @param fullDbName database part of the multi-load label
     * @param label label part of the multi-load label
     * @throws DdlException if the label is unknown; the polling condition also raises it when the job can
     *     no longer be found or is in a state other than PENDING, LOADING or FINISHED
     * @throws UserException if building or submitting the load job fails
     */
    public void commit(String fullDbName, String label) throws DdlException, UserException {
        LabelName multiLabel = new LabelName(fullDbName, label);
        final long jobId;
        lock.writeLock().lock();
        try {
            MultiLoadDesc multiLoadDesc = infoMap.get(multiLabel);
            if (multiLoadDesc == null) {
                throw new DdlException("Unknown label(" + multiLabel + ")");
            }
            jobId = Env.getCurrentEnv().getLoadManager().createLoadJobFromStmt(multiLoadDesc.toLoadStmt());
            infoMap.remove(multiLabel);
        } finally {
            lock.writeLock().unlock();
        }
        Env.getCurrentEnv().getLoadInstance().deregisterMiniLabel(fullDbName, label);
        Env env = Env.getCurrentEnv();
        ConnectContext ctx = ConnectContext.get();
        Awaitility.await().atMost(Config.broker_load_default_timeout_second, TimeUnit.SECONDS).until(() -> {
            // Re-install the caller's context on whichever thread evaluates the condition.
            ConnectContext.threadLocalInfo.set(ctx);
            LoadJob loadJob = env.getLoadManager().getLoadJob(jobId);
            if (loadJob == null) {
                throw new DdlException("Load job " + jobId + " of label(" + multiLabel + ") not found");
            }
            // Read the state exactly once. Re-reading it for every comparison lets a job that switches
            // from LOADING to FINISHED between two reads fall through to the failure branch.
            JobState state = loadJob.getState();
            if (state == JobState.FINISHED) {
                return true;
            }
            if (state == JobState.PENDING || state == JobState.LOADING) {
                return false;
            }
            // The failure message may not be set for every non-running state; do not dereference null.
            String errorMsg = loadJob.getFailMsg() == null
                    ? "no failure message, job state: " + state
                    : loadJob.getFailMsg().getMsg();
            throw new DdlException("job failed. ErrorMsg: " + errorMsg
                    + ", URL: " + loadJob.getLoadingStatus().getTrackingUrl()
                    + ", JobDetails: " + loadJob.getLoadStatistic().toJson());
        });
    }

    /**
     * Aborts an in-progress multi-load: drops its descriptor and deregisters its mini label.
     *
     * @throws DdlException if no multi-load with that label is in progress
     */
    public void abort(String fullDbName, String label) throws DdlException {
        final LabelName key = new LabelName(fullDbName, label);
        lock.writeLock().lock();
        try {
            // Descriptors stored in infoMap are never null, so a null result means the label is unknown.
            final MultiLoadDesc dropped = infoMap.remove(key);
            if (dropped == null) {
                throw new DdlException("Unknown label(" + key + ")");
            }
        } finally {
            lock.writeLock().unlock();
        }
        Env.getCurrentEnv().getLoadInstance().deregisterMiniLabel(fullDbName, label);
    }

    /**
     * Appends every sub label of the in-progress multi-load (fullDbName, label) to subLabels.
     *
     * @throws DdlException if no multi-load with that label is in progress
     */
    public void desc(String fullDbName, String label, List<String> subLabels) throws DdlException {
        final LabelName key = new LabelName(fullDbName, label);
        lock.readLock().lock();
        try {
            final MultiLoadDesc found = infoMap.get(key);
            if (found != null) {
                found.listLabel(subLabels);
                return;
            }
            throw new DdlException("Unknown label(" + key + ")");
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Appends the label of every in-progress multi-load of database fullDbName to labels.
     *
     * @throws DdlException if fullDbName is null or empty
     */
    public void list(String fullDbName, List<String> labels) throws DdlException {
        if (Strings.isNullOrEmpty(fullDbName)) {
            throw new DdlException("No database selected");
        }
        lock.readLock().lock();
        try {
            infoMap.keySet().stream()
                    .filter(name -> fullDbName.equals(name.getDbName()))
                    .forEach(name -> labels.add(name.getLabelName()));
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * Returns the HTTP address (host and HTTP port) of the backend chosen for the in-progress multi-load
     * (fullDbName, label).
     *
     * @param fullDbName database part of the multi-load label
     * @param label label part of the multi-load label
     * @return host and HTTP port of the backend recorded in the multi-load descriptor
     * @throws DdlException if the label is unknown or the recorded backend can no longer be found
     */
    public TNetworkAddress redirectAddr(String fullDbName, String label) throws DdlException {
        LabelName multiLabel = new LabelName(fullDbName, label);
        long backendId;
        // This is a pure lookup, so the shared read lock is sufficient.
        lock.readLock().lock();
        try {
            MultiLoadDesc desc = infoMap.get(multiLabel);
            if (desc == null) {
                throw new DdlException("Unknown multiLabel(" + multiLabel + ")");
            }
            backendId = desc.getBackendId();
        } finally {
            lock.readLock().unlock();
        }
        Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
        if (backend == null) {
            // Same check and message as TableLoadDesc.toDataDesc; avoids an NPE on the calls below.
            throw new DdlException("Backend [" + backendId + "] not found. ");
        }
        return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
    }

    /**
     * State of one in-progress multi-load: the table descriptors indexed both by sub label and by table
     * name, the backend chosen for the load and the properties given at start.
     *
     * <p>This class does no locking of its own; MultiLoadMgr accesses stored instances only while holding
     * its lock.
     */
    private static class MultiLoadDesc {
        private final LabelName multiLabel;
        // Both maps share their values: each sub label points at the descriptor of the table it loads into.
        private final Map<String, TableLoadDesc> loadDescByLabel;
        private final Map<String, TableLoadDesc> loadDescByTable;
        private Long backendId;
        private final Map<String, String> properties;

        public MultiLoadDesc(LabelName label, Map<String, String> properties) {
            multiLabel = label;
            loadDescByLabel = Maps.newHashMap();
            loadDescByTable = Maps.newHashMap();
            backendId = -1L;
            // Treat missing properties as empty so that toLoadStmt never dereferences null.
            if (properties == null) {
                this.properties = Maps.newHashMap();
            } else {
                this.properties = properties;
            }
        }

        /**
         * Registers the files of sub label subLabel for table.
         *
         * <p>The first sub label of a table creates its descriptor. Later sub labels are merged into it,
         * which requires identical properties. A retried request (same sub label, known timestamp) is
         * accepted silently.
         *
         * @throws DdlException if the sub label is already used by a different request, or the properties
         *     differ from those already recorded for the table
         */
        public void addFile(String subLabel, String table, List<Pair<String, Long>> files,
                TNetworkAddress fileAddr,
                Map<String, String> properties,
                long timestamp) throws DdlException {

            if (isSubLabelUsed(subLabel, timestamp)) {
                // sub label is used and this is a retry request.
                // no need to do further operation, just return
                return;
            }
            // From here on the sub label is known to be unused: isSubLabelUsed either returned true or
            // threw for every sub label already present in loadDescByLabel.

            TableLoadDesc desc = loadDescByTable.get(table);
            if (desc == null) {
                desc = new TableLoadDesc(table, subLabel, files, fileAddr, properties, timestamp);
                desc.setBackendId(backendId);
                loadDescByTable.put(table, desc);
            } else {
                if (!desc.canMerge(properties)) {
                    throw new DdlException("Same table have different properties in one multi-load."
                            + "new=" + properties + ",old=" + desc.properties);
                }
                desc.addFiles(subLabel, files);
                desc.addTimestamp(timestamp);
            }
            loadDescByLabel.put(subLabel, desc);
        }

        /**
         * Removes the files of sub label label; a table left without any files is dropped as well.
         *
         * @throws DdlException if the sub label is unknown
         */
        public void delFile(String label) throws DdlException {
            TableLoadDesc desc = loadDescByLabel.get(label);
            if (desc == null) {
                throw new DdlException("Unknown load label(" + label + ")");
            }
            desc.delFiles(label);
            if (desc.isEmpty()) {
                loadDescByTable.remove(desc.tbl);
            }
            loadDescByLabel.remove(label);
        }

        // Appends all sub labels of this multi-load to 'labels'.
        public void listLabel(List<String> labels) {
            labels.addAll(loadDescByLabel.keySet());
        }

        /*
         * 1. if sub label is already used, and this is not a retry request,
         *    throw exception ("Label already used")
         * 2. if label is already used, but this is a retry request,
         *    return true
         * 3. if label is not used, return false
         * 4. throw exception if encounter error.
         */
        public boolean isSubLabelUsed(String subLabel, long timestamp) throws DdlException {
            if (!loadDescByLabel.containsKey(subLabel)) {
                return false;
            }
            // timestamp == -1 is kept for compatibility: such a request is never treated as a retry.
            if (timestamp != -1 && loadDescByLabel.get(subLabel).containsTimestamp(timestamp)) {
                LOG.info("get a retry request with label: {}, sub label: {}, timestamp: {}. return ok",
                        multiLabel.getLabelName(), subLabel, timestamp);
                return true;
            }
            throw new LabelAlreadyUsedException(multiLabel.getLabelName(), subLabel);
        }

        public void setBackendId(long backendId) {
            this.backendId = backendId;
        }

        public long getBackendId() {
            return backendId;
        }

        /**
         * Builds and analyzes the broker load statement that loads every table of this multi-load.
         *
         * <p>The stored properties are left untouched: the statement receives a copy without the comment
         * entry, so calling this method again (for example when a commit is retried) yields the same
         * comment.
         *
         * @return the analyzed load statement
         * @throws DdlException if a table descriptor cannot be converted or the analysis fails
         */
        public LoadStmt toLoadStmt() throws DdlException {
            LabelName commitLabel = multiLabel;

            List<DataDescription> dataDescriptions = Lists.newArrayList();
            for (TableLoadDesc desc : loadDescByTable.values()) {
                dataDescriptions.add(desc.toDataDesc());
            }
            Map<String, String> brokerProperties = Maps.newHashMap();
            brokerProperties.put(BrokerDesc.MULTI_LOAD_BROKER_BACKEND_KEY, backendId.toString());
            BrokerDesc brokerDesc = new BrokerDesc(BrokerDesc.MULTI_LOAD_BROKER, brokerProperties);

            // The comment is handed to LoadStmt separately, so it is taken out of the copied properties.
            Map<String, String> stmtProperties = Maps.newHashMap(properties);
            String comment = "multi load";
            if (stmtProperties.containsKey(LoadStmt.KEY_COMMENT)) {
                comment = stmtProperties.remove(LoadStmt.KEY_COMMENT);
            }

            LoadStmt loadStmt = new LoadStmt(commitLabel, dataDescriptions, brokerDesc, stmtProperties, comment);
            loadStmt.setEtlJobType(EtlJobType.BROKER);
            loadStmt.setOrigStmt(new OriginStatement("", 0));
            ConnectContext ctx = ConnectContext.get();
            loadStmt.setUserInfo(ctx.getCurrentUserIdentity());
            Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
            try {
                loadStmt.analyze(analyzer);
            } catch (UserException e) {
                // Keep the original exception as cause so its stack trace is not lost.
                throw new DdlException(e.getMessage(), e);
            }
            return loadStmt;
        }
    }

    /**
     * What a multi-load has collected for one target table: the files of every sub label, the load
     * properties shared by all of them, and the id of the backend whose address is put into the resulting
     * data description.
     */
    public static class TableLoadDesc {
        // identity of this load
        private String tbl;
        // Files per sub label in insertion order; each pair is added to the file list (first) and the
        // file-size list (second) of the data description.
        private Map<String, List<Pair<String, Long>>> filesByLabel;
        private TNetworkAddress address;

        private Long backendId;
        // Never null: the constructor replaces a missing map with an empty one.
        private Map<String, String> properties;
        // 2 or more files may be loaded to same table with different sub labels.
        // So we use Set to save all timestamp of all different sub labels
        private Set<Long> timestamps = Sets.newHashSet();

        public TableLoadDesc(String tbl, String label, List<Pair<String, Long>> files,
                TNetworkAddress address, Map<String, String> properties,
                long timestamp) {
            this.tbl = tbl;
            this.filesByLabel = Maps.newLinkedHashMap();

            this.address = address;
            // Treat missing properties as empty so that canMerge and toDataDesc need no null handling.
            if (properties == null) {
                this.properties = Maps.newHashMap();
            } else {
                this.properties = properties;
            }
            filesByLabel.put(label, files);
            this.timestamps.add(timestamp);
        }

        /**
         * Tells whether files loaded with the given properties can share this descriptor.
         *
         * @param properties properties of the new sub label; null is treated as empty
         * @return true if the properties equal the ones recorded for this table
         */
        public boolean canMerge(Map<String, String> properties) {
            Map<String, String> other = properties;
            if (other == null) {
                other = Maps.newHashMap();
            }
            return Maps.difference(this.properties, other).areEqual();
        }

        public boolean isEmpty() {
            return filesByLabel.isEmpty();
        }

        public void addFiles(String label, List<Pair<String, Long>> files) {
            filesByLabel.put(label, files);
        }

        public void delFiles(String label) {
            filesByLabel.remove(label);
        }

        public boolean containsTimestamp(long timestamp) {
            return timestamps.contains(timestamp);
        }

        public void addTimestamp(long timestamp) {
            timestamps.add(timestamp);
        }

        public Long getBackendId() {
            return backendId;
        }

        public void setBackendId(Long backendId) {
            this.backendId = backendId;
        }

        /**
         * Converts this descriptor into the DataDescription of a load statement.
         *
         * <p>Files and sizes of all sub labels are flattened in insertion order. Format, negative flag,
         * sequence column, column list, column separator, (temp) partitions, merge type, where expression
         * and delete condition are read from the properties.
         *
         * @return the data description, carrying the address and id of the recorded backend
         * @throws DdlException if a property value is invalid or the backend cannot be found
         */
        public DataDescription toDataDesc() throws DdlException {
            List<String> files = Lists.newArrayList();
            List<Long> fileSizes = Lists.newArrayList();
            Iterator<List<Pair<String, Long>>> it = filesByLabel.values().iterator();
            while (it.hasNext()) {
                for (Pair<String, Long> file : it.next()) {
                    files.add(file.first);
                    fileSizes.add(file.second);
                }
            }

            String fileFormat = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
            // parseBoolean(null) is false, which covers the missing-property case.
            boolean isNegative = Boolean.parseBoolean(properties.get(LoadStmt.KEY_IN_PARAM_NEGATIVE));
            String sequenceColName = properties.get(LoadStmt.KEY_IN_PARAM_SEQUENCE_COL);
            String colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);

            Separator columnSeparator = null;
            String columnSeparatorStr = properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
            if (columnSeparatorStr != null) {
                columnSeparator = new Separator(columnSeparatorStr);
                try {
                    columnSeparator.analyze();
                } catch (AnalysisException e) {
                    throw new DdlException(e.getMessage(), e);
                }
            }

            // Regular partitions take precedence over temp partitions when both are given.
            PartitionNames partitionNames = null;
            String partitions = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS);
            String tempPartitions = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS);
            if (partitions != null) {
                partitionNames = toPartitionNames(false, partitions);
            } else if (tempPartitions != null) {
                partitionNames = toPartitionNames(true, tempPartitions);
            }

            LoadTask.MergeType mergeType = LoadTask.MergeType.APPEND;
            String mergeTypeStr = properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE);
            if (mergeTypeStr != null) {
                try {
                    mergeType = LoadTask.MergeType.valueOf(mergeTypeStr);
                } catch (IllegalArgumentException e) {
                    // valueOf rejects unknown names with an unchecked exception; report it as a load error.
                    throw new DdlException("Invalid merge type: " + mergeTypeStr, e);
                }
            }

            Expr whereExpr = null;
            String whereStr = properties.get(LoadStmt.KEY_IN_PARAM_WHERE);
            if (whereStr != null) {
                whereExpr = parseWhereExpr(whereStr);
            }
            Expr deleteCondition = null;
            String deleteConditionStr = properties.get(LoadStmt.KEY_IN_PARAM_DELETE_CONDITION);
            if (deleteConditionStr != null) {
                deleteCondition = parseWhereExpr(deleteConditionStr);
            }

            DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator,
                    fileFormat, null, isNegative, null, null, whereExpr, mergeType, deleteCondition,
                    sequenceColName, properties);
            dataDescription.setColumnDef(colString);
            // backendId stays null until setBackendId is called; do not unbox it blindly.
            Backend backend = backendId == null ? null : Env.getCurrentSystemInfo().getBackend(backendId);
            if (backend == null) {
                throw new DdlException("Backend [" + backendId + "] not found. ");
            }
            dataDescription.setBeAddr(new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort()));
            dataDescription.setFileSize(fileSizes);
            dataDescription.setBackendId(backendId);
            return dataDescription;
        }

        // Splits a comma separated list of partition names, trimming every name.
        private static PartitionNames toPartitionNames(boolean isTemp, String commaSeparatedNames) {
            List<String> names = Arrays.stream(commaSeparatedNames.trim().split(",")).map(String::trim)
                    .collect(Collectors.toList());
            return new PartitionNames(isTemp, names);
        }

        /**
         * Parses whereString as the condition of a WHERE clause.
         *
         * @param whereString the condition without the leading WHERE keyword
         * @return the parsed expression
         * @throws DdlException if the text cannot be parsed
         */
        private Expr parseWhereExpr(String whereString) throws DdlException {
            String whereSQL = "WHERE " + whereString;
            SqlParser parser = new SqlParser(new SqlScanner(new StringReader(whereSQL)));
            ImportWhereStmt whereStmt;
            try {
                whereStmt = (ImportWhereStmt) SqlParserUtils.getFirstStmt(parser);
            } catch (Error e) {
                LOG.warn("error happens when parsing where header, sql={}", whereSQL, e);
                throw new DdlException("failed to parsing where header, maybe contain unsupported character");
            } catch (DdlException e) {
                String errorMessage = parser.getErrorMsg(whereSQL);
                LOG.warn("analyze where statement failed, sql={}, error={}",
                        whereSQL, errorMessage, e);
                if (errorMessage == null) {
                    throw e;
                } else {
                    throw new DdlException(errorMessage, e);
                }
            } catch (Exception e) {
                LOG.warn("failed to parse where header, sql={}", whereSQL, e);
                // This method parses a WHERE clause, so the message must not mention the columns header.
                throw new DdlException("parse where header failed", e);
            }
            return whereStmt.getExpr();
        }
    }
}