StreamLoadRecordMgr.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.analysis.ShowStreamLoadStmt.StreamLoadState;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.plugin.audit.StreamLoadAuditEvent;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStreamLoadRecord;
import org.apache.doris.thrift.TStreamLoadRecordResult;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public class StreamLoadRecordMgr extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(StreamLoadRecordMgr.class);
public class StreamLoadItem {
private String label;
private long dbId;
private String finishTime;
public StreamLoadItem(String label, long dbId, String finishTime) {
this.label = label;
this.dbId = dbId;
this.finishTime = finishTime;
}
public String getLabel() {
return label;
}
public long getDbId() {
return dbId;
}
public String getFinishTime() {
return finishTime;
}
public List<String> getStatistics() {
List<String> row = Lists.newArrayList();
row.add(label);
row.add(String.valueOf(dbId));
row.add(finishTime);
return row;
}
}
class StreamLoadComparator implements Comparator<StreamLoadItem> {
public int compare(StreamLoadItem s1, StreamLoadItem s2) {
return s1.getFinishTime().compareTo(s2.getFinishTime());
}
}
Queue<StreamLoadItem> streamLoadRecordHeap = new PriorityQueue<>(new StreamLoadComparator());
private Map<Long, Map<String, StreamLoadRecord>> dbIdToLabelToStreamLoadRecord = Maps.newConcurrentMap();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public StreamLoadRecordMgr(String name, long intervalMs) {
super(name, intervalMs);
}
public void addStreamLoadRecord(long dbId, String label, StreamLoadRecord streamLoadRecord) {
writeLock();
while (isQueueFull()) {
StreamLoadItem record = streamLoadRecordHeap.poll();
if (record != null) {
String deLabel = record.getLabel();
long deDbId = record.getDbId();
Map<String, StreamLoadRecord> labelToStreamLoadRecord = dbIdToLabelToStreamLoadRecord.get(deDbId);
Iterator<Map.Entry<String, StreamLoadRecord>> iterRecord
= labelToStreamLoadRecord.entrySet().iterator();
while (iterRecord.hasNext()) {
String labelInMap = iterRecord.next().getKey();
if (labelInMap.equals(deLabel)) {
iterRecord.remove();
break;
}
}
}
}
StreamLoadItem record = new StreamLoadItem(label, dbId, streamLoadRecord.getFinishTime());
streamLoadRecordHeap.offer(record);
if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
dbIdToLabelToStreamLoadRecord.put(dbId, new ConcurrentHashMap<>());
}
Map<String, StreamLoadRecord> labelToStreamLoadRecord = dbIdToLabelToStreamLoadRecord.get(dbId);
if (!labelToStreamLoadRecord.containsKey(label)) {
labelToStreamLoadRecord.put(label, streamLoadRecord);
} else if (labelToStreamLoadRecord.get(label).getFinishTime().compareTo(streamLoadRecord.getFinishTime()) < 0) {
labelToStreamLoadRecord.put(label, streamLoadRecord);
}
writeUnlock();
}
public List<StreamLoadItem> getStreamLoadRecords() {
return new ArrayList<>(streamLoadRecordHeap);
}
public List<List<Comparable>> getStreamLoadRecordByDb(
long dbId, String label, boolean accurateMatch, StreamLoadState state) {
LinkedList<List<Comparable>> streamLoadRecords = new LinkedList<List<Comparable>>();
readLock();
try {
if (!dbIdToLabelToStreamLoadRecord.containsKey(dbId)) {
return streamLoadRecords;
}
List<StreamLoadRecord> streamLoadRecordList = Lists.newArrayList();
Map<String, StreamLoadRecord> labelToStreamLoadRecord = dbIdToLabelToStreamLoadRecord.get(dbId);
if (Strings.isNullOrEmpty(label)) {
streamLoadRecordList.addAll(labelToStreamLoadRecord.values().stream().collect(Collectors.toList()));
} else {
// check label value
if (accurateMatch) {
if (!labelToStreamLoadRecord.containsKey(label)) {
return streamLoadRecords;
}
streamLoadRecordList.add(labelToStreamLoadRecord.get(label));
} else {
// non-accurate match
for (Map.Entry<String, StreamLoadRecord> entry : labelToStreamLoadRecord.entrySet()) {
if (entry.getKey().contains(label)) {
streamLoadRecordList.add(entry.getValue());
}
}
}
}
for (StreamLoadRecord streamLoadRecord : streamLoadRecordList) {
try {
if (state != null && !String.valueOf(state).equalsIgnoreCase(streamLoadRecord.getStatus())) {
continue;
}
// check auth
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME,
streamLoadRecord.getDb(), streamLoadRecord.getTable(),
PrivPredicate.LOAD)) {
continue;
}
streamLoadRecords.add(streamLoadRecord.getStreamLoadInfo());
} catch (Exception e) {
continue;
}
}
return streamLoadRecords;
} finally {
readUnlock();
}
}
public void clearStreamLoadRecord() {
writeLock();
if (streamLoadRecordHeap.size() > 0 || dbIdToLabelToStreamLoadRecord.size() > 0) {
streamLoadRecordHeap.clear();
dbIdToLabelToStreamLoadRecord.clear();
}
writeUnlock();
}
public boolean isQueueFull() {
return streamLoadRecordHeap.size() >= Config.max_stream_load_record_size;
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
@Override
protected void runAfterCatalogReady() {
ImmutableMap<Long, Backend> backends;
try {
backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
} catch (AnalysisException e) {
LOG.warn("Failed to load backends from system info", e);
return;
}
long start = System.currentTimeMillis();
int pullRecordSize = 0;
Map<Long, Long> beIdToLastStreamLoad = Maps.newHashMap();
for (Backend backend : backends.values()) {
if (!backend.isAlive()) {
continue;
}
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
try {
address = new TNetworkAddress(backend.getHost(), backend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
TStreamLoadRecordResult result = client.getStreamLoadRecord(backend.getLastStreamLoadTime());
Map<String, TStreamLoadRecord> streamLoadRecordBatch = result.getStreamLoadRecord();
pullRecordSize += streamLoadRecordBatch.size();
long lastStreamLoadTime = -1;
for (Map.Entry<String, TStreamLoadRecord> entry : streamLoadRecordBatch.entrySet()) {
TStreamLoadRecord streamLoadItem = entry.getValue();
String startTime = TimeUtils.longToTimeString(streamLoadItem.getStartTime(),
TimeUtils.getDatetimeMsFormatWithTimeZone());
String finishTime = TimeUtils.longToTimeString(streamLoadItem.getFinishTime(),
TimeUtils.getDatetimeMsFormatWithTimeZone());
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load record info from backend: {}."
+ " label: {}, db: {}, tbl: {}, user: {}, user_ip: {},"
+ " status: {}, message: {}, error_url: {},"
+ " total_rows: {}, loaded_rows: {}, filtered_rows: {}, unselected_rows: {},"
+ " load_bytes: {}, start_time: {}, finish_time: {}.",
backend.getHost(), streamLoadItem.getLabel(), streamLoadItem.getDb(),
streamLoadItem.getTbl(), streamLoadItem.getUser(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
streamLoadItem.getTotalRows(), streamLoadItem.getLoadedRows(),
streamLoadItem.getFilteredRows(), streamLoadItem.getUnselectedRows(),
streamLoadItem.getLoadBytes(), startTime, finishTime);
}
AuditEvent auditEvent =
new StreamLoadAuditEvent.AuditEventBuilder().setEventType(EventType.STREAM_LOAD_FINISH)
.setLabel(streamLoadItem.getLabel()).setDb(streamLoadItem.getDb())
.setTable(streamLoadItem.getTbl()).setUser(streamLoadItem.getUser())
.setClientIp(streamLoadItem.getUserIp()).setStatus(streamLoadItem.getStatus())
.setMessage(streamLoadItem.getMessage()).setUrl(streamLoadItem.getUrl())
.setTotalRows(streamLoadItem.getTotalRows())
.setLoadedRows(streamLoadItem.getLoadedRows())
.setFilteredRows(streamLoadItem.getFilteredRows())
.setUnselectedRows(streamLoadItem.getUnselectedRows())
.setLoadBytes(streamLoadItem.getLoadBytes()).setStartTime(startTime)
.setFinishTime(finishTime).build();
Env.getCurrentEnv().getAuditEventProcessor().handleAuditEvent(auditEvent);
if (entry.getValue().getFinishTime() > lastStreamLoadTime) {
lastStreamLoadTime = entry.getValue().getFinishTime();
}
if (Config.disable_show_stream_load) {
continue;
}
StreamLoadRecord streamLoadRecord =
new StreamLoadRecord(streamLoadItem.getLabel(), streamLoadItem.getDb(),
streamLoadItem.getTbl(), streamLoadItem.getUserIp(),
streamLoadItem.getStatus(), streamLoadItem.getMessage(), streamLoadItem.getUrl(),
String.valueOf(streamLoadItem.getTotalRows()),
String.valueOf(streamLoadItem.getLoadedRows()),
String.valueOf(streamLoadItem.getFilteredRows()),
String.valueOf(streamLoadItem.getUnselectedRows()),
String.valueOf(streamLoadItem.getLoadBytes()),
startTime, finishTime, streamLoadItem.getUser(), streamLoadItem.getComment());
String fullDbName = streamLoadItem.getDb();
Database db = Env.getCurrentInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(streamLoadItem.getCluster())) {
dbName = streamLoadItem.getDb();
}
LOG.warn("unknown database, database=" + dbName);
continue;
}
long dbId = db.getId();
Env.getCurrentEnv().getStreamLoadRecordMgr()
.addStreamLoadRecord(dbId, streamLoadItem.getLabel(), streamLoadRecord);
}
if (streamLoadRecordBatch.size() > 0) {
backend.setLastStreamLoadTime(lastStreamLoadTime);
beIdToLastStreamLoad.put(backend.getId(), lastStreamLoadTime);
} else {
beIdToLastStreamLoad.put(backend.getId(), backend.getLastStreamLoadTime());
}
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backend.getId(), e);
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
}
}
}
LOG.info("finished to pull stream load records of all backends. record size: {}, cost: {} ms",
pullRecordSize, (System.currentTimeMillis() - start));
if (pullRecordSize > 0) {
FetchStreamLoadRecord fetchStreamLoadRecord = new FetchStreamLoadRecord(beIdToLastStreamLoad);
Env.getCurrentEnv().getEditLog().logFetchStreamLoadRecord(fetchStreamLoadRecord);
}
if (Config.disable_show_stream_load) {
Env.getCurrentEnv().getStreamLoadRecordMgr().clearStreamLoadRecord();
}
}
public void replayFetchStreamLoadRecord(FetchStreamLoadRecord fetchStreamLoadRecord) throws AnalysisException {
ImmutableMap<Long, Backend> backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
Map<Long, Long> beIdToLastStreamLoad = fetchStreamLoadRecord.getBeIdToLastStreamLoad();
for (Backend backend : backends.values()) {
if (beIdToLastStreamLoad.containsKey(backend.getId())) {
long lastStreamLoadTime = beIdToLastStreamLoad.get(backend.getId());
LOG.info("Replay stream load bdbje. backend: {}, last stream load time: {}",
backend.getHost(), lastStreamLoadTime);
backend.setLastStreamLoadTime(lastStreamLoadTime);
}
}
}
public static class FetchStreamLoadRecord implements Writable {
@SerializedName("beIdToLastStreamLoad")
private Map<Long, Long> beIdToLastStreamLoad;
public FetchStreamLoadRecord(Map<Long, Long> beIdToLastStreamLoad) {
this.beIdToLastStreamLoad = beIdToLastStreamLoad;
}
public void setBeIdToLastStreamLoad(Map<Long, Long> beIdToLastStreamLoad) {
this.beIdToLastStreamLoad = beIdToLastStreamLoad;
}
public Map<Long, Long> getBeIdToLastStreamLoad() {
return beIdToLastStreamLoad;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static FetchStreamLoadRecord read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, FetchStreamLoadRecord.class);
}
}
}