LoadStatistic.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Tracks the progress of a load job across its load tasks (queries), fragments and backends.
 *
 * <p>All per-task state (row counts, byte counts, backend lists) is guarded by the instance
 * monitor: every method that touches it is {@code synchronized}. The public fields
 * {@link #fileNum} and {@link #totalFileSizeB} are NOT guarded by this class.
 */
public class LoadStatistic {
    // Gson instances are immutable and thread-safe, so a single shared instance is reused
    // instead of allocating a new one on every toJson() call.
    private static final Gson GSON = new Gson();

    // Number of rows processed on BE; updated periodically by query reports.
    // A load job may have several load tasks (queries), and each task has several fragments.
    // Each fragment reports independently.
    // load task id -> fragment id -> rows count
    private final Table<TUniqueId, TUniqueId, Long> counterTbl = HashBasedTable.create();
    // load task id -> fragment id -> load bytes
    private final Table<TUniqueId, TUniqueId, Long> loadBytes = HashBasedTable.create();
    // load task id -> unfinished backend id list
    private final Map<TUniqueId, List<Long>> unfinishedBackendIds = Maps.newHashMap();
    // load task id -> all backend id list
    private final Map<TUniqueId, List<Long>> allBackendIds = Maps.newHashMap();
    private final Map<String, String> counters = new HashMap<>();

    // Number of files to be loaded.
    // NOTE(review): public and written without this object's lock; readers such as toJson()
    // may observe a stale value.
    public int fileNum = 0;
    // Total size of the files to be loaded; the "B" suffix presumably means bytes.
    public long totalFileSizeB = 0;

    /**
     * Registers (or re-registers) a load task, resetting its statistics.
     *
     * <p>Any previous row/byte counters for {@code loadId} are discarded. Every fragment in
     * {@code fragmentIds} starts at 0 rows and 0 bytes. All of {@code relatedBackendIds} start
     * out as unfinished.
     *
     * @param loadId            id of the load task
     * @param fragmentIds       fragments of the task whose reports will be tracked
     * @param relatedBackendIds backends participating in the task; stored by reference as the
     *                          "all backends" list, and copied for the "unfinished" list
     */
    public synchronized void initLoad(TUniqueId loadId, Set<TUniqueId> fragmentIds, List<Long> relatedBackendIds) {
        counterTbl.rowMap().remove(loadId);
        loadBytes.rowMap().remove(loadId);
        for (TUniqueId fragId : fragmentIds) {
            counterTbl.put(loadId, fragId, 0L);
            loadBytes.put(loadId, fragId, 0L);
        }
        allBackendIds.put(loadId, relatedBackendIds);
        // Copy relatedBackendIds so that removing finished backends from unfinishedBackendIds
        // does not also modify the list kept in allBackendIds.
        unfinishedBackendIds.put(loadId, Lists.newArrayList(relatedBackendIds));
    }

    /**
     * Drops all statistics of a load task. Later progress reports for it are ignored.
     *
     * @param loadId id of the load task to forget
     */
    public synchronized void removeLoad(TUniqueId loadId) {
        counterTbl.rowMap().remove(loadId);
        loadBytes.rowMap().remove(loadId);
        unfinishedBackendIds.remove(loadId);
        allBackendIds.remove(loadId);
    }

    /**
     * Records a progress report from one fragment of a load task.
     *
     * <p>The stored row and byte values are replaced, not accumulated. Each report presumably
     * carries the fragment's cumulative totals; this is not verified here.
     *
     * @param backendId  backend that sent the report
     * @param loadId     load task the fragment belongs to
     * @param fragmentId reporting fragment
     * @param rows       rows processed by the fragment so far
     * @param bytes      bytes loaded by the fragment so far
     * @param isDone     whether the backend finished; if so it leaves the unfinished list
     */
    public synchronized void updateLoadProgress(long backendId, TUniqueId loadId, TUniqueId fragmentId,
            long rows, long bytes, boolean isDone) {
        // Only fragments registered through initLoad() are tracked. Reports for unknown or
        // already-removed tasks/fragments are silently ignored.
        if (counterTbl.contains(loadId, fragmentId)) {
            counterTbl.put(loadId, fragmentId, rows);
        }
        if (loadBytes.contains(loadId, fragmentId)) {
            loadBytes.put(loadId, fragmentId, bytes);
        }
        if (isDone) {
            List<Long> unfinished = unfinishedBackendIds.get(loadId);
            if (unfinished != null) {
                // Box explicitly so List.remove(Object) is always chosen. A primitive argument
                // risks resolving to List.remove(int index) if the parameter type ever narrows.
                unfinished.remove(Long.valueOf(backendId));
            }
        }
    }

    /**
     * Returns the total rows reported so far, summed over all tracked tasks and fragments.
     *
     * @return total scanned rows
     */
    public synchronized long getScannedRows() {
        return sum(counterTbl.values());
    }

    /**
     * Returns the total bytes reported so far, summed over all tracked tasks and fragments.
     *
     * @return total loaded bytes
     */
    public synchronized long getLoadBytes() {
        return sum(loadBytes.values());
    }

    /**
     * Returns the extra counters map.
     *
     * <p>NOTE(review): this method is not synchronized and returns the live internal map
     * rather than a copy. Nothing in this class currently writes to it.
     *
     * @return the counters map (never {@code null})
     */
    public Map<String, String> getCounters() {
        // TODO: add extra statistics to counters
        return counters;
    }

    /**
     * Serializes a snapshot of the current statistics to a JSON object string.
     *
     * <p>Emitted keys: {@code ScannedRows}, {@code LoadBytes}, {@code FileNumber},
     * {@code FileSize}, {@code TaskNumber}, {@code Unfinished backends} and
     * {@code All backends}. The two backend maps are keyed by the printable load task id.
     *
     * @return JSON representation of the statistics
     */
    public synchronized String toJson() {
        Map<String, Object> details = Maps.newHashMap();
        // The getters re-enter this object's monitor; Java intrinsic locks are reentrant.
        details.put("ScannedRows", getScannedRows());
        details.put("LoadBytes", getLoadBytes());
        details.put("FileNumber", fileNum);
        details.put("FileSize", totalFileSizeB);
        // Counts only tasks that have at least one registered fragment: a task initialized
        // with an empty fragment set adds no cells to counterTbl, so it has no row here.
        details.put("TaskNumber", counterTbl.rowMap().size());
        details.put("Unfinished backends", getPrintableMap(unfinishedBackendIds));
        details.put("All backends", getPrintableMap(allBackendIds));
        return GSON.toJson(details);
    }

    /**
     * Re-keys a per-task map by the printable form of each task id, for JSON output.
     * The value lists are shared with the source map, not copied.
     */
    private Map<String, List<Long>> getPrintableMap(Map<TUniqueId, List<Long>> map) {
        Map<String, List<Long>> newMap = Maps.newHashMap();
        for (Map.Entry<TUniqueId, List<Long>> entry : map.entrySet()) {
            newMap.put(DebugUtil.printId(entry.getKey()), entry.getValue());
        }
        return newMap;
    }

    /** Sums a sequence of non-null boxed longs; an empty sequence yields 0. */
    private static long sum(Iterable<Long> values) {
        long total = 0;
        for (long value : values) {
            total += value;
        }
        return total;
    }
}