// Optimizer.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.jobs.executor;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.cascades.DeriveStatsJob;
import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
import org.apache.doris.nereids.jobs.joinorder.JoinOrderJob;
import org.apache.doris.nereids.jobs.rewrite.RewriteJob;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.analysis.CheckAfterRewrite;
import org.apache.doris.nereids.rules.rewrite.AdjustNullable;
import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.MergeProjectable;
import org.apache.doris.nereids.rules.rewrite.PushDownExpressionsInHashCondition;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.util.MoreFieldsThread;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Objects;

/**
 * Cascades style optimize:
 * Perform equivalent logical plan exploration and physical implementation enumeration,
 * try to find best plan under the guidance of statistic information and cost model.
 */
public class Optimizer {

    private final CascadesContext cascadesContext;

    public Optimizer(CascadesContext cascadesContext) {
        this.cascadesContext = Objects.requireNonNull(cascadesContext, "cascadesContext cannot be null");
    }

    /**
     * execute optimize, use dphyp or cascades according to join number and session variables.
     */
    public void execute() {
        MoreFieldsThread.keepFunctionSignature(() -> {
            // init memo
            cascadesContext.toMemo();
            // stats derive
            cascadesContext.getMemo().getRoot().getLogicalExpressions()
                    .forEach(groupExpression -> cascadesContext.pushJob(
                            new DeriveStatsJob(groupExpression, cascadesContext.getCurrentJobContext())));
            cascadesContext.getJobScheduler().executeJobPool(cascadesContext);
            if (cascadesContext.getStatementContext().isDpHyp() || isDpHyp(cascadesContext)) {
                // RightNow, dp hyper can only order 64 join operators
                dpHypOptimize();
                cascadesContext.getStatementContext().setDpHyp(false);
                cascadesContext.getStatementContext().setAfterDpHyper(true);
            }
            cascadesContext.pushJob(
                    new OptimizeGroupJob(cascadesContext.getMemo().getRoot(), cascadesContext.getCurrentJobContext()));
            cascadesContext.getJobScheduler().executeJobPool(cascadesContext);

            return null;
        });
    }

    /**
     * This method calc the result that if use dp hyper or not
     */
    public static boolean isDpHyp(CascadesContext cascadesContext) {
        boolean optimizeWithUnknownColStats = false;
        if (ConnectContext.get() != null && ConnectContext.get().getStatementContext() != null) {
            if (ConnectContext.get().getStatementContext().isHasUnknownColStats()) {
                optimizeWithUnknownColStats = true;
            }
        }
        // DPHyp optimize
        SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
        int maxTableCount = sessionVariable.getMaxTableCountUseCascadesJoinReorder();
        if (optimizeWithUnknownColStats) {
            // if column stats are unknown, 10~20 table-join is optimized by cascading framework
            maxTableCount = 2 * maxTableCount;
        }
        int continuousJoinNum = Memo.countMaxContinuousJoin(cascadesContext.getRewritePlan());
        cascadesContext.getStatementContext().setMaxContinuousJoin(continuousJoinNum);
        boolean isDpHyp = sessionVariable.enableDPHypOptimizer || continuousJoinNum > maxTableCount;
        boolean finalEnableDpHyp = !sessionVariable.isDisableJoinReorder()
                && !cascadesContext.isLeadingDisableJoinReorder()
                && continuousJoinNum <= sessionVariable.getMaxJoinNumberOfReorder()
                && isDpHyp;
        cascadesContext.getStatementContext().setDpHyp(finalEnableDpHyp);
        return finalEnableDpHyp;
    }

    private void dpHypOptimize() {
        Group root = cascadesContext.getMemo().getRoot();
        // Due to EnsureProjectOnTopJoin, root group can't be Join Group, so DPHyp doesn't change the root group
        cascadesContext.pushJob(new JoinOrderJob(root, cascadesContext.getCurrentJobContext()));
        cascadesContext.getJobScheduler().executeJobPool(cascadesContext);

        // 1) copy out logical plan from memo
        Plan plan = cascadesContext.getMemo().copyOutBestLogicalPlan();

        // 2) run PushDownExpressionsInHashCondition as a plan rewrite on a temporary context
        org.apache.doris.nereids.CascadesContext tempCtx = CascadesContext.newCurrentTreeContext(cascadesContext);
        tempCtx.setRewritePlan(plan);
        RewriteJob pushDownRewrite = AbstractBatchJobExecutor.topDown(new PushDownExpressionsInHashCondition(),
                new MergeProjectable());
        RewriteJob columnPrune = AbstractBatchJobExecutor.custom(RuleType.COLUMN_PRUNING, ColumnPruning::new);
        RewriteJob adjustNullable = AbstractBatchJobExecutor.custom(RuleType.ADJUST_NULLABLE,
                () -> new AdjustNullable(false));
        RewriteJob checkAfterRewrite = AbstractBatchJobExecutor.bottomUp(new CheckAfterRewrite());
        AbstractBatchJobExecutor executor = new AbstractBatchJobExecutor(tempCtx) {
            @Override
            public java.util.List<org.apache.doris.nereids.jobs.rewrite.RewriteJob> getJobs() {
                return com.google.common.collect.ImmutableList.of(pushDownRewrite, columnPrune, adjustNullable,
                        checkAfterRewrite);
            }
        };
        boolean oldFeDebugValue = tempCtx.getStatementContext().getConnectContext().getSessionVariable().feDebug;
        try {
            tempCtx.getStatementContext().getConnectContext().getSessionVariable().feDebug = false;
            executor.execute();
        } finally {
            tempCtx.getStatementContext().getConnectContext().getSessionVariable().feDebug = oldFeDebugValue;
        }

        // 3) copy rewritten plan into the main cascades context and rebuild memo
        Plan rewritten = tempCtx.getRewritePlan();
        cascadesContext.releaseMemo();
        cascadesContext.setRewritePlan(rewritten);
        // init memo
        cascadesContext.toMemo();
        // stats derive
        cascadesContext.getMemo().getRoot().getLogicalExpressions().forEach(groupExpression -> cascadesContext.pushJob(
                new DeriveStatsJob(groupExpression, cascadesContext.getCurrentJobContext())));
        cascadesContext.getJobScheduler().executeJobPool(cascadesContext);
    }

    private SessionVariable getSessionVariable() {
        return cascadesContext.getConnectContext().getSessionVariable();
    }
}