Job.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.jobs;

import org.apache.doris.nereids.memo.CopyInResult;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.metrics.CounterType;
import org.apache.doris.nereids.metrics.EventChannel;
import org.apache.doris.nereids.metrics.EventProducer;
import org.apache.doris.nereids.metrics.TracerSupplier;
import org.apache.doris.nereids.metrics.consumer.LogConsumer;
import org.apache.doris.nereids.metrics.enhancer.AddCounterEventEnhancer;
import org.apache.doris.nereids.metrics.event.CounterEvent;
import org.apache.doris.nereids.metrics.event.TransformEvent;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleSet;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;

import com.google.common.base.Preconditions;

import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Abstract class for all job using for analyze and optimize query plan in Nereids.
 */
public abstract class Job implements TracerSupplier {
    // counter tracer to count expression transform times.
    protected static final EventProducer COUNTER_TRACER = new EventProducer(CounterEvent.class,
            EventChannel.getDefaultChannel()
                    .addEnhancers(new AddCounterEventEnhancer())
                    .addConsumers(new LogConsumer(CounterEvent.class, EventChannel.LOG)));
    protected JobType type;
    protected JobContext context;
    protected boolean once;
    protected final BitSet disableRules;

    protected Map<CTEId, Statistics> cteIdToStats;

    public Job(JobType type, JobContext context) {
        this(type, context, true);
    }

    /** job full parameter constructor */
    public Job(JobType type, JobContext context, boolean once) {
        this.type = type;
        this.context = context;
        this.once = once;
        this.disableRules = getDisableRules(context);
    }

    public void pushJob(Job job) {
        context.getScheduleContext().pushJob(job);
    }

    public RuleSet getRuleSet() {
        return context.getCascadesContext().getRuleSet();
    }

    public boolean isOnce() {
        return once;
    }

    public ConnectContext getConnectContext() {
        return context.getCascadesContext().getConnectContext();
    }

    public abstract void execute();

    public EventProducer getEventTracer() {
        throw new UnsupportedOperationException("get_event_tracer is unsupported");
    }

    protected Optional<CopyInResult> invokeRewriteRuleWithTrace(Rule rule, Plan before, Group targetGroup) {
        context.onInvokeRule(rule.getRuleType());
        COUNTER_TRACER.log(CounterEvent.of(Memo.getStateId(),
                CounterType.EXPRESSION_TRANSFORM, targetGroup, targetGroup.getLogicalExpression(), before));

        List<Plan> afters = rule.transform(before, context.getCascadesContext());
        Preconditions.checkArgument(afters.size() == 1);
        Plan after = afters.get(0);
        if (after == before) {
            return Optional.empty();
        }

        CopyInResult result = context.getCascadesContext()
                .getMemo()
                .copyIn(after, targetGroup, rule.isRewrite());

        if (result.generateNewExpression || result.correspondingExpression.getOwnerGroup() != targetGroup) {
            getEventTracer().log(TransformEvent.of(targetGroup.getLogicalExpression(), before, afters,
                            rule.getRuleType()), rule::isRewrite);
        }

        return Optional.of(result);
    }

    /**
     * count the job execution times of groupExpressions, all groupExpressions will be inclusive.
     * TODO: count a specific groupExpression.
     * @param groupExpression the groupExpression at current job.
     */
    protected void countJobExecutionTimesOfGroupExpressions(GroupExpression groupExpression) {
        COUNTER_TRACER.log(CounterEvent.of(Memo.getStateId(), CounterType.JOB_EXECUTION,
                groupExpression.getOwnerGroup(), groupExpression, groupExpression.getPlan()));
    }

    public static BitSet getDisableRules(JobContext context) {
        return context.getCascadesContext().getAndCacheDisableRules();
    }
}