ShowLoadCommand.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.analysis.ShowStreamLoadStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cloud.load.CloudLoadManager;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.LoadProcDir;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.Util;
import org.apache.doris.job.manager.JobManager;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Like;
import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.qe.StmtExecutor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* show load command
*/
public class ShowLoadCommand extends ShowCommand {
// LOAD_TITLE_NAMES copy from org.apache.doris.common.proc.LoadProcDir
public static final ImmutableList<String> LOAD_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("Label").add("State").add("Progress")
.add("Type").add("EtlInfo").add("TaskInfo").add("ErrorMsg").add("CreateTime")
.add("EtlStartTime").add("EtlFinishTime").add("LoadStartTime").add("LoadFinishTime")
.add("URL").add("JobDetails").add("TransactionId").add("ErrorTablets").add("User").add("Comment")
.build();
// STREAM_LOAD_TITLE_NAMES copy from org.apache.doris.analysis.org.apache.doris.analysis
public static final ImmutableList<String> STREAM_LOAD_TITLE_NAMES = new ImmutableList.Builder<String>()
.add("Label").add("Db").add("Table")
.add("ClientIp").add("Status").add("Message").add("Url").add("TotalRows")
.add("LoadedRows").add("FilteredRows").add("UnselectedRows").add("LoadBytes")
.add("StartTime").add("FinishTime").add("User").add("Comment")
.build();
protected String labelValue;
protected String stateValue;
protected boolean isAccurateMatch;
protected String copyIdValue;
protected String tableNameValue;
protected String fileValue;
protected boolean isCopyIdAccurateMatch;
protected boolean isTableNameAccurateMatch;
protected boolean isFilesAccurateMatch;
private final Expression wildWhere;
private final long limit;
private final long offset;
private final List<OrderKey> orderKeys;
private String dbName;
private ArrayList<OrderByPair> orderByPairs;
private boolean isStreamLoad;
/**
* constructor for show load
*/
public ShowLoadCommand(Expression wildWhere, List<OrderKey> orderKeys, long limit, long offset, String dbName) {
super(PlanType.SHOW_LOAD_COMMAND);
this.wildWhere = wildWhere;
this.orderKeys = orderKeys;
this.limit = limit;
this.offset = offset;
this.dbName = dbName;
this.isStreamLoad = false;
this.copyIdValue = null;
this.isCopyIdAccurateMatch = false;
}
/**
* constructor for show load
*/
public ShowLoadCommand(Expression wildWhere, List<OrderKey> orderKeys, long limit,
long offset, String dbName, boolean isStreamLoad) {
super(PlanType.SHOW_LOAD_COMMAND);
this.wildWhere = wildWhere;
this.orderKeys = orderKeys;
this.limit = limit;
this.offset = offset;
this.dbName = dbName;
this.isStreamLoad = isStreamLoad;
this.copyIdValue = null;
this.isCopyIdAccurateMatch = false;
}
@Override
public ShowResultSetMetaData getMetaData() {
ImmutableList<String> titles = isStreamLoad ? STREAM_LOAD_TITLE_NAMES : LOAD_TITLE_NAMES;
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : titles) {
builder.addColumn(new Column(title, ScalarType.createVarchar(128)));
}
return builder.build();
}
protected boolean isAccurateMatch() {
return isAccurateMatch;
}
protected boolean isStreamLoad() {
return isStreamLoad;
}
public ArrayList<OrderByPair> getOrderByPairs() {
return this.orderByPairs;
}
private org.apache.doris.load.loadv2.JobState getStateV2() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
return org.apache.doris.load.loadv2.JobState.valueOf(stateValue);
}
private ShowStreamLoadStmt.StreamLoadState getStreamLoadState() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
ShowStreamLoadStmt.StreamLoadState state = null;
try {
state = ShowStreamLoadStmt.StreamLoadState.valueOf(stateValue);
} catch (Exception e) {
// CHECKSTYLE IGNORE THIS LINE
}
return state;
}
/**
* handle show load
*/
private ShowResultSet handleShowLoad(ConnectContext ctx, StmtExecutor executor) throws Exception {
// first validate the where
boolean valid = validate(ctx);
if (!valid) {
throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\","
+ " or LABEL LIKE \"matcher\", " + " or STATE = \"PENDING|ETL|LOADING|FINISHED|CANCELLED\", "
+ " or compound predicate with operator AND");
}
// then process the order by
ImmutableList<String> titles = isStreamLoad ? STREAM_LOAD_TITLE_NAMES : LOAD_TITLE_NAMES;
orderByPairs = getOrderByPairs(orderKeys, titles);
Set<EtlJobType> jobTypes = Sets.newHashSet(EnumSet.allOf(EtlJobType.class));
jobTypes.remove(EtlJobType.COPY);
// get the load info
Util.prohibitExternalCatalog(ctx.getDefaultCatalog(), this.getClass().getSimpleName());
Env env = ctx.getEnv();
DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(dbName);
long dbId = db.getId();
List<List<Comparable>> loadInfos;
// combine the List<LoadInfo> of load(v1) and loadManager(v2)
Load load = env.getLoadInstance();
loadInfos = load.getLoadJobInfosByDb(dbId, db.getFullName(), labelValue, isAccurateMatch, getStates());
Set<String> statesValue = getStates() == null ? null : getStates().stream()
.map(entity -> entity.name())
.collect(Collectors.toSet());
if (!Config.isCloudMode()) {
loadInfos.addAll(env.getLoadManager().getLoadJobInfosByDb(dbId, labelValue, isAccurateMatch, statesValue));
} else {
loadInfos.addAll(((CloudLoadManager) env.getLoadManager())
.getLoadJobInfosByDb(dbId, labelValue,
isAccurateMatch, statesValue, jobTypes, copyIdValue, isCopyIdAccurateMatch,
tableNameValue, isTableNameAccurateMatch, fileValue, isFilesAccurateMatch));
}
// add the nerieds load info
JobManager loadMgr = env.getJobManager();
loadInfos.addAll(loadMgr.getLoadJobInfosByDb(dbId, db.getFullName(), labelValue,
isAccurateMatch, getStateV2(), db.getCatalog().getName()));
// for stream load
List<List<Comparable>> streamLoadRecords = new ArrayList<>();
if (isStreamLoad) {
streamLoadRecords = env.getStreamLoadRecordMgr()
.getStreamLoadRecordByDb(dbId, labelValue, isAccurateMatch, getStreamLoadState());
}
ListComparator<List<Comparable>> comparator;
if (orderByPairs != null) {
OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
comparator = new ListComparator<>(orderByPairs.toArray(orderByPairArr));
} else {
// sort by id asc
comparator = new ListComparator<>(0);
}
List<List<Comparable>> showLoadInfos = isStreamLoad ? streamLoadRecords : loadInfos;
Collections.sort(showLoadInfos, comparator);
List<List<String>> rows = Lists.newArrayList();
for (List<Comparable> loadInfo : showLoadInfos) {
List<String> oneInfo = new ArrayList<>(loadInfo.size());
// replace QUORUM_FINISHED -> FINISHED
if (loadInfo.get(LoadProcDir.STATE_INDEX).equals(LoadJob.JobState.QUORUM_FINISHED.name())) {
loadInfo.set(LoadProcDir.STATE_INDEX, LoadJob.JobState.FINISHED.name());
}
for (Comparable element : loadInfo) {
oneInfo.add(element.toString());
}
rows.add(oneInfo);
}
// filter by limit
rows = applyLimit(limit, offset, rows);
return new ShowResultSet(getMetaData(), rows);
}
@VisibleForTesting
protected boolean validate(ConnectContext ctx) throws UserException {
if (Strings.isNullOrEmpty(dbName)) {
dbName = ctx.getDatabase();
if (Strings.isNullOrEmpty(dbName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
}
// analyze where clause if not null
if (wildWhere != null) {
if (wildWhere instanceof CompoundPredicate) {
return analyzeCompoundPredicate(wildWhere);
} else {
return analyzeSubPredicate(wildWhere);
}
}
return true;
}
private Set<LoadJob.JobState> getStates() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
Set<LoadJob.JobState> states = new HashSet<LoadJob.JobState>();
LoadJob.JobState state = LoadJob.JobState.valueOf(stateValue);
states.add(state);
if (state == LoadJob.JobState.FINISHED) {
states.add(LoadJob.JobState.QUORUM_FINISHED);
}
return states;
}
private boolean analyzeCompoundPredicate(Expression expr) throws AnalysisException {
if (expr instanceof CompoundPredicate) {
if (!(expr instanceof And)) {
throw new AnalysisException("Only allow compound predicate with operator AND");
}
checkPredicateName(expr.child(0), expr.child(1));
return analyzeSubPredicate(expr.child(0)) && analyzeSubPredicate(expr.child(1));
} else {
return analyzeSubPredicate(expr);
}
}
private void checkPredicateName(Expression leftChild, Expression rightChild) throws AnalysisException {
String leftKey = ((UnboundSlot) leftChild.child(0)).getName();
String rightKey = ((UnboundSlot) rightChild.child(0)).getName();
if (leftKey.equals(rightKey)) {
throw new AnalysisException("column names on both sides of operator AND should be different");
}
}
private boolean analyzeSubPredicate(Expression expr) {
if (expr == null) {
return true;
}
boolean valid = true;
boolean hasLabel = false;
boolean hasState = false;
CHECK: {
if (!(expr instanceof EqualTo) && !(expr instanceof Like)) {
valid = false;
break CHECK;
}
// left child
if (!(expr.child(0) instanceof UnboundSlot)) {
valid = false;
break CHECK;
}
String leftKey = ((UnboundSlot) expr.child(0)).getName();
if (leftKey.equalsIgnoreCase("label")) {
hasLabel = true;
} else if (leftKey.equalsIgnoreCase("state")) {
hasState = true;
} else {
valid = false;
break CHECK;
}
if (hasState && !(expr instanceof EqualTo)) {
valid = false;
break CHECK;
}
if (hasLabel && expr instanceof EqualTo) {
isAccurateMatch = true;
}
// right child
if (!(expr.child(1) instanceof StringLikeLiteral)) {
valid = false;
break CHECK;
}
String rightValue = ((StringLikeLiteral) expr.child(1)).getStringValue();
if (Strings.isNullOrEmpty(rightValue)) {
valid = false;
break CHECK;
}
if (hasLabel && !isAccurateMatch && !rightValue.contains("%")) {
rightValue = "%" + rightValue + "%";
}
if (hasLabel) {
labelValue = rightValue;
} else if (hasState) {
stateValue = rightValue.toUpperCase();
try {
if (isStreamLoad) {
ShowStreamLoadStmt.StreamLoadState.valueOf(stateValue);
} else {
LoadJob.JobState.valueOf(stateValue);
}
} catch (Exception e) {
valid = false;
break CHECK;
}
}
}
return valid;
}
@Override
public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
return handleShowLoad(ctx, executor);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitShowLoadCommand(this, context);
}
@Override
public RedirectStatus toRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}