LoadContext.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TTabletCommitInfo;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class LoadContext {
private volatile String trackingUrl;
private volatile long transactionId;
private volatile String label;
private final List<String> exportFiles = Lists.newCopyOnWriteArrayList();
private final Map<String, String> loadCounters = Maps.newLinkedHashMap();
private final List<String> deltaUrls = Lists.newCopyOnWriteArrayList();
private final List<TErrorTabletInfo> errorTabletInfos = Lists.newCopyOnWriteArrayList();
// in pipelinex, the commit info may be duplicate, so we remove the duplicate ones
// key: backendsId
// values: tabletId
private final Map<Pair<Long, Long>, TTabletCommitInfo> commitInfoMap = Maps.newLinkedHashMap();
public synchronized Map<String, String> getLoadCounters() {
return ImmutableMap.copyOf(loadCounters);
}
public synchronized void updateLoadCounters(Map<String, String> newLoadCounters) {
long numRowsNormal = 0L;
String value = this.loadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
if (value != null) {
numRowsNormal = Long.parseLong(value);
}
long numRowsAbnormal = 0L;
value = this.loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (value != null) {
numRowsAbnormal = Long.parseLong(value);
}
long numRowsUnselected = 0L;
value = this.loadCounters.get(LoadJob.UNSELECTED_ROWS);
if (value != null) {
numRowsUnselected = Long.parseLong(value);
}
// new load counters
value = newLoadCounters.get(LoadEtlTask.DPP_NORMAL_ALL);
if (value != null) {
numRowsNormal += Long.parseLong(value);
}
value = newLoadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (value != null) {
numRowsAbnormal += Long.parseLong(value);
}
value = newLoadCounters.get(LoadJob.UNSELECTED_ROWS);
if (value != null) {
numRowsUnselected += Long.parseLong(value);
}
this.loadCounters.put(LoadEtlTask.DPP_NORMAL_ALL, Long.toString(numRowsNormal));
this.loadCounters.put(LoadEtlTask.DPP_ABNORMAL_ALL, Long.toString(numRowsAbnormal));
this.loadCounters.put(LoadJob.UNSELECTED_ROWS, Long.toString(numRowsUnselected));
}
public List<String> getDeltaUrls() {
return Utils.fastToImmutableList(deltaUrls);
}
public void updateDeltaUrls(List<String> deltaUrls) {
if (!deltaUrls.isEmpty()) {
this.deltaUrls.addAll(deltaUrls);
}
}
public synchronized void updateCommitInfos(List<TTabletCommitInfo> commitInfos) {
// distinct commit info in the map
for (TTabletCommitInfo commitInfo : commitInfos) {
this.commitInfoMap.put(Pair.of(commitInfo.backendId, commitInfo.tabletId), commitInfo);
}
}
public synchronized List<TTabletCommitInfo> getCommitInfos() {
return Utils.fastToImmutableList(commitInfoMap.values());
}
public void updateTrackingUrl(String trackingUrl) {
this.trackingUrl = trackingUrl;
}
public String getTrackingUrl() {
return trackingUrl;
}
public void updateTransactionId(long transactionId) {
this.transactionId = transactionId;
}
public long getTransactionId() {
return transactionId;
}
public String getLabel() {
return label;
}
public void updateLabel(String label) {
this.label = label;
}
public void addExportFiles(List<String> files) {
this.exportFiles.addAll(files);
}
public List<String> getExportFiles() {
return exportFiles;
}
public synchronized void updateErrorTabletInfos(List<TErrorTabletInfo> errorTabletInfos) {
if (this.errorTabletInfos.size() <= Config.max_error_tablet_of_broker_load) {
this.errorTabletInfos.addAll(errorTabletInfos.stream().limit(Config.max_error_tablet_of_broker_load
- this.errorTabletInfos.size()).collect(Collectors.toList()));
}
}
public List<TErrorTabletInfo> getErrorTabletInfos() {
return errorTabletInfos;
}
}