ResultReceiver.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.rpc.TCustomProtocolFactory;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ResultReceiver {
    private static final Logger LOG = LogManager.getLogger(ResultReceiver.class);
    private boolean isDone = false;
    // runStatus represents the running status of the ResultReceiver.
    // If it is not "OK," it indicates cancel.
    private Status runStatus = new Status();
    private long packetIdx = 0;
    private long timeoutTs = 0;
    private TNetworkAddress address;
    private Types.PUniqueId queryId;
    private Types.PUniqueId instanceId;
    private Long backendId;
    private Thread currentThread;
    private volatile Future<InternalService.PFetchDataResult> fetchDataAsyncFuture = null;
    private Boolean enableParallelResultSink = false;
    int maxMsgSizeOfResultReceiver;
    public ResultReceiver(TUniqueId queryId, TUniqueId tid, Long backendId, TNetworkAddress address, long timeoutTs,
            int maxMsgSizeOfResultReceiver, Boolean enableParallelResultSink) {
        this.queryId = Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build();
        this.instanceId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
        this.backendId = backendId;
        this.address = address;
        this.timeoutTs = timeoutTs;
        this.maxMsgSizeOfResultReceiver = maxMsgSizeOfResultReceiver;
        this.enableParallelResultSink = enableParallelResultSink;
    }
    Types.PUniqueId getRealFinstId() {
        if (enableParallelResultSink) {
            return queryId;
        }
        return instanceId;
    }
    public void createFuture(
            FutureCallback<InternalService.PFetchDataResult> callback) throws RpcException {
        InternalService.PFetchDataRequest request = InternalService.PFetchDataRequest.newBuilder()
                .setFinstId(getRealFinstId())
                .setRespInAttachment(false)
                .build();
        try {
            fetchDataAsyncFuture = BackendServiceProxy.getInstance().fetchDataAsyncWithCallback(address, request,
                    callback);
        } catch (RpcException e) {
            LOG.warn("fetch result rpc exception, finstId={}", DebugUtil.printId(getRealFinstId()), e);
            SimpleScheduler.addToBlacklist(backendId, e.getMessage());
            throw e;
        }
    }
    public RowBatch getNext(Status status) throws TException {
        if (isDone) {
            return null;
        }
        final RowBatch rowBatch = new RowBatch();
        try {
            while (!isDone && runStatus.ok()) {
                currentThread = Thread.currentThread();
                Preconditions.checkNotNull(fetchDataAsyncFuture);
                InternalService.PFetchDataResult pResult = null;
                while (pResult == null) {
                    long currentTs = System.currentTimeMillis();
                    if (currentTs >= timeoutTs) {
                        throw new TimeoutException("query timeout, query id = " + DebugUtil.printId(this.queryId));
                    }
                    try {
                        pResult = fetchDataAsyncFuture.get(timeoutTs - currentTs, TimeUnit.MILLISECONDS);
                    } catch (CancellationException e) {
                        // When get this exception, it means another thread call cancel, so that the run status
                        // should be set already.
                        LOG.warn("Future of ResultReceiver of query {} is cancelled", DebugUtil.printId(this.queryId));
                        if (runStatus.ok()) {
                            LOG.warn("ResultReceiver is not set to cancelled state, this should not happen");
                        } else {
                            status.updateStatus(runStatus.getErrorCode(), runStatus.getErrorMsg());
                            return null;
                        }
                    } catch (TimeoutException e) {
                        String timeoutReason = "Query " + DebugUtil.printId(this.queryId) + " get result timeout"
                                + ", get result duration " + (timeoutTs - currentTs) + " ms";
                        LOG.warn(timeoutReason);
                        runStatus.updateStatus(TStatusCode.TIMEOUT, timeoutReason);
                        status.updateStatus(runStatus.getErrorCode(), runStatus.getErrorMsg());
                        return null;
                    } catch (InterruptedException e) {
                        // continue to get result
                        LOG.warn("Future of ResultReceiver of query {} got interrupted Exception",
                                DebugUtil.printId(this.queryId), e);
                        // If runstatus != ok, then no need to update it, may overwrite the actual cancel reason.
                        if (runStatus.ok()) {
                            runStatus.updateStatus(TStatusCode.INTERNAL_ERROR, "got interrupted Exception");
                        }
                        status.updateStatus(runStatus.getErrorCode(), runStatus.getErrorMsg());
                        return null;
                    }
                }
                Status resultStatus = new Status(pResult.getStatus());
                if (resultStatus.getErrorCode() != TStatusCode.OK) {
                    status.updateStatus(resultStatus.getErrorCode(), resultStatus.getErrorMsg());
                    return null;
                }
                rowBatch.setQueryStatistics(pResult.getQueryStatistics());
                if (packetIdx != pResult.getPacketSeq()) {
                    LOG.warn("finstId={}, receive packet failed, expect={}, receive={}",
                            DebugUtil.printId(getRealFinstId()), packetIdx, pResult.getPacketSeq());
                    status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, "receive error packet");
                    return null;
                }
                packetIdx++;
                isDone = pResult.getEos();
                if (pResult.hasEmptyBatch() && pResult.getEmptyBatch()) {
                    LOG.info("finstId={}, get first empty rowbatch", DebugUtil.printId(getRealFinstId()));
                    rowBatch.setEos(false);
                    return rowBatch;
                } else if (pResult.hasRowBatch() && pResult.getRowBatch().size() > 0) {
                    byte[] serialResult = pResult.getRowBatch().toByteArray();
                    TResultBatch resultBatch = new TResultBatch();
                    TDeserializer deserializer = new TDeserializer(
                            new TCustomProtocolFactory(this.maxMsgSizeOfResultReceiver));
                    try {
                        deserializer.deserialize(resultBatch, serialResult);
                    } catch (TException e) {
                        if (e.getMessage().contains("MaxMessageSize reached")) {
                            throw new TException(
                                    "MaxMessageSize reached, try increase max_msg_size_of_result_receiver");
                        } else {
                            throw e;
                        }
                    }
                    rowBatch.setBatch(resultBatch);
                    rowBatch.setEos(pResult.getEos());
                    return rowBatch;
                }
            }
        } catch (ExecutionException e) {
            LOG.warn("fetch result execution exception, finstId={}", DebugUtil.printId(getRealFinstId()), e);
            if (e.getMessage().contains("time out")) {
                // if timeout, we set error code to TIMEOUT, and it will not retry querying.
                status.updateStatus(TStatusCode.TIMEOUT, e.getMessage());
            } else {
                status.updateStatus(TStatusCode.THRIFT_RPC_ERROR, e.getMessage());
                SimpleScheduler.addToBlacklist(backendId, e.getMessage());
            }
        } catch (TimeoutException e) {
            LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(getRealFinstId()), e);
            status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
        } finally {
            synchronized (this) {
                currentThread = null;
            }
        }
        if (!runStatus.ok()) {
            status.updateStatus(runStatus.getErrorCode(), runStatus.getErrorMsg());
        }
        return rowBatch;
    }
    public synchronized void cancel(Status reason) {
        if (!runStatus.ok()) {
            LOG.info("ResultReceiver of query {} cancel failed, because its status not ok, "
                    + "maybe cancelled already. current run status is {}, new status is {}.",
                    DebugUtil.printId(queryId), runStatus.toString(), reason.toString());
            return;
        }
        runStatus.updateStatus(reason.getErrorCode(), reason.getErrorMsg());
        if (currentThread != null) {
            // TODO(cmy): we cannot interrupt this thread, or we may throw
            // java.nio.channels.ClosedByInterruptException when we call
            // MysqlChannel.realNetSend -> SocketChannelImpl.write
            // And user will lost connection to Palo
            // currentThread.interrupt();
        }
        if (fetchDataAsyncFuture != null) {
            if (fetchDataAsyncFuture.cancel(true)) {
                LOG.info("ResultReceiver of query {} is cancelled, reason is {}",
                        DebugUtil.printId(queryId), reason.toString());
            } else {
                LOG.warn("ResultReceiver of query {} cancel failed, typically means the future is finished, "
                        + "cancel reason is {}",
                        DebugUtil.printId(queryId), reason.toString());
            }
        }
    }
}