CopyLoadPendingTask.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.load;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.proto.Cloud.CopyJobPB;
import org.apache.doris.cloud.proto.Cloud.CopyJobPB.JobStatus;
import org.apache.doris.cloud.proto.Cloud.ObjectFilePB;
import org.apache.doris.cloud.stage.StageUtil;
import org.apache.doris.cloud.storage.ListObjectsResult;
import org.apache.doris.cloud.storage.ObjectFile;
import org.apache.doris.cloud.storage.RemoteBase;
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.loadv2.BrokerLoadPendingTask;
import org.apache.doris.load.loadv2.BrokerPendingTaskAttachment;
import org.apache.doris.thrift.TBrokerFileStatus;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;

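/**
 * Pending task of a cloud COPY INTO job: lists the files in the stage's remote storage, filters
 * out files already copied or being copied by other jobs, claims the selected files via BeginCopy
 * on the meta service, and records them on the CopyJob for the loading phase.
 *
 * An illustrative statement that produces such a task (stage and table names are hypothetical):
 *   COPY INTO db1.tbl1 FROM @example_stage
 */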
public class CopyLoadPendingTask extends BrokerLoadPendingTask {
    private static final Logger LOG = LogManager.getLogger(CopyLoadPendingTask.class);
    private static final String NO_FILES_ERROR_MSG = "No files can be copied";
    private Map<FileGroupAggKey, List<List<Pair<TBrokerFileStatus, ObjectFilePB>>>> fileStatusMap = Maps.newHashMap();

    private int matchedFileNum = 0;
    private int loadedFileNum = 0;
    private String reachLimitStr = "";
    // BeginCopy may already have been executed if this task is replayed after an FE restart
    private boolean isBeginCopyDone = false;

    public CopyLoadPendingTask(CopyJob loadTaskCallback,
            Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups, BrokerDesc brokerDesc) {
        super(loadTaskCallback, aggKeyToBrokerFileGroups, brokerDesc, Priority.NORMAL);
        retryTime = 0;
    }

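    /**
     * Runs the base pending task to collect all candidate files, then claims them via BeginCopy
     * (skipped when a replay shows BeginCopy already ran) and hands the selection to the CopyJob.
     */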
    @Override
    public void executeTask() throws UserException {
        super.executeTask(); // get all files and begin txn
        if (!isBeginCopyDone) {
            beginCopy((BrokerPendingTaskAttachment) attachment);
        }
        ((CopyJob) callback).setSelectedFiles(((BrokerPendingTaskAttachment) attachment).getFileStatusMap());
    }

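    /**
     * Collects file statuses for every file group. On replay, the statuses are rebuilt from the
     * copy job persisted in the meta service; otherwise the remote stage is listed and filtered.
     * Empty files are dropped for binary formats (parquet/orc) because they cannot be read.
     */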
    @Override
    protected void getAllFileStatus() throws UserException {
        long start = System.currentTimeMillis();
        CopyJob copyJob = (CopyJob) callback;
        for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : aggKeyToBrokerFileGroups.entrySet()) {
            FileGroupAggKey aggKey = entry.getKey();
            List<BrokerFileGroup> fileGroups = entry.getValue();
            LOG.debug("aggKey:{}, fileGroups:{}", aggKey, fileGroups);

            List<List<Pair<TBrokerFileStatus, ObjectFilePB>>> fileStatusList = Lists.newArrayList();
            long tableTotalFileSize = 0;
            int tableTotalFileNum = 0;
            int groupNum = 0;
            for (BrokerFileGroup fileGroup : fileGroups) {
                long groupFileSize = 0;
                List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatuses = Lists.newArrayList();
                if (copyJob.isReplay()) {
                    fileStatuses = getCopyFilesWhenReplay(copyJob.getStageId(), fileGroup.getTableId(),
                            copyJob.getCopyId(), copyJob.getObjectInfo());
                    LOG.info("Get copy files when replay, stageId={}, tableId={}, copyId={}, files={}",
                            copyJob.getStageId(), fileGroup.getTableId(), copyJob.getCopyId(),
                            fileStatuses.stream().map(p -> p.second.getRelativePath()).collect(Collectors.toList()));
                }
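                // fresh run: list candidate files from the remote stage with pattern and limit filters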
                if (!isBeginCopyDone) {
                    for (String path : fileGroup.getFilePaths()) {
                        LOG.debug("input path = {}", path);
                        parseFileForCopyJob(copyJob.getStageId(), fileGroup.getTableId(), copyJob.getCopyId(),
                                copyJob.getPattern(), copyJob.getSizeLimit(), Config.max_file_num_per_copy_into_job,
                                Config.max_meta_size_per_copy_into_job, fileStatuses, copyJob.getObjectInfo(),
                                copyJob.isForceCopy());
                    }
                }
                boolean isBinaryFileFormat = fileGroup.isBinaryFileFormat();
                List<Pair<TBrokerFileStatus, ObjectFilePB>> filteredFileStatuses = Lists.newArrayList();
                for (Pair<TBrokerFileStatus, ObjectFilePB> pair : fileStatuses) {
                    TBrokerFileStatus fstatus = pair.first;
                    if (fstatus.getSize() == 0 && isBinaryFileFormat) {
                        // For parquet or orc files, ignore empty files,
                        // because an empty parquet or orc file cannot be read.
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(
                                    new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()).add("empty file", fstatus)
                                            .build());
                        }
                    } else {
                        groupFileSize += fstatus.size;
                        filteredFileStatuses.add(pair);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId()).add("file_status",
                                    fstatus).build());
                        }
                    }
                }
                fileStatusList.add(filteredFileStatuses);
                tableTotalFileSize += groupFileSize;
                tableTotalFileNum += filteredFileStatuses.size();
                LOG.info("get {} files in file group {} for table {}. size: {}. job: {}, broker: {} ",
                        filteredFileStatuses.size(), groupNum, entry.getKey(), groupFileSize, callback.getCallbackId(),
                        brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER ? BrokerUtil.getAddress(
                                brokerDesc) : brokerDesc.getStorageType());
                groupNum++;
            }
            if (fileStatusList.stream().allMatch(List::isEmpty)) {
                retryTime = 0;
                copyJob.retryTimes = 0;
                copyJob.setAbortedCopy(true);
                throw new UserException(String.format(NO_FILES_ERROR_MSG + ", matched %d files, "
                        + "filtered %d files because files may be loading or loaded",
                        matchedFileNum, loadedFileNum) + reachLimitStr);
            }
            fileStatusMap.put(aggKey, fileStatusList);
            LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}, queryId: {}", tableTotalFileNum,
                    tableTotalFileSize, (System.currentTimeMillis() - start), callback.getCallbackId(),
                    copyJob.getCopyId());
        }
    }

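    /**
     * Claims the candidate files for each file group key via BeginCopy on the meta service
     * (skipped for a force copy), drops files claimed by concurrent copy jobs, and attaches the
     * surviving file statuses to the pending task attachment.
     */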
    private void beginCopy(BrokerPendingTaskAttachment attachment) throws UserException {
        CopyJob copyJob = (CopyJob) callback;
        long startTime = System.currentTimeMillis();
        long timeoutTime = startTime + copyJob.getTimeout() * 1000 + 5000; // add a delta

        long totalFileSize = 0;
        int totalFileNum = 0;
        for (Entry<FileGroupAggKey, List<List<Pair<TBrokerFileStatus, ObjectFilePB>>>> entry :
                fileStatusMap.entrySet()) {
            FileGroupAggKey fileGroupAggKey = entry.getKey();
            List<List<Pair<TBrokerFileStatus, ObjectFilePB>>> value = entry.getValue();

            List<ObjectFilePB> objectFiles = value.stream().flatMap(List::stream).map(l -> l.second)
                    .collect(Collectors.toList());
            // groupId is 0 because tableId is unique within FileGroupAggKey (COPY INTO cannot specify a partition yet)
            List<ObjectFilePB> filteredObjectFiles = copyJob.isForceCopy() ? objectFiles
                    : ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
                            .beginCopy(copyJob.getStageId(), copyJob.getStageType(), fileGroupAggKey.getTableId(),
                                    copyJob.getCopyId(), 0, startTime, timeoutTime, objectFiles, copyJob.getSizeLimit(),
                                    Config.max_file_num_per_copy_into_job, Config.max_meta_size_per_copy_into_job);
            if (filteredObjectFiles.isEmpty()) {
                retryTime = 0;
                copyJob.setAbortedCopy(true);
                // all candidate files were filtered out by BeginCopy, so report the full candidate count
                throw new UserException(String.format(NO_FILES_ERROR_MSG + ", matched %d files, "
                        + "filtered %d files because files may be loading or loaded",
                        matchedFileNum, objectFiles.size()) + reachLimitStr);
            }
            copyJob.setAbortedCopy(false);
            Set<String> filteredObjectSet = filteredObjectFiles.stream()
                    .map(f -> StageUtil.getFileInfoUniqueId(f)).collect(Collectors.toSet());
            LOG.debug("Begin copy for stage={}, table={}, queryId={}, before objectSize={}, filtered objectSize={}",
                    copyJob.getStageId(), fileGroupAggKey.getTableId(), copyJob.getCopyId(), objectFiles.size(),
                    filteredObjectSet.size());

            List<List<TBrokerFileStatus>> fileStatusList = new ArrayList<>();
            long tableTotalFileSize = 0;
            int tableTotalFileNum = 0;
            int groupNum = 0;

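            // only re-check files against the filtered set if BeginCopy actually dropped some of them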
            boolean needFilter = filteredObjectFiles.size() != objectFiles.size();
            for (List<Pair<TBrokerFileStatus, ObjectFilePB>> pairs : value) {
                List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
                long groupFileSize = 0;
                for (Pair<TBrokerFileStatus, ObjectFilePB> pair : pairs) {
                    TBrokerFileStatus brokerFileStatus = pair.first;
                    ObjectFilePB objectFile = pair.second;
                    if (!needFilter || filteredObjectSet.contains(StageUtil.getFileInfoUniqueId(objectFile))) {
                        groupFileSize += brokerFileStatus.getSize();
                        tableTotalFileNum++;
                        fileStatuses.add(brokerFileStatus);
                    }
                }
                fileStatusList.add(fileStatuses);
                tableTotalFileSize += groupFileSize;
                // log the per-group file size, not the running table total
                LOG.info("get {} files in file group {} for table {}. size: {}. job: {}", fileStatuses.size(), groupNum,
                        entry.getKey(), groupFileSize, callback.getCallbackId());
                groupNum++;
            }

            totalFileSize += tableTotalFileSize;
            totalFileNum += tableTotalFileNum;
            attachment.addFileStatus(fileGroupAggKey, fileStatusList);
            LOG.info("get {} files to be loaded. total size: {}. cost: {} ms, job: {}", tableTotalFileNum,
                    tableTotalFileSize, (System.currentTimeMillis() - startTime), callback.getCallbackId());
        }
        copyJob.setLoadFileInfo(totalFileNum, totalFileSize);
    }

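    /**
     * Lists the stage's files and filters them by pattern, copy history and the size/num/meta
     * limits, recording the matched/loaded counters and the reach-limit message for error reporting.
     */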
    protected void parseFileForCopyJob(String stageId, long tableId, String copyId, String pattern, long sizeLimit,
            int fileNumLimit, int fileMetaSizeLimit, List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus,
            ObjectInfo objectInfo, boolean forceCopy) throws UserException {
        try {
            Triple<Integer, Integer, String> triple = StageUtil.listAndFilterFilesV2(objectInfo, pattern, copyId,
                    stageId, tableId, forceCopy, sizeLimit, fileNumLimit, fileMetaSizeLimit, fileStatus);
            matchedFileNum = triple.getLeft();
            loadedFileNum = triple.getMiddle();
            reachLimitStr = triple.getRight();
        } catch (Exception e) {
            LOG.warn("Failed to list copy files for queryId={}", copyId, e);
            throw new UserException("list copy files failed. msg=" + e.getMessage());
        }
    }

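    /**
     * Returns the files already recorded as copied for this stage and table. A force copy ignores
     * the copy history, so an empty list is returned in that case.
     */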
    private List<ObjectFilePB> getCopyFiles(String stageId, long tableId, boolean force) throws DdlException {
        return force ? new ArrayList<>() :
                ((CloudInternalCatalog) Env.getCurrentInternalCatalog()).getCopyFiles(stageId, tableId);
    }

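    /**
     * FE-side listing and filtering: pages through the remote objects and selects files that match
     * the pattern, are not yet loaded, and stay within the size/num/meta limits. Note that
     * parseFileForCopyJob delegates the same filtering to StageUtil.listAndFilterFilesV2.
     */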
    protected void listAndFilterFiles(ObjectInfo objectInfo, String pattern, String copyId, long sizeLimit, int fileNum,
            int metaSize, List<ObjectFilePB> copiedFiles, List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatus)
            throws Exception {
        long startTimestamp = System.currentTimeMillis();
        long listFileNum = 0;
        matchedFileNum = 0;
        loadedFileNum = 0;
        reachLimitStr = "";
        RemoteBase remote = RemoteBase.newInstance(objectInfo);
        Set<String> loadedFileSet = copiedFiles.stream().map(f -> StageUtil.getFileInfoUniqueId(f))
                .collect(Collectors.toSet());
        try {
            long totalSize = 0;
            long totalMetaSize = 0;
            PathMatcher matcher = StageUtil.getPathMatcher(pattern);
            String continuationToken = null;
            boolean finish = false;
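            // page through the listing until all objects are seen or a limit stops the scan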
            while (!finish) {
                ListObjectsResult listObjectsResult = remote.listObjects(continuationToken);
                listFileNum += listObjectsResult.getObjectInfoList().size();
                long costSeconds = (System.currentTimeMillis() - startTimestamp) / 1000;
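                // give up once listing has run for over an hour or has scanned a million objects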
                if (costSeconds >= 3600 || listFileNum >= 1000000) {
                    throw new DdlException("Abort list object for queryId=" + ((CopyJob) callback).getCopyId()
                            + ". We don't collect enough files to load, after listing " + listFileNum + " objects for "
                            + costSeconds + " seconds, please check if your pattern " + pattern + " is correct.");
                }
                for (ObjectFile objectFile : listObjectsResult.getObjectInfoList()) {
                    // checks:
                    // 1. the file matches the pattern, if one is set
                    // 2. the file is not being copied, and was not already copied, by another copy job
                    // 3. no fileNum/fileSize/fileMetaSize limit is reached once at least one file is selected
                    if (!StageUtil.matchPattern(objectFile.getRelativePath(), matcher)) {
                        continue;
                    }
                    matchedFileNum++;
                    if (loadedFileSet.contains(StageUtil.getFileInfoUniqueId(objectFile))) {
                        loadedFileNum++;
                        continue;
                    }
                    ObjectFilePB objectFilePB = ObjectFilePB.newBuilder().setRelativePath(objectFile.getRelativePath())
                            .setEtag(objectFile.getEtag()).build();
                    if (fileStatus.size() > 0 && sizeLimit > 0 && totalSize + objectFile.getSize() >= sizeLimit) {
                        finish = true;
                        reachLimitStr = ", skip list because reach size limit: " + sizeLimit;
                        break;
                    }
                    if (fileStatus.size() > 0 && metaSize > 0
                            && totalMetaSize + objectFilePB.getSerializedSize() >= metaSize) {
                        finish = true;
                        reachLimitStr = ", skip list because reach meta size limit: " + metaSize;
                        break;
                    }
                    // add file
                    String objUrl = "s3://" + objectInfo.getBucket() + "/" + objectFile.getKey();
                    fileStatus.add(
                            Pair.of(new TBrokerFileStatus(objUrl, false, objectFile.getSize(), true), objectFilePB));

                    totalSize += objectFile.getSize();
                    totalMetaSize += objectFilePB.getSerializedSize();
                    if (fileNum > 0 && fileStatus.size() >= fileNum) {
                        finish = true;
                        reachLimitStr = ", skip list because reach file num limit: " + fileNum;
                        break;
                    }
                }
                if (!listObjectsResult.isTruncated()) {
                    break;
                }
                continuationToken = listObjectsResult.getContinuationToken();
            }
        } finally {
            remote.close();
        }
    }

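    /**
     * Rebuilds file statuses from the copy job persisted in the meta service on replay. Each
     * recorded object file is verified against remote storage by relative path and etag; a
     * missing or changed object fails the replay.
     */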
    private List<Pair<TBrokerFileStatus, ObjectFilePB>> getCopyFilesWhenReplay(String stageId, long tableId,
            String copyId, ObjectInfo objectInfo) throws DdlException {
        CopyJobPB copyJobPB =
                ((CloudInternalCatalog) Env.getCurrentInternalCatalog()).getCopyJob(stageId, tableId, copyId, 0);
        // BeginCopy has not been executed yet
        if (copyJobPB == null) {
            return new ArrayList<>();
        }
        if (copyJobPB.getJobStatus() != JobStatus.LOADING) {
            throw new DdlException("Copy job is not in loading status, status=" + copyJobPB.getJobStatus());
        }
        isBeginCopyDone = true;
        RemoteBase remote = null;
        try {
            remote = RemoteBase.newInstance(objectInfo);
            List<Pair<TBrokerFileStatus, ObjectFilePB>> fileStatuses = Lists.newArrayList();
            for (ObjectFilePB objectFile : copyJobPB.getObjectFilesList()) {
                List<ObjectFile> files = remote.headObject(objectFile.getRelativePath()).getObjectInfoList();
                TBrokerFileStatus brokerFileStatus = null;
                for (ObjectFile file : files) {
                    if (file.getRelativePath().equals(objectFile.getRelativePath()) && file.getEtag()
                            .equals(objectFile.getEtag())) {
                        String objUrl = "s3://" + objectInfo.getBucket() + "/" + file.getKey();
                        brokerFileStatus = new TBrokerFileStatus(objUrl, false, file.getSize(), true);
                        break;
                    }
                }
                if (brokerFileStatus == null) {
                    throw new Exception("Can not find object with relative path: " + objectFile.getRelativePath());
                }
                fileStatuses.add(Pair.of(brokerFileStatus, objectFile));
            }
            return fileStatuses;
        } catch (Exception e) {
            throw new DdlException(e.getMessage());
        } finally {
            if (remote != null) {
                remote.close();
            }
        }
    }
}