NereidsCoordinator.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
import org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ResultSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SchemaScanNode;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.qe.runtime.LoadProcessor;
import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
import org.apache.doris.qe.runtime.PipelineExecutionTask;
import org.apache.doris.qe.runtime.PipelineExecutionTaskBuilder;
import org.apache.doris.qe.runtime.QueryProcessor;
import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
import org.apache.doris.qe.runtime.ThriftPlansBuilder;
import org.apache.doris.resource.workloadgroup.QueryQueue;
import org.apache.doris.resource.workloadgroup.QueueToken;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** NereidsCoordinator */
public class NereidsCoordinator extends Coordinator {
    private static final Logger LOG = LogManager.getLogger(NereidsCoordinator.class);

    protected final CoordinatorContext coordinatorContext;

    protected volatile PipelineExecutionTask executionTask;

    private final boolean needEnqueue;

    // sql execution
    public NereidsCoordinator(ConnectContext context, Analyzer analyzer,
            NereidsPlanner planner, StatsErrorEstimator statsErrorEstimator) {
        super(context, analyzer, planner, statsErrorEstimator);

        this.coordinatorContext = CoordinatorContext.buildForSql(planner, this);
        this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext));
        this.needEnqueue = true;

        Preconditions.checkState(!planner.getFragments().isEmpty()
                && coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
    }

    // broker load
    public NereidsCoordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable,
            List<PlanFragment> fragments, List<PipelineDistributedPlan> distributedPlans,
            List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance,
            boolean enableProfile) {
        super(jobId, queryId, descTable, fragments, scanNodes, timezone, loadZeroTolerance, enableProfile);
        this.coordinatorContext = CoordinatorContext.buildForLoad(
                this, jobId, queryId, fragments, distributedPlans, scanNodes,
                descTable, timezone, loadZeroTolerance, enableProfile
        );
        this.needEnqueue = false;

        Preconditions.checkState(!fragments.isEmpty()
                && coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
    }

    @Override
    public void exec() throws Exception {
        enqueue(coordinatorContext.connectContext);

        processTopSink(coordinatorContext, coordinatorContext.topDistributedPlan);

        QeProcessorImpl.INSTANCE.registerInstances(coordinatorContext.queryId, coordinatorContext.instanceNum.get());

        Map<DistributedPlanWorker, TPipelineFragmentParamsList> workerToFragments
                = ThriftPlansBuilder.plansToThrift(coordinatorContext);
        executionTask = PipelineExecutionTaskBuilder.build(coordinatorContext, workerToFragments);
        executionTask.execute();
    }

    @Override
    public boolean isTimeout() {
        return System.currentTimeMillis() > coordinatorContext.timeoutDeadline.get();
    }

    @Override
    public void cancel(Status cancelReason) {
        coordinatorContext.getQueueToken().ifPresent(QueueToken::cancel);

        for (ScanNode scanNode : coordinatorContext.scanNodes) {
            scanNode.stop();
        }

        if (cancelReason.ok()) {
            throw new RuntimeException("Should use correct cancel reason, but it is " + cancelReason);
        }

        TUniqueId queryId = coordinatorContext.queryId;
        Status originQueryStatus = coordinatorContext.updateStatusIfOk(cancelReason);
        if (!originQueryStatus.ok()) {
            if (LOG.isDebugEnabled()) {
                // Print an error stack here to know why send cancel again.
                LOG.warn("Query {} already in abnormal status {}, but received cancel again,"
                                + "so that send cancel to BE again",
                        DebugUtil.printId(queryId), originQueryStatus.toString(),
                        new Exception("cancel failed"));
            }
        } else {
            LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
                    DebugUtil.printId(queryId), cancelReason);
        }
        cancelInternal(cancelReason);
    }

    public QueryProcessor asQueryProcessor() {
        return coordinatorContext.asQueryProcessor();
    }

    public JobProcessor getJobProcessor() {
        return coordinatorContext.getJobProcessor();
    }

    public LoadProcessor asLoadProcessor() {
        return coordinatorContext.asLoadProcessor();
    }

    @Override
    public void setTWorkloadGroups(List<TPipelineWorkloadGroup> tWorkloadGroups) {
        coordinatorContext.setWorkloadGroups(tWorkloadGroups);
    }

    @Override
    public List<TPipelineWorkloadGroup> getTWorkloadGroups() {
        return coordinatorContext.getWorkloadGroups();
    }

    @Override
    public boolean isQueryCancelled() {
        return coordinatorContext.readCloneStatus().isCancelled();
    }

    @Override
    public RowBatch getNext() throws Exception {
        return coordinatorContext.asQueryProcessor().getNext();
    }

    public boolean isEos() {
        return coordinatorContext.asQueryProcessor().isEos();
    }

    @Override
    public long getNumReceivedRows() {
        return coordinatorContext.asQueryProcessor().getNumReceivedRows();
    }

    @Override
    public long getJobId() {
        JobProcessor jobProcessor = coordinatorContext.getJobProcessor();
        if (jobProcessor instanceof LoadProcessor) {
            return ((LoadProcessor) jobProcessor).jobId;
        }
        return -1L;
    }

    /*
     * Waiting the coordinator finish executing.
     * return false if waiting timeout.
     * return true otherwise.
     * NOTICE: return true does not mean that coordinator executed success,
     * the caller should check queryStatus for result.
     *
     * We divide the entire waiting process into multiple rounds,
     * with a maximum of 30 seconds per round. And after each round of waiting,
     * check the status of the BE. If the BE status is abnormal, the wait is ended
     * and the result is returned. Otherwise, continue to the next round of waiting.
     * This method mainly avoids the problem that the Coordinator waits for a long time
     * after some BE can no long return the result due to some exception, such as BE is down.
     */
    @Override
    public boolean join(int timeoutS) {
        return coordinatorContext.asLoadProcessor().join(timeoutS);
    }

    @Override
    public boolean isDone() {
        return coordinatorContext.asLoadProcessor().isDone();
    }

    @Override
    public void updateFragmentExecStatus(TReportExecStatusParams params) {
        coordinatorContext.getJobProcessor().updateFragmentExecStatus(params);
    }

    @Override
    public TUniqueId getQueryId() {
        return coordinatorContext.queryId;
    }

    @Override
    public TQueryOptions getQueryOptions() {
        return coordinatorContext.queryOptions;
    }

    @Override
    public Status getExecStatus() {
        return coordinatorContext.readCloneStatus();
    }

    @Override
    public void setQueryType(TQueryType type) {
        coordinatorContext.queryOptions.setQueryType(type);
    }

    @Override
    public void setLoadZeroTolerance(boolean loadZeroTolerance) {
        coordinatorContext.queryGlobals.setLoadZeroTolerance(loadZeroTolerance);
    }

    @Override
    public int getScanRangeNum() {
        return coordinatorContext.scanRangeNum.get();
    }

    @Override
    public ConnectContext getConnectContext() {
        return coordinatorContext.connectContext;
    }

    @Override
    public QueueToken getQueueToken() {
        return coordinatorContext.getQueueToken().orElse(null);
    }

    @Override
    public Map<String, String> getLoadCounters() {
        return coordinatorContext.asLoadProcessor().loadContext.getLoadCounters();
    }

    @Override
    public List<String> getDeltaUrls() {
        return coordinatorContext.asLoadProcessor().loadContext.getDeltaUrls();
    }

    @Override
    public List<TTabletCommitInfo> getCommitInfos() {
        return coordinatorContext.asLoadProcessor().loadContext.getCommitInfos();
    }

    @Override
    public List<String> getExportFiles() {
        return coordinatorContext.asLoadProcessor().loadContext.getExportFiles();
    }

    @Override
    public long getTxnId() {
        return coordinatorContext.asLoadProcessor().loadContext.getTransactionId();
    }

    @Override
    public void setTxnId(long txnId) {
        coordinatorContext.asLoadProcessor().loadContext.updateTransactionId(txnId);
    }

    @Override
    public String getLabel() {
        return coordinatorContext.asLoadProcessor().loadContext.getLabel();
    }

    @Override
    public String getTrackingUrl() {
        return coordinatorContext.asLoadProcessor().loadContext.getTrackingUrl();
    }

    @Override
    public List<TErrorTabletInfo> getErrorTabletInfos() {
        return coordinatorContext.asLoadProcessor().loadContext.getErrorTabletInfos();
    }

    @Override
    public List<TNetworkAddress> getInvolvedBackends() {
        return Utils.fastToImmutableList(coordinatorContext.backends.get().keySet());
    }

    @Override
    public List<FragmentInstanceInfo> getFragmentInstanceInfos() {
        List<QueryStatisticsItem.FragmentInstanceInfo> infos = Lists.newArrayList();
        if (executionTask != null) {
            for (MultiFragmentsPipelineTask multiFragmentsPipelineTask : executionTask.getChildrenTasks().values()) {
                for (SingleFragmentPipelineTask fragmentTask : multiFragmentsPipelineTask.getChildrenTasks().values()) {
                    infos.addAll(fragmentTask.buildFragmentInstanceInfo());
                }
            }
        }
        infos.sort(Comparator.comparing(FragmentInstanceInfo::getFragmentId));
        return infos;
    }

    @Override
    public List<PlanFragment> getFragments() {
        return coordinatorContext.fragments;
    }

    @Override
    public ExecutionProfile getExecutionProfile() {
        return coordinatorContext.executionProfile;
    }

    @Override
    public void setMemTableOnSinkNode(boolean enableMemTableOnSinkNode) {
        coordinatorContext.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
    }

    @Override
    public void setBatchSize(int batchSize) {
        coordinatorContext.queryOptions.setBatchSize(batchSize);
    }

    @Override
    public void setTimeout(int timeout) {
        coordinatorContext.queryOptions.setQueryTimeout(timeout);
        coordinatorContext.queryOptions.setExecutionTimeout(timeout);
        if (coordinatorContext.queryOptions.getExecutionTimeout() < 1) {
            LOG.warn("try set timeout less than 1: {}", coordinatorContext.queryOptions.getExecutionTimeout());
        }
    }

    @Override
    public void setLoadMemLimit(long loadMemLimit) {
        coordinatorContext.queryOptions.setLoadMemLimit(loadMemLimit);
    }

    @Override
    public void setExecMemoryLimit(long execMemoryLimit) {
        coordinatorContext.queryOptions.setMemLimit(execMemoryLimit);
    }

    // this method is used to provide profile metrics: `Instances Num Per BE`
    @Override
    public Map<String, Integer> getBeToInstancesNum() {
        Map<String, Integer> result = Maps.newLinkedHashMap();
        if (executionTask != null) {
            for (MultiFragmentsPipelineTask beTasks : executionTask.getChildrenTasks().values()) {
                TNetworkAddress brpcAddress = beTasks.getBackend().getBrpcAddress();
                String brpcAddrString = brpcAddress.hostname.concat(":").concat("" + brpcAddress.port);
                result.put(brpcAddrString, beTasks.getChildrenTasks().size());
            }
        }
        return result;
    }

    @Override
    public void close() {
        // NOTE: all close method should be no exception
        if (coordinatorContext.getQueryQueue().isPresent() && coordinatorContext.getQueueToken().isPresent()) {
            try {
                coordinatorContext.getQueryQueue().get().releaseAndNotify(coordinatorContext.getQueueToken().get());
            } catch (Throwable t) {
                LOG.error("error happens when coordinator close ", t);
            }
        }

        try {
            for (ScanNode scanNode : coordinatorContext.scanNodes) {
                scanNode.stop();
            }
        } catch (Throwable t) {
            LOG.error("error happens when scannode stop ", t);
        }
    }

    protected void cancelInternal(Status cancelReason) {
        coordinatorContext.withLock(() -> coordinatorContext.getJobProcessor().cancel(cancelReason));
    }

    protected void processTopSink(
            CoordinatorContext coordinatorContext, PipelineDistributedPlan topPlan) throws AnalysisException {
        setForArrowFlight(coordinatorContext, topPlan);
        setForBroker(coordinatorContext, topPlan);
    }

    private void setForArrowFlight(CoordinatorContext coordinatorContext, PipelineDistributedPlan topPlan) {
        ConnectContext connectContext = coordinatorContext.connectContext;
        DataSink dataSink = coordinatorContext.dataSink;
        if (dataSink instanceof ResultSink || dataSink instanceof ResultFileSink) {
            if (connectContext != null && !connectContext.isReturnResultFromLocal()) {
                Preconditions.checkState(connectContext.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
                for (AssignedJob instance : topPlan.getInstanceJobs()) {
                    BackendWorker worker = (BackendWorker) instance.getAssignedWorker();
                    Backend backend = worker.getBackend();
                    if (backend.getArrowFlightSqlPort() < 0) {
                        throw new IllegalStateException("be arrow_flight_sql_port cannot be empty.");
                    }
                    TUniqueId finstId;
                    if (connectContext.getSessionVariable().enableParallelResultSink()) {
                        finstId = getQueryId();
                    } else {
                        finstId = instance.instanceId();
                    }
                    connectContext.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
                            backend.getArrowFlightAddress(), backend.getBrpcAddress(),
                            topPlan.getFragmentJob().getFragment().getOutputExprs()));
                }
            }
        }
    }

    private void setForBroker(
            CoordinatorContext coordinatorContext, PipelineDistributedPlan topPlan) throws AnalysisException {
        DataSink dataSink = coordinatorContext.dataSink;
        if (dataSink instanceof ResultFileSink
                && ((ResultFileSink) dataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
            // set the broker address for OUTFILE sink
            ResultFileSink topResultFileSink = (ResultFileSink) dataSink;
            DistributedPlanWorker worker = topPlan.getInstanceJobs().get(0).getAssignedWorker();
            FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
                    .getBroker(topResultFileSink.getBrokerName(), worker.host());
            topResultFileSink.setBrokerAddr(broker.host, broker.port);
        }
    }

    private void enqueue(ConnectContext context) throws UserException {
        // LoadTask does not have context, not controlled by queue now
        if (context != null && needEnqueue) {
            if (Config.enable_workload_group) {
                List<TPipelineWorkloadGroup> wgList = context.getEnv().getWorkloadGroupMgr().getWorkloadGroup(context);
                this.setTWorkloadGroups(wgList);
                if (shouldQueue(context)) {
                    Set<Long> wgIdSet = Sets.newHashSet();
                    for (TPipelineWorkloadGroup twg : wgList) {
                        wgIdSet.add(twg.getId());
                    }
                    QueryQueue queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(wgIdSet);
                    if (queryQueue == null) {
                        // This logic is actually useless, because when could not find query queue, it will
                        // throw exception during workload group manager.
                        throw new UserException("could not find query queue");
                    }
                    QueueToken queueToken = queryQueue.getToken(context.getSessionVariable().wgQuerySlotCount);
                    int queryTimeout = coordinatorContext.queryOptions.getExecutionTimeout() * 1000;
                    coordinatorContext.setQueueInfo(queryQueue, queueToken);
                    queueToken.get(DebugUtil.printId(coordinatorContext.queryId), queryTimeout);
                }
            } else {
                context.setWorkloadGroupName("");
            }
        }
    }

    private boolean shouldQueue(ConnectContext context) {
        boolean ret = Config.enable_query_queue && !context.getSessionVariable()
                .getBypassWorkloadGroup() && !isQueryCancelled();
        if (!ret) {
            return false;
        }
        // a query with ScanNode need not queue only when all its scan node is SchemaScanNode
        for (ScanNode scanNode : coordinatorContext.scanNodes) {
            if (!(scanNode instanceof SchemaScanNode)) {
                return true;
            }
        }
        return false;
    }

    private JobProcessor buildJobProcessor(CoordinatorContext coordinatorContext) {
        DataSink dataSink = coordinatorContext.dataSink;
        if ((dataSink instanceof ResultSink || dataSink instanceof ResultFileSink)) {
            return QueryProcessor.build(coordinatorContext);
        } else {
            // insert statement has jobId == -1
            return new LoadProcessor(coordinatorContext, -1L);
        }
    }

    @Override
    public void setIsProfileSafeStmt(boolean isSafe) {
        coordinatorContext.queryOptions.setEnableProfile(isSafe && coordinatorContext.queryOptions.isEnableProfile());
    }
}