ProgressManager.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.loadv2;

import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

/**
 * ProgressManager tracks the progress of loading and exporting tasks.
 *
 * <p>Every task is identified by a string id and owns a {@link Progress} that records the total
 * number of scan ranges of the task and the number of scan ranges reported as finished so far.
 */
public class ProgressManager {
    private static final Logger LOG = LogManager.getLogger(ProgressManager.class);

    // task id -> progress of that task (concurrent map, so no external locking is needed here)
    private final Map<String, Progress> idToProgress = Maps.newConcurrentMap();

    /**
     * Registers a fresh progress record for {@code id}, replacing any record already stored
     * under the same id.
     *
     * @param id identifier of the task
     * @param scannerNum initial total number of scan ranges of the task
     */
    public void registerProgress(String id, int scannerNum) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("create {} with initial scannerNum {}", id, scannerNum);
        }
        // put() replaces a previous entry in a single step; removing first would open a window
        // in which concurrent callers find no progress for this id.
        idToProgress.put(id, new Progress(scannerNum));
    }

    /**
     * Registers a progress record for {@code id} with a total of zero scan ranges.
     *
     * @param id identifier of the task
     */
    public void registerProgressSimple(String id) {
        registerProgress(id, 0);
    }

    /**
     * Drops the progress record of {@code id}; does nothing if the id is not registered.
     *
     * @param id identifier of the task
     */
    public void removeProgress(String id) {
        idToProgress.remove(id);
    }

    /**
     * Stores the number of finished scan ranges reported for one fragment of one query of the
     * task. A later report for the same (queryId, fragmentId) pair overwrites the earlier one.
     * If the id is not registered, a warning is logged and the report is ignored.
     *
     * @param id identifier of the task
     * @param queryId query the report belongs to
     * @param fragmentId fragment the report belongs to
     * @param finishedScannerNum number of finished scan ranges reported for that fragment
     */
    public void updateProgress(String id, TUniqueId queryId, TUniqueId fragmentId, int finishedScannerNum) {
        Progress progress = idToProgress.get(id);
        if (progress == null) {
            LOG.warn("progress[{}] missing meta information", id);
            return;
        }
        progress.updateFinishedScanNums(queryId, fragmentId, finishedScannerNum);
    }

    /**
     * Increases the total number of scan ranges of the task by {@code num}; does nothing if the
     * id is not registered.
     *
     * @param id identifier of the task
     * @param num number of scan ranges to add to the total
     */
    public void addTotalScanNums(String id, int num) {
        Progress progress = idToProgress.get(id);
        if (progress != null) {
            progress.addTotalScanNums(num);
        }
    }

    /**
     * Renders the progress of a task as {@code "<percent>% (<finished>/<total>)"}, with the
     * percentage formatted to two decimals.
     *
     * @param id identifier of the task
     * @param finished whether the caller considers the task finished; see
     *                 {@link Progress#getProgress(boolean)} for how it affects the percentage
     * @return the rendered progress, or {@code "Unknown id: <id>"} if the id is not registered
     */
    public String getProgressInfo(String id, boolean finished) {
        Progress progress = idToProgress.get(id);
        if (progress == null) {
            return "Unknown id: " + id;
        }
        return progress.describe(finished);
    }

    /**
     * Scan-range counters of a single task. All methods that touch the counters synchronize on
     * this instance, because the backing {@link HashBasedTable} is not safe for concurrent use.
     */
    static class Progress {
        // one job have multiple query, and the query can be divided into
        // separate fragments. finished scan ranges reported from BE is bound
        // to the query, so we need to store them all to save status.
        // table: queryId -> fragmentId -> scan ranges
        private final Table<TUniqueId, TUniqueId, Integer> finishedScanNums = HashBasedTable.create();
        private int totalScanNums = 0;

        /**
         * Creates a progress record.
         *
         * @param totalScanNums initial total number of scan ranges
         */
        public Progress(int totalScanNums) {
            this.totalScanNums = totalScanNums;
        }

        /**
         * Adds {@code num} to the total number of scan ranges.
         *
         * @param num number of scan ranges to add
         */
        public synchronized void addTotalScanNums(int num) {
            totalScanNums += num;
        }

        /**
         * Records the finished scan range count of one fragment, overwriting any value stored
         * earlier for the same (queryId, fragmentId) pair.
         *
         * @param queryId query the count belongs to
         * @param fragmentId fragment the count belongs to
         * @param finishedScanNum finished scan range count of that fragment
         */
        public synchronized void updateFinishedScanNums(TUniqueId queryId, TUniqueId fragmentId, int finishedScanNum) {
            finishedScanNums.put(queryId, fragmentId, finishedScanNum);
        }

        /**
         * @return the current total number of scan ranges
         */
        public synchronized int getTotalScanNums() {
            return totalScanNums;
        }

        /**
         * @return the sum of the finished scan range counts over all recorded fragments
         */
        public synchronized int getFinishedScanNums() {
            // Must run under the same lock as updateFinishedScanNums(): iterating the table
            // while another thread inserts into it is not safe.
            int sum = 0;
            for (Integer count : finishedScanNums.values()) {
                sum += count;
            }
            return sum;
        }

        /**
         * Computes the progress as a percentage.
         *
         * <p>If there are no scan ranges at all, or every scan range is finished, the result is
         * {@code 100.0} when {@code finished} is true and {@code 99.99} otherwise. In all other
         * cases the result is {@code finished scan ranges * 100 / total scan ranges}.
         *
         * @param finished whether the caller considers the task finished
         * @return the progress in percent
         */
        public synchronized double getProgress(boolean finished) {
            // if no scan range found, the progress should be finished(100%)
            int finishedCount = getFinishedScanNums();
            if (totalScanNums == 0 || finishedCount == totalScanNums) {
                return finished ? 100.0 : 99.99;
            }
            return finishedCount * 100.0 / totalScanNums;
        }

        /**
         * Renders {@code "<percent>% (<finished>/<total>)"} while holding the lock, so that the
         * percentage and both counters are taken from the same state.
         *
         * @param finished whether the caller considers the task finished
         * @return the rendered progress
         */
        synchronized String describe(boolean finished) {
            int finish = getFinishedScanNums();
            int total = getTotalScanNums();
            String currentProgress = String.format("%.2f", getProgress(finished));
            return currentProgress + "% (" + finish + "/" + total + ")";
        }

        @Override
        public synchronized String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("Finished/Total: ");
            sb.append(getFinishedScanNums());
            sb.append("/");
            sb.append(totalScanNums);
            sb.append(" => ");
            sb.append(getProgress(true));
            sb.append("%");
            return sb.toString();
        }
    }
}