// MultiFragmentsPipelineTask.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe.runtime;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PCancelPlanFragmentResult;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
import org.apache.doris.proto.Types;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.CoordinatorContext;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
 * Runtime task that groups the {@link SingleFragmentPipelineTask}s which are sent to one {@link Backend}.
 *
 * <p>The task owns the serialized fragments payload for that backend and offers the rpc entry points used to
 * dispatch it (phase one), to start it when two phase execution is used (phase two), and to cancel it.
 * The {@code Integer} key of the children tasks is presumably the fragment id -- confirm against the caller.
 */
public class MultiFragmentsPipelineTask extends AbstractRuntimeTask<Integer, SingleFragmentPipelineTask> {
    private static final Logger LOG = LogManager.getLogger(MultiFragmentsPipelineTask.class);

    // immutable parameters
    private final CoordinatorContext coordinatorContext;
    private final Backend backend;
    private final BackendServiceProxy backendClientProxy;

    // mutable states
    // serializeFragments is set to null once the phase one rpc has been issued, to save memory
    private ByteString serializeFragments;
    // becomes true once the backend answered a cancel rpc with an OK status
    private final AtomicBoolean hasCancelled;
    // true while a cancel rpc has been issued and its callback has not finished yet
    private final AtomicBoolean cancelInProcess;

    /**
     * Creates the task for one backend.
     *
     * @param coordinatorContext context of the query this task belongs to, must not be null
     * @param backend the backend that receives the fragments, must not be null
     * @param backendClientProxy the rpc proxy used to talk to the backend, must not be null
     * @param serializeFragments the serialized fragments payload sent by the phase one rpc, must not be null
     * @param fragmentTasks the children tasks of this task
     * @throws NullPointerException if one of the non-null parameters is null
     */
    public MultiFragmentsPipelineTask(
            CoordinatorContext coordinatorContext, Backend backend, BackendServiceProxy backendClientProxy,
            ByteString serializeFragments,
            Map<Integer, SingleFragmentPipelineTask> fragmentTasks) {
        super(new ChildrenRuntimeTasks<>(fragmentTasks));
        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
        this.backend = Objects.requireNonNull(backend, "backend can not be null");
        this.backendClientProxy = Objects.requireNonNull(backendClientProxy, "backendClientProxy can not be null");
        this.serializeFragments = Objects.requireNonNull(
                serializeFragments, "serializeFragments can not be null"
        );
        this.hasCancelled = new AtomicBoolean();
        this.cancelInProcess = new AtomicBoolean();
    }

    /**
     * Sends the serialized fragments to the backend.
     *
     * <p>The serialized payload is released after this call, so it can only be sent once.
     *
     * @param twoPhaseExecution forwarded to the rpc proxy together with the fragments
     * @return the future of the rpc; when the rpc can not be issued, an already completed future
     *         whose result carries a {@code THRIFT_RPC_ERROR} status
     */
    public Future<PExecPlanFragmentResult> sendPhaseOneRpc(boolean twoPhaseExecution) {
        return execRemoteFragmentsAsync(
                backendClientProxy, serializeFragments, backend.getBrpcAddress(), twoPhaseExecution
        );
    }

    /**
     * Sends the "start" request of the query to the backend.
     *
     * @return the future of the rpc; when the rpc can not be issued, an already completed future
     *         whose result carries a {@code THRIFT_RPC_ERROR} status
     */
    public Future<PExecPlanFragmentResult> sendPhaseTwoRpc() {
        return execPlanFragmentStartAsync(backendClientProxy, backend.getBrpcAddress());
    }

    @Override
    public String toString() {
        TNetworkAddress brpcAddress = backend.getBrpcAddress();
        return "MultiFragmentsPipelineTask(Backend " + backend.getId()
                + "(" + brpcAddress.getHostname() + ":" + brpcAddress.getPort() + "): ["
                + childrenTasks.allTasks()
                    .stream()
                    .map(singleFragment -> "F" + singleFragment.getFragmentId())
                    .collect(Collectors.joining(", ")) + "])";
    }

    /**
     * Asks the backend to cancel the query, on a best-effort basis.
     *
     * <p>Nothing is sent when a previous cancel already succeeded or when a cancel rpc is still in flight.
     * Failures are logged and never thrown to the caller; when the rpc can not be issued the backend is
     * added to the blacklist. A failed cancel leaves the task in a state where cancel can be tried again.
     *
     * @param cancelReason the reason forwarded to the backend
     */
    public synchronized void cancelExecute(Status cancelReason) {
        TUniqueId queryId = coordinatorContext.queryId;
        if (LOG.isDebugEnabled()) {
            LOG.debug("cancelRemoteFragments backend: {}, query={}, reason: {}",
                    backend, DebugUtil.printId(queryId), cancelReason.toString());
        }

        if (this.hasCancelled.get() || this.cancelInProcess.get()) {
            LOG.info("Frangment has already been cancelled. Query {} backend: {}",
                    DebugUtil.printId(queryId), backend);
            return;
        }

        try {
            TNetworkAddress brpcAddress = backend.getBrpcAddress();
            // Raise the in-flight flag BEFORE the rpc is issued: the callback runs on another thread and
            // may finish before this method continues. If the flag were raised afterwards it could
            // overwrite the callback's reset and stay true forever, blocking every later cancel attempt.
            cancelInProcess.set(true);
            try {
                // use the injected proxy, like the phase one / phase two rpcs of this task
                ListenableFuture<PCancelPlanFragmentResult> cancelResult =
                        backendClientProxy.cancelPipelineXPlanFragmentAsync(brpcAddress, queryId, cancelReason);
                Futures.addCallback(cancelResult, new FutureCallback<PCancelPlanFragmentResult>() {
                    @Override
                    public void onSuccess(PCancelPlanFragmentResult result) {
                        try {
                            if (result != null && result.hasStatus()) {
                                Status status = new Status(result.getStatus());
                                if (status.getErrorCode() == TStatusCode.OK) {
                                    hasCancelled.set(true);
                                } else {
                                    LOG.warn("Failed to cancel query {} backend: {}, reason: {}",
                                            DebugUtil.printId(queryId), backend, status.toString());
                                }
                            } else {
                                LOG.warn("Failed to cancel query {} backend: {} reason: {}",
                                        DebugUtil.printId(queryId), backend, "without status");
                            }
                        } finally {
                            // cleared after hasCancelled is updated, so that a concurrent cancelExecute
                            // never observes both flags as false after a successful cancel
                            cancelInProcess.set(false);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        cancelInProcess.set(false);
                        LOG.warn("Failed to cancel query {} backend: {}, reason: {}",
                                DebugUtil.printId(queryId), backend, cancelReason.toString(), t);
                    }
                }, Coordinator.backendRpcCallbackExecutor);
            } catch (RpcException e) {
                // the rpc was not issued, so no callback will reset the flag
                cancelInProcess.set(false);
                LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
                        brpcAddress.getPort(), e);
                SimpleScheduler.addToBlacklist(backend.getId(), e.getMessage());
            }
        } catch (Exception e) {
            // cancel is best-effort: make sure a later attempt is not blocked, then swallow
            cancelInProcess.set(false);
            LOG.warn("catch a exception", e);
        }
    }

    /**
     * @return the backend this task is bound to
     */
    public Backend getBackend() {
        return backend;
    }

    /**
     * Issues the phase one rpc and releases the serialized payload held by this task afterwards.
     *
     * @param proxy the rpc proxy used to send the request
     * @param serializedFragments the payload to send, must not be null
     * @param brpcAddr the brpc address of the backend
     * @param twoPhaseExecution forwarded to the rpc proxy
     * @return the future of the rpc, or a completed future with an error status if the rpc can not be issued
     * @throws NullPointerException if {@code serializedFragments} is null
     */
    private Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(
            BackendServiceProxy proxy, ByteString serializedFragments, TNetworkAddress brpcAddr,
            boolean twoPhaseExecution) {
        Preconditions.checkNotNull(serializedFragments,
                "serializedFragments is null, it is released after the phase one rpc has been sent");
        try {
            return proxy.execPlanFragmentsAsync(brpcAddr, serializedFragments, twoPhaseExecution);
        } catch (RpcException e) {
            // DO NOT throw exception here, return a complete future with error code,
            // so that the following logic will cancel the fragment.
            return futureWithException(e);
        } finally {
            // save memory
            this.serializeFragments = null;
        }
    }

    /**
     * Issues the rpc that starts the query of this task's coordinator context on the backend.
     *
     * @param proxy the rpc proxy used to send the request
     * @param brpcAddr the brpc address of the backend
     * @return the future of the rpc, or a completed future with an error status if the rpc can not be issued
     */
    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentStartAsync(
            BackendServiceProxy proxy, TNetworkAddress brpcAddr) {
        TUniqueId queryId = coordinatorContext.queryId;
        try {
            PExecPlanFragmentStartRequest.Builder builder = PExecPlanFragmentStartRequest.newBuilder();
            PUniqueId qid = PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
            builder.setQueryId(qid);
            return proxy.execPlanFragmentStartAsync(brpcAddr, builder.build());
        } catch (RpcException e) {
            // DO NOT throw exception here, return a complete future with error code,
            // so that the following logic will cancel the fragment.
            return futureWithException(e);
        }
    }

    /**
     * Wraps an rpc failure into an already completed future.
     *
     * @param e the failure raised while issuing the rpc
     * @return a future that is done, can not be cancelled, and whose result carries the
     *         {@code THRIFT_RPC_ERROR} status code together with the message of {@code e}
     */
    private Future<PExecPlanFragmentResult> futureWithException(RpcException e) {
        // the protobuf builder rejects null strings, fall back to the exception description
        String errorMsg = e.getMessage() != null ? e.getMessage() : e.toString();
        // built once, every get() hands out the same immutable message
        PExecPlanFragmentResult errorResult = PExecPlanFragmentResult.newBuilder()
                .setStatus(Types.PStatus.newBuilder()
                        .addErrorMsgs(errorMsg)
                        .setStatusCode(TStatusCode.THRIFT_RPC_ERROR.getValue())
                        .build())
                .build();

        return new Future<PExecPlanFragmentResult>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return true;
            }

            @Override
            public PExecPlanFragmentResult get() {
                return errorResult;
            }

            @Override
            public PExecPlanFragmentResult get(long timeout, TimeUnit unit) {
                return errorResult;
            }
        };
    }
}