LoadContext.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TTabletCommitInfo;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class LoadContext {
    private volatile String trackingUrl;
    private volatile String firstErrorMsg;
    private volatile long transactionId;
    private volatile String label;
    private final List<String> exportFiles = Lists.newCopyOnWriteArrayList();
    private final Map<String, String> loadCounters = Maps.newLinkedHashMap();
    private final List<String> deltaUrls = Lists.newCopyOnWriteArrayList();
    private final List<TErrorTabletInfo> errorTabletInfos = Lists.newCopyOnWriteArrayList();
    // in pipelinex, the commit info may be duplicate, so we remove the duplicate ones
    // key: backendsId
    // values: tabletId
    private final Map<Pair<Long, Long>, TTabletCommitInfo> commitInfoMap = Maps.newLinkedHashMap();
    public synchronized Map<String, String> getLoadCounters() {
        return ImmutableMap.copyOf(loadCounters);
    }
    public synchronized void updateLoadCounters(Map<String, String> newLoadCounters) {
        long numRowsNormal = 0L;
        String value = this.loadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
        if (value != null) {
            numRowsNormal = Long.parseLong(value);
        }
        long numRowsAbnormal = 0L;
        value = this.loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
        if (value != null) {
            numRowsAbnormal = Long.parseLong(value);
        }
        long numRowsUnselected = 0L;
        value = this.loadCounters.get(LoadJob.UNSELECTED_ROWS);
        if (value != null) {
            numRowsUnselected = Long.parseLong(value);
        }
        // new load counters
        value = newLoadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
        if (value != null) {
            numRowsNormal += Long.parseLong(value);
        }
        value = newLoadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
        if (value != null) {
            numRowsAbnormal += Long.parseLong(value);
        }
        value = newLoadCounters.get(LoadJob.UNSELECTED_ROWS);
        if (value != null) {
            numRowsUnselected += Long.parseLong(value);
        }
        this.loadCounters.put(LoadEtlTask.DPP_NORMAL_ALL, Long.toString(numRowsNormal));
        this.loadCounters.put(LoadEtlTask.DPP_ABNORMAL_ALL, Long.toString(numRowsAbnormal));
        this.loadCounters.put(LoadJob.UNSELECTED_ROWS, Long.toString(numRowsUnselected));
    }
    public List<String> getDeltaUrls() {
        return Utils.fastToImmutableList(deltaUrls);
    }
    public void updateDeltaUrls(List<String> deltaUrls) {
        if (!deltaUrls.isEmpty()) {
            this.deltaUrls.addAll(deltaUrls);
        }
    }
    public synchronized void updateCommitInfos(List<TTabletCommitInfo> commitInfos) {
        // distinct commit info in the map
        for (TTabletCommitInfo commitInfo : commitInfos) {
            this.commitInfoMap.put(Pair.of(commitInfo.backendId, commitInfo.tabletId), commitInfo);
        }
    }
    public synchronized List<TTabletCommitInfo> getCommitInfos() {
        return Utils.fastToImmutableList(commitInfoMap.values());
    }
    public void updateTrackingUrl(String trackingUrl) {
        this.trackingUrl = trackingUrl;
    }
    public String getTrackingUrl() {
        return trackingUrl;
    }
    public void updateFirstErrorMsg(String firstErrorMsg) {
        this.firstErrorMsg = firstErrorMsg;
    }
    public String getFirstErrorMsg() {
        return firstErrorMsg;
    }
    public void updateTransactionId(long transactionId) {
        this.transactionId = transactionId;
    }
    public long getTransactionId() {
        return transactionId;
    }
    public String getLabel() {
        return label;
    }
    public void updateLabel(String label) {
        this.label = label;
    }
    public void addExportFiles(List<String> files) {
        this.exportFiles.addAll(files);
    }
    public List<String> getExportFiles() {
        return exportFiles;
    }
    public synchronized void updateErrorTabletInfos(List<TErrorTabletInfo> errorTabletInfos) {
        if (this.errorTabletInfos.size() <= Config.max_error_tablet_of_broker_load) {
            this.errorTabletInfos.addAll(errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load
                    - this.errorTabletInfos.size()).collect(Collectors.toList()));
        }
    }
    public List<TErrorTabletInfo> getErrorTabletInfos() {
        return errorTabletInfos;
    }
}