AbstractJobProcessor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.qe.runtime.BackendFragmentId;
import org.apache.doris.qe.runtime.MultiFragmentsPipelineTask;
import org.apache.doris.qe.runtime.PipelineExecutionTask;
import org.apache.doris.qe.runtime.SingleFragmentPipelineTask;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;

/**
 * AbstractJobProcessor: common base for job processors. It owns the
 * {@link PipelineExecutionTask} of a query and routes per-fragment execution
 * status reports from the backends to the concrete subclass.
 */
public abstract class AbstractJobProcessor implements JobProcessor {
    private final Logger logger = LogManager.getLogger(getClass());

    protected final CoordinatorContext coordinatorContext;
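    // Both fields stay empty until setPipelineExecutionTask() publishes them; volatile
    // because status reports may be processed on threads other than the one that set them.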
    protected volatile Optional<PipelineExecutionTask> executionTask;
    protected volatile Optional<Map<BackendFragmentId, SingleFragmentPipelineTask>> backendFragmentTasks;

    public AbstractJobProcessor(CoordinatorContext coordinatorContext) {
        this.coordinatorContext = Objects.requireNonNull(coordinatorContext, "coordinatorContext can not be null");
        this.executionTask = Optional.empty();
        this.backendFragmentTasks = Optional.empty();
    }

    /**
     * Subclass hook, invoked only after the matching fragment task has accepted
     * the status report; implementations update their own bookkeeping here.
     */
    protected abstract void doProcessReportExecStatus(
            TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask);

    /**
     * Attaches the execution task, builds the (backendId, fragmentId) to task
     * index, then notifies the subclass via afterSetPipelineExecutionTask().
     */
    @Override
    public final void setPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {
        Preconditions.checkArgument(pipelineExecutionTask != null, "pipelineExecutionTask can not be null");

        this.executionTask = Optional.of(pipelineExecutionTask);
        Map<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
                = buildBackendFragmentTasks(pipelineExecutionTask);
        this.backendFragmentTasks = Optional.of(backendFragmentTasks);

        afterSetPipelineExecutionTask(pipelineExecutionTask);
    }

    /** Optional subclass hook, called after the task and the fragment index have been published. */
    protected void afterSetPipelineExecutionTask(PipelineExecutionTask pipelineExecutionTask) {}

    /**
     * Routes one backend report to the matching fragment task. The report is
     * ignored if it belongs to no known fragment or has already been processed.
     */
    @Override
    public final void updateFragmentExecStatus(TReportExecStatusParams params) {
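        // backendFragmentTasks is published by setPipelineExecutionTask() before the
        // fragments start reporting, so the Optional is expected to be present here.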
        SingleFragmentPipelineTask fragmentTask = backendFragmentTasks.get().get(
                new BackendFragmentId(params.getBackendId(), params.getFragmentId()));
        if (fragmentTask == null || !fragmentTask.processReportExecStatus(params)) {
            return;
        }

        TUniqueId queryId = coordinatorContext.queryId;
        Status status = new Status(params.status);
        // For now, abort the query on any error, unless the error is "cancelled" and all
        // results have already been returned (isEos() is true).
        // (updateStatusIfOk() initiates cancellation, if it hasn't already been initiated.)
        if (!status.ok()) {
            if (coordinatorContext.isEos() && status.isCancelled()) {
                logger.warn("Query {} has returned all results, fragment_id={} instance_id={}, be={}"
                                + " is reporting failed status {}",
                        DebugUtil.printId(queryId), params.getFragmentId(),
                        DebugUtil.printId(params.getFragmentInstanceId()),
                        params.getBackendId(),
                        status.toString());
            } else {
                logger.warn("one instance report fail, query_id={} fragment_id={} instance_id={}, be={},"
                                + " error message: {}",
                        DebugUtil.printId(queryId), params.getFragmentId(),
                        DebugUtil.printId(params.getFragmentInstanceId()),
                        params.getBackendId(), status.toString());
                coordinatorContext.updateStatusIfOk(status);
            }
        }
        doProcessReportExecStatus(params, fragmentTask);
    }

    /**
     * Flattens the per-backend task tree into a single immutable map keyed by
     * (backend id, fragment id), so each report can be routed directly.
     */
    private Map<BackendFragmentId, SingleFragmentPipelineTask> buildBackendFragmentTasks(
            PipelineExecutionTask executionTask) {
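        // Flatten {backendId -> {fragmentId -> task}} into {(backendId, fragmentId) -> task}.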
        ImmutableMap.Builder<BackendFragmentId, SingleFragmentPipelineTask> backendFragmentTasks
                = ImmutableMap.builder();
        for (Entry<Long, MultiFragmentsPipelineTask> backendTask : executionTask.getChildrenTasks().entrySet()) {
            Long backendId = backendTask.getKey();
            for (Entry<Integer, SingleFragmentPipelineTask> fragmentIdToTask : backendTask.getValue()
                    .getChildrenTasks().entrySet()) {
                Integer fragmentId = fragmentIdToTask.getKey();
                SingleFragmentPipelineTask fragmentTask = fragmentIdToTask.getValue();
                backendFragmentTasks.put(new BackendFragmentId(backendId, fragmentId), fragmentTask);
            }
        }
        return backendFragmentTasks.build();
    }
}
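
// A minimal sketch (illustrative only, not part of Doris) of the subclass
// contract: pass the coordinator context up to the constructor and implement
// doProcessReportExecStatus. It is kept as a comment because JobProcessor may
// declare further methods (e.g. cancellation) that a real subclass must also
// implement; the class name below is hypothetical.
//
//   class LoggingJobProcessor extends AbstractJobProcessor {
//       LoggingJobProcessor(CoordinatorContext coordinatorContext) {
//           super(coordinatorContext);
//       }
//
//       @Override
//       protected void doProcessReportExecStatus(
//               TReportExecStatusParams params, SingleFragmentPipelineTask fragmentTask) {
//           // Invoked only after the fragment task accepted the report;
//           // a real implementation would update per-fragment bookkeeping here.
//       }
//   }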