PipelineExecutionTask.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe.runtime;

import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.qe.CoordinatorContext;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.DateTime;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/**
 * PipelineExecutionTask.
 *
 * This class is used to describe which backend process which fragments. On {@link #execute()} it sends the
 * fragments of every backend (one {@link MultiFragmentsPipelineTask} per backend id) and waits for all replies.
 * It uses one RPC round, or two rounds when the coordinator runs in two-phase execution mode.
 */
public class PipelineExecutionTask extends AbstractRuntimeTask<Long, MultiFragmentsPipelineTask> {
    private static final Logger LOG = LogManager.getLogger(PipelineExecutionTask.class);

    // immutable parameters
    // deadline of the whole query in epoch millis (it is compared against System.currentTimeMillis())
    private final long timeoutDeadline;
    private final CoordinatorContext coordinatorContext;
    private final BackendServiceProxy backendServiceProxy;
    // all fragment tasks of all backends, flattened so that a single fragment can be found directly by
    // (backend id, fragment id), e.g. when a report message is received
    private final Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks;

    /**
     * Creates the execution task for the given per-backend fragment tasks.
     *
     * @param coordinatorContext coordinator state of the query; must not be null
     * @param backendServiceProxy proxy used to drop broken BE connections; must not be null
     * @param fragmentTasks backend id -> the fragments that backend executes
     */
    public PipelineExecutionTask(
            CoordinatorContext coordinatorContext,
            BackendServiceProxy backendServiceProxy,
            Map<Long, MultiFragmentsPipelineTask> fragmentTasks) {
        super(new ChildrenRuntimeTasks<>(fragmentTasks));
        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
        this.backendServiceProxy = Objects.requireNonNull(backendServiceProxy, "backendServiceProxy can not be null");
        this.timeoutDeadline = coordinatorContext.timeoutDeadline.get();

        // flatten to fragment tasks to quickly index by BackendFragmentId, when receive the report message
        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasksBuilder
                = ImmutableMap.builder();
        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : fragmentTasks.entrySet()) {
            Long backendId = backendTask.getKey();
            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
                    .getChildrenTasks().entrySet()) {
                Integer fragmentId = fragmentIdToTask.getKey();
                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
                backendFragmentTasksBuilder.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
            }
        }
        // previously the builder was filled and then discarded; keep the index so it can actually be used
        this.backendFragmentTasks = backendFragmentTasksBuilder.build();
    }

    /**
     * Returns every fragment task of this execution, keyed by (backend id, fragment id).
     *
     * @return an immutable map, never null
     */
    public Map<BackendFragmentId, SingleFragmentPipelineTask> getBackendFragmentTasks() {
        return backendFragmentTasks;
    }

    /**
     * Sends the phase-one RPCs to all backends and waits for them. If the coordinator uses two-phase
     * execution, it then sends the phase-two RPCs and waits for those too. Everything runs inside
     * {@code coordinatorContext.withLock}.
     */
    @Override
    public void execute() throws Exception {
        coordinatorContext.withLock(() -> {
            sendAndWaitPhaseOneRpc();
            if (coordinatorContext.twoPhaseExecution()) {
                sendAndWaitPhaseTwoRpc();
            }
            return null;
        });
    }

    @Override
    public String toString() {
        return "PipelineExecutionTask(\n"
                + childrenTasks.allTasks()
                    .stream()
                    .map(multiFragmentsPipelineTask -> "  " + multiFragmentsPipelineTask)
                    .collect(Collectors.joining(",\n"))
                + "\n)";
    }

    /**
     * Sends the phase-one ("send fragments") RPC of every backend task and waits for the replies.
     * Afterwards it records the RPC count, the phase-1 send time and the per-BE latencies in the profile,
     * if a profile is present.
     */
    private void sendAndWaitPhaseOneRpc() throws UserException, RpcException {
        List<RpcInfo> rpcs = Lists.newArrayList();
        for (MultiFragmentsPipelineTask fragmentsTask : childrenTasks.allTasks()) {
            rpcs.add(new RpcInfo(
                    fragmentsTask,
                    DateTime.now().getMillis(),
                    fragmentsTask.sendPhaseOneRpc(coordinatorContext.twoPhaseExecution()))
            );
        }
        Map<TNetworkAddress, List<Long>> rpcPhase1Latency = waitPipelineRpc(rpcs,
                timeoutDeadline - System.currentTimeMillis(), "send fragments");

        coordinatorContext.updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(rpcs.size()));
        coordinatorContext.updateProfileIfPresent(SummaryProfile::setFragmentSendPhase1Time);
        coordinatorContext.updateProfileIfPresent(profile -> profile.setRpcPhase1Latency(rpcPhase1Latency));
    }

    /**
     * Sends the phase-two ("send execution start") RPC of every backend task and waits for the replies.
     * Afterwards it records the RPC count, the phase-2 send time and the per-BE latencies in the profile,
     * if a profile is present.
     */
    private void sendAndWaitPhaseTwoRpc() throws RpcException, UserException {
        List<RpcInfo> rpcs = Lists.newArrayList();
        for (MultiFragmentsPipelineTask fragmentTask : childrenTasks.allTasks()) {
            rpcs.add(new RpcInfo(
                    fragmentTask,
                    DateTime.now().getMillis(),
                    fragmentTask.sendPhaseTwoRpc())
            );
        }

        Map<TNetworkAddress, List<Long>> rpcPhase2Latency = waitPipelineRpc(rpcs,
                timeoutDeadline - System.currentTimeMillis(), "send execution start");
        coordinatorContext.updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(rpcs.size()));
        coordinatorContext.updateProfileIfPresent(SummaryProfile::setFragmentSendPhase2Time);
        coordinatorContext.updateProfileIfPresent(profile -> profile.setRpcPhase2Latency(rpcPhase2Latency));
    }

    /**
     * Waits for the already-sent RPCs and collects their latencies.
     *
     * <p>On the first failure it does the following, in order:
     * <ol>
     *   <li>records an INTERNAL_ERROR status on the coordinator;</li>
     *   <li>cancels the schedule;</li>
     *   <li>throws, as described under the {@code @throws} tags below.</li>
     * </ol>
     * RPC errors, interrupts and timeouts also drop the cached proxy of that BE.
     *
     * @param rpcs the in-flight RPCs to wait for
     * @param leftTimeMs time left until the query deadline; a non-positive value fails immediately
     * @param operation human readable name of the RPC round, used in messages
     * @return brpc address -> [FE->BE rpc latency, wait before execution on BE, execution duration on BE,
     *         BE->FE rpc latency], all in millis
     * @throws RpcException on an RPC timeout or RPC transport error (the latter also blacklists the BE)
     * @throws UserException when no time is left, or when a BE reports a non-OK status, or when interrupted
     */
    private Map<TNetworkAddress, List<Long>> waitPipelineRpc(
            List<RpcInfo> rpcs,
            long leftTimeMs, String operation) throws RpcException, UserException {
        TQueryOptions queryOptions = coordinatorContext.queryOptions;
        TUniqueId queryId = coordinatorContext.queryId;

        if (leftTimeMs <= 0) {
            long currentTimeMillis = System.currentTimeMillis();
            long elapsed = (currentTimeMillis - timeoutDeadline) / 1000 + queryOptions.getExecutionTimeout();
            String msg = String.format(
                    "timeout before waiting %s rpc, query timeout:%d sec, already elapsed:%d sec, "
                    + "left for this:%d ms",
                    operation, queryOptions.getExecutionTimeout(), elapsed, leftTimeMs);
            LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
            if (!queryOptions.isSetExecutionTimeout() || !queryOptions.isSetQueryTimeout()) {
                LOG.warn("Query {} does not set timeout info, execution timeout: is_set:{}, value:{}"
                                + ", query timeout: is_set:{}, value: {}, "
                                + "coordinator timeout deadline {}, cur time millis: {}",
                        DebugUtil.printId(queryId),
                        queryOptions.isSetExecutionTimeout(), queryOptions.getExecutionTimeout(),
                        queryOptions.isSetQueryTimeout(), queryOptions.getQueryTimeout(),
                        timeoutDeadline, currentTimeMillis);
            }
            throw new UserException(msg);
        }

        // BE -> (RPC latency from FE to BE, Execution latency on bthread, Duration of doing work, RPC latency from BE
        // to FE)
        Map<TNetworkAddress, List<Long>> beToPrepareLatency = new HashMap<>();
        // NOTE(review): the same per-RPC timeout is applied to each future in turn, so the total wait can exceed
        // leftTimeMs when several backends are slow. Consider a shared deadline if this matters.
        long timeoutMs = Math.min(leftTimeMs, Config.remote_fragment_exec_timeout_ms);
        for (RpcInfo rpc : rpcs) {
            TStatusCode code;
            String errMsg = null;
            Exception exception = null;

            Backend backend = rpc.task.getBackend();
            long beId = backend.getId();
            TNetworkAddress brpcAddress = backend.getBrpcAddress();

            try {
                PExecPlanFragmentResult result = rpc.future.get(timeoutMs, TimeUnit.MILLISECONDS);
                long rpcDone = DateTime.now().getMillis();
                beToPrepareLatency.put(brpcAddress,
                        Lists.newArrayList(result.getReceivedTime() - rpc.startTime,
                                result.getExecutionTime() - result.getReceivedTime(),
                                result.getExecutionDoneTime() - result.getExecutionTime(),
                                rpcDone - result.getExecutionDoneTime()));
                code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                if (code == null) {
                    code = TStatusCode.INTERNAL_ERROR;
                }

                if (code != TStatusCode.OK) {
                    if (!result.getStatus().getErrorMsgsList().isEmpty()) {
                        errMsg = result.getStatus().getErrorMsgsList().get(0);
                    } else {
                        errMsg = operation + " failed. backend id: " + beId;
                    }
                }
            } catch (ExecutionException e) {
                exception = e;
                code = TStatusCode.THRIFT_RPC_ERROR;
                backendServiceProxy.removeProxy(brpcAddress);
            } catch (InterruptedException e) {
                exception = e;
                code = TStatusCode.INTERNAL_ERROR;
                backendServiceProxy.removeProxy(brpcAddress);
            } catch (TimeoutException e) {
                exception = e;
                errMsg = String.format(
                        "timeout when waiting for %s rpc, query timeout:%d, left timeout for this operation:%d",
                        operation, queryOptions.getExecutionTimeout(), timeoutMs / 1000);
                LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
                code = TStatusCode.TIMEOUT;
                backendServiceProxy.removeProxy(brpcAddress);
            }

            if (code != TStatusCode.OK) {
                if (exception != null && errMsg == null) {
                    errMsg = operation + " failed. " + exception.getMessage();
                }
                Status cancelStatus = new Status(TStatusCode.INTERNAL_ERROR, errMsg);
                coordinatorContext.updateStatusIfOk(cancelStatus);
                coordinatorContext.cancelSchedule(cancelStatus);
                if (exception instanceof InterruptedException) {
                    // Future.get() cleared the interrupt flag; restore it (after cancellation has been triggered)
                    // so code further up the stack can still observe the interruption
                    Thread.currentThread().interrupt();
                }
                switch (code) {
                    case TIMEOUT:
                        MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(brpcAddress.hostname)
                                .increase(1L);
                        throw new RpcException(brpcAddress.hostname, errMsg, exception);
                    case THRIFT_RPC_ERROR:
                        MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(brpcAddress.hostname)
                                .increase(1L);
                        SimpleScheduler.addToBlacklist(beId, errMsg);
                        throw new RpcException(brpcAddress.hostname, errMsg, exception);
                    default:
                        throw new UserException(errMsg, exception);
                }
            }
        }
        return beToPrepareLatency;
    }

    /** An in-flight RPC: the backend task it was sent for, its send time (epoch millis) and its pending result. */
    private static class RpcInfo {
        public final MultiFragmentsPipelineTask task;
        public final long startTime;
        public final Future<PExecPlanFragmentResult> future;

        public RpcInfo(MultiFragmentsPipelineTask task, long startTime, Future<PExecPlanFragmentResult> future) {
            this.task = task;
            this.startTime = startTime;
            this.future = future;
        }
    }
}