StmtRewriter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/StmtRewriter.java
// and modified by Doris

package org.apache.doris.analysis;

import org.apache.doris.catalog.AggStateType;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.TableAliasGenerator;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Class representing a statement rewriter. A statement rewriter performs subquery
 * unnesting on an analyzed parse tree.
 */
public class StmtRewriter {
    private static final Logger LOG = LoggerFactory.getLogger(StmtRewriter.class);

    private static final String BITMAP_CONTAINS = "bitmap_contains";

    /**
     * Rewrite the statement of an analysis result. The unanalyzed rewritten
     * statement is returned.
     */
    public static StatementBase rewrite(Analyzer analyzer, StatementBase parsedStmt)
            throws AnalysisException {
        if (parsedStmt instanceof QueryStmt) {
            QueryStmt analyzedStmt = (QueryStmt) parsedStmt;
            Preconditions.checkNotNull(analyzedStmt.analyzer);
            return rewriteQueryStatement(analyzedStmt, analyzer);
        } else if (parsedStmt instanceof InsertStmt) {
            final InsertStmt insertStmt = (InsertStmt) parsedStmt;
            final QueryStmt analyzedStmt = (QueryStmt) insertStmt.getQueryStmt();
            Preconditions.checkNotNull(analyzedStmt.analyzer);
            QueryStmt rewrittenQueryStmt = rewriteQueryStatement(analyzedStmt, analyzer);
            insertStmt.setQueryStmt(rewrittenQueryStmt);
        } else {
            throw new AnalysisException("Unsupported statement containing subqueries: "
                    + parsedStmt.toSql());
        }
        return parsedStmt;
    }

    /**
     * Calls the appropriate equal method based on the specific type of query stmt. See
     * rewriteSelectStatement() and rewriteUnionStatement() documentation.
     */
    public static QueryStmt rewriteQueryStatement(QueryStmt stmt, Analyzer analyzer)
            throws AnalysisException {
        Preconditions.checkNotNull(stmt);
        if (stmt instanceof SelectStmt) {
            return rewriteSelectStatement((SelectStmt) stmt, analyzer);
        } else if (stmt instanceof SetOperationStmt) {
            rewriteUnionStatement((SetOperationStmt) stmt, analyzer);
        } else {
            throw new AnalysisException("Subqueries not supported for "
                    + stmt.getClass().getSimpleName() + " statements");
        }
        return stmt;
    }

    private static SelectStmt rewriteSelectStatement(SelectStmt stmt, Analyzer analyzer)
            throws AnalysisException {
        SelectStmt result = stmt;
        // Rewrite all the subqueries in the FROM clause.
        for (TableRef tblRef : result.fromClause) {
            if (!(tblRef instanceof InlineViewRef)) {
                continue;
            }
            InlineViewRef inlineViewRef = (InlineViewRef) tblRef;
            QueryStmt rewrittenQueryStmt = rewriteQueryStatement(inlineViewRef.getViewStmt(),
                    inlineViewRef.getAnalyzer());
            inlineViewRef.setViewStmt(rewrittenQueryStmt);
        }
        // Rewrite all the subqueries in the WHERE clause.
        if (result.hasWhereClause()) {
            // Push negation to leaf operands.
            result.whereClause = Expr.pushNegationToOperands(result.whereClause);
            if (ConnectContext.get() == null) {
                // Check if we can equal the subqueries in the WHERE clause. OR predicates with
                // subqueries are not supported.
                if (hasSubqueryInDisjunction(result.whereClause)) {
                    throw new AnalysisException("Subqueries in OR predicates are not supported: "
                            + result.whereClause.toSql());
                }
            }
            rewriteWhereClauseSubqueries(result, analyzer);
        }
        // Rewrite all subquery in the having clause
        if (result.getHavingClauseAfterAnalyzed() != null
                && result.getHavingClauseAfterAnalyzed().getSubquery() != null) {
            result = rewriteHavingClauseSubqueries(result, analyzer);
        }
        result.sqlString = null;
        if (LOG.isDebugEnabled()) {
            LOG.debug("rewritten stmt: " + result.toSql());
        }
        return result;
    }

    /**
     * Rewrite having subquery.
     * Step1: equal having subquery to where subquery
     * Step2: equal where subquery
     * <p>
     * For example:
     * select cs_item_sk, sum(cs_sales_price) from catalog_sales a group by cs_item_sk
     * having sum(cs_sales_price) >
     * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk);
     *
     * <p>
     * Step1: equal having subquery to where subquery
     * Outer query is changed to inline view in rewritten query
     * Inline view of outer query:
     * from (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
     * Rewritten subquery of expr:
     * where a.sum_cs_sales_price >
     * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
     * Rewritten query:
     * select cs_item_sk, a.sum_cs_sales_price from
     * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
     * where a.sum_cs_sales_price >
     * (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk)
     * <p>
     * Step2: equal where subquery
     * Inline view of subquery:
     * from (select b.cs_item_sk, min(cs_sales_price) from catalog_sales b group by cs_item_sk) c
     * Rewritten correlated predicate:
     * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min(cs_sales_price)
     * The final stmt:
     * select a.cs_item_sk, a.sum_cs_sales_price from
     * (select cs_item_sk, sum(cs_sales_price) sum_cs_sales_price from catalog_sales group by cs_item_sk) a
     * join
     * (select b.cs_item_sk, min(b.cs_sales_price) min_cs_sales_price from catalog_sales b group by b.cs_item_sk) c
     * where c.cs_item_sk = a.cs_item_sk and a.sum_cs_sales_price > c.min_cs_sales_price;
     *
     * @param stmt
     * @param analyzer
     */
    private static SelectStmt rewriteHavingClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
            throws AnalysisException {
        // prepare parameters
        SelectList selectList = stmt.getSelectList();
        List<String> columnLabels = stmt.getColLabels();
        Expr havingClause = stmt.getHavingClauseAfterAnalyzed();
        List<FunctionCallExpr> aggregateExprs = stmt.getAggInfo().getAggregateExprs();
        Preconditions.checkState(havingClause != null);
        Preconditions.checkState(havingClause.getSubquery() != null);
        List<OrderByElement> orderByElements = stmt.getOrderByElementsAfterAnalyzed();
        LimitElement limitElement = new LimitElement(stmt.getOffset(), stmt.getLimit());
        TableAliasGenerator tableAliasGenerator = stmt.getTableAliasGenerator();

        /*
         * The outer query is changed to inline view without having predicate
         * For example:
         * Query: select cs_item_sk, sum(cs_sales_price) from catalog_sales a group by cs_item_sk having ...;
         * Inline view:
         *     from (select cs_item_sk $ColumnA, sum(cs_sales_price) $ColumnB
         *     from catalog_sales a group by cs_item_sk) $TableA
         *
         * Add missing aggregation columns in select list
         * For example:
         * Query: select cs_item_sk from catalog_sales a group by cs_item_sk having sum(cs_sales_price) > 1
         * SelectList: select cs_item_sk
         * AggregateExprs:  sum(cs_sales_price)
         * Add missing aggregation columns: select cs_item_sk, sum(cs_sales_price)
         * Inline view:
         *     from (select cs_item_sk $ColumnA, sum(cs_sales_price) $ColumnB from catalog_sales a group by
         *     cs_item_sk) $TableA
         */
        SelectStmt inlineViewQuery = (SelectStmt) stmt.clone();
        inlineViewQuery.reset();
        // the having, order by and limit should be move to outer query
        inlineViewQuery.removeHavingClause();
        inlineViewQuery.removeOrderByElements();
        inlineViewQuery.removeLimitElement();
        // add missing aggregation columns
        SelectList selectListOfInlineViewQuery = addMissingAggregationColumns(selectList, aggregateExprs);
        inlineViewQuery.setSelectList(selectListOfInlineViewQuery);
        // add a new alias for all of columns in subquery
        List<String> colAliasOfInlineView = Lists.newArrayList();
        List<Expr> leftExprList = Lists.newArrayList();
        for (int i = 0; i < selectListOfInlineViewQuery.getItems().size(); ++i) {
            leftExprList.add(selectListOfInlineViewQuery.getItems().get(i).getExpr().clone());
            colAliasOfInlineView.add(inlineViewQuery.getColumnAliasGenerator().getNextAlias());
        }
        InlineViewRef inlineViewRef = new InlineViewRef(tableAliasGenerator.getNextAlias(), inlineViewQuery,
                colAliasOfInlineView);
        try {
            inlineViewRef.analyze(analyzer);
        } catch (UserException e) {
            throw new AnalysisException(e.getMessage());
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Outer query is changed to {}", inlineViewRef.tableRefToSql());
        }

        /*
         * Columns which belong to outer query can substitute for output columns of inline view
         * For example:
         * Having clause: sum(cs_sales_price) >
         *                   (select min(cs_sales_price) from catalog_sales b where a.cs_item_sk = b.cs_item_sk);
         * Order by: sum(cs_sales_price), a.cs_item_sk
         * Select list: cs_item_sk, sum(cs_sales_price)
         * Columns which belong to outer query: sum(cs_sales_price), a.cs_item_sk
         * SMap: <cs_item_sk $ColumnA> <sum(cs_sales_price) $ColumnB>
         * After substitute
         * Having clause: $ColumnB >
         *                   (select min(cs_sales_price) from catalog_sales b where $ColumnA = b.cs_item_sk)
         * Order by: $ColumnB, $ColumnA
         * Select list: $ColumnA cs_item_sk, $ColumnB sum(cs_sales_price)
         */
        /*
         * Prepare select list of new query.
         * Generate a new select item for each original columns in select list
         */
        ExprSubstitutionMap smap = new ExprSubstitutionMap();
        List<SelectListItem> inlineViewItems = inlineViewQuery.getSelectList().getItems();
        for (int i = 0; i < inlineViewItems.size(); i++) {
            Expr leftExpr = leftExprList.get(i);
            Expr rightExpr = new SlotRef(inlineViewRef.getAliasAsName(), colAliasOfInlineView.get(i));
            rightExpr.analyze(analyzer);
            smap.put(leftExpr, rightExpr);
        }
        havingClause.reset();
        Expr newWherePredicate = havingClause.substitute(smap, analyzer, false);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Having predicate is changed to " + newWherePredicate.toSql());
        }
        ArrayList<OrderByElement> newOrderByElements = null;
        if (orderByElements != null) {
            newOrderByElements = Lists.newArrayList();
            for (OrderByElement orderByElement : orderByElements) {
                OrderByElement newOrderByElement = new OrderByElement(orderByElement.getExpr().reset().substitute(smap),
                        orderByElement.getIsAsc(), orderByElement.getNullsFirstParam());
                newOrderByElements.add(newOrderByElement);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Order by element is changed to " + newOrderByElement.toSql());
                }
            }
        }
        List<SelectListItem> newSelectItems = Lists.newArrayList();
        for (int i = 0; i < selectList.getItems().size(); i++) {
            SelectListItem newItem = new SelectListItem(selectList.getItems().get(i).getExpr().reset().substitute(smap),
                    columnLabels.get(i));
            newSelectItems.add(newItem);
            if (LOG.isDebugEnabled()) {
                LOG.debug("New select item is changed to " + newItem.toSql());
            }
        }
        SelectList newSelectList = new SelectList(newSelectItems, selectList.isDistinct());

        // construct rewritten query
        List<TableRef> newTableRefList = Lists.newArrayList();
        newTableRefList.add(inlineViewRef);
        FromClause newFromClause = new FromClause(newTableRefList);
        SelectStmt result = new SelectStmt(newSelectList, newFromClause, newWherePredicate, null, null,
                newOrderByElements, limitElement);
        result.setTableAliasGenerator(tableAliasGenerator);
        try {
            result.analyze(analyzer);
        } catch (UserException e) {
            throw new AnalysisException(e.getMessage());
        }
        LOG.info("New stmt {} is constructed after rewritten subquery of having clause.", result.toSql());

        // equal where subquery
        result = rewriteSelectStatement(result, analyzer);
        if (LOG.isDebugEnabled()) {
            LOG.debug("The final stmt is " + result.toSql());
        }
        return result;
    }

    /**
     * Add missing aggregation columns
     *
     * @param selectList:     select a, sum(v1)
     * @param aggregateExprs: sum(v1), sum(v2)
     * @return select a, sum(v1), sum(v2)
     */
    private static SelectList addMissingAggregationColumns(SelectList selectList,
                                                           List<FunctionCallExpr> aggregateExprs) {
        SelectList result = selectList.clone();
        for (FunctionCallExpr functionCallExpr : aggregateExprs) {
            boolean columnExists = false;
            for (SelectListItem selectListItem : selectList.getItems()) {
                if (selectListItem.getExpr().equals(functionCallExpr)) {
                    columnExists = true;
                    break;
                }
            }
            if (!columnExists) {
                SelectListItem selectListItem = new SelectListItem(functionCallExpr.clone().reset(), null);
                result.addItem(selectListItem);
            }
        }
        return result;
    }

    /**
     * Rewrite all operands in a UNION. The conditions that apply to SelectStmt rewriting
     * also apply here.
     */
    private static void rewriteUnionStatement(SetOperationStmt stmt, Analyzer analyzer)
            throws AnalysisException {
        for (SetOperationStmt.SetOperand operand : stmt.getOperands()) {
            QueryStmt queryStmt = operand.getQueryStmt();
            if (queryStmt instanceof SelectStmt) {
                QueryStmt rewrittenQueryStmt = rewriteSelectStatement((SelectStmt) queryStmt, operand.getAnalyzer());
                operand.setQueryStmt(rewrittenQueryStmt);
            } else if (queryStmt instanceof SetOperationStmt) {
                rewriteUnionStatement((SetOperationStmt) queryStmt, operand.getAnalyzer());
            } else {
                throw new IllegalStateException("Rewrite union statement failed. "
                    + "Because QueryStmt is neither SelectStmt nor SetOperationStmt");
            }
        }
    }

    /**
     * Returns true if the Expr tree rooted at 'expr' has at least one subquery
     * that participates in a disjunction.
     */
    private static boolean hasSubqueryInDisjunction(Expr expr) {
        if (!(expr instanceof CompoundPredicate)) {
            return false;
        }
        if (Expr.IS_OR_PREDICATE.apply(expr)) {
            return expr.contains(Subquery.class);
        }
        for (Expr child : expr.getChildren()) {
            if (hasSubqueryInDisjunction(child)) {
                return true;
            }
        }
        return false;
    }

    private static void extractExprWithSubquery(boolean inDisjunct, Expr expr,
            List<Expr> subqueryExprInConjunct, List<Expr> subqueryExprInDisjunct) {
        if (!(expr instanceof CompoundPredicate)) {
            if (expr.contains(Subquery.class)) {
                // remove redundant sub-query by compare two sub-query with equals
                if (inDisjunct) {
                    if (!subqueryExprInDisjunct.contains(expr)) {
                        subqueryExprInDisjunct.add(expr);
                    }
                } else {
                    if (!subqueryExprInConjunct.contains(expr)) {
                        subqueryExprInConjunct.add(expr);
                    }
                }
            }
        } else {
            for (Expr child : expr.getChildren()) {
                extractExprWithSubquery(inDisjunct || Expr.IS_OR_PREDICATE.apply(expr), child,
                        subqueryExprInConjunct, subqueryExprInDisjunct);
            }
        }
    }

    /**
     * Rewrite all subqueries of a stmt's WHERE clause. Initially, all the
     * conjuncts containing subqueries are extracted from the WHERE clause and are
     * replaced with true BoolLiterals. Subsequently, each extracted conjunct is
     * merged into its parent select block by converting it into a join.
     * Conjuncts with subqueries that themselves contain conjuncts with subqueries are
     * recursively rewritten in a bottom up fashion.
     * <p>
     * The following example illustrates the bottom up rewriting of nested queries.
     * Suppose we have the following three level nested query Q0:
     * <p>
     * SELECT *
     * FROM T1                                            : Q0
     * WHERE T1.a IN (SELECT a
     * FROM T2 WHERE T2.b IN (SELECT b
     * FROM T3))
     * AND T1.c < 10;
     * <p>
     * This query will be rewritten as follows. Initially, the IN predicate
     * T1.a IN (SELECT a FROM T2 WHERE T2.b IN (SELECT b FROM T3)) is extracted
     * from the top level block (Q0) since it contains a subquery and is
     * replaced by a true BoolLiteral, resulting in the following query Q1:
     * <p>
     * SELECT * FROM T1 WHERE TRUE : Q1
     * <p>
     * Since the stmt in the extracted predicate contains a conjunct with a subquery,
     * it is also rewritten. As before, rewriting stmt SELECT a FROM T2
     * WHERE T2.b IN (SELECT b FROM T3) works by first extracting the conjunct that
     * contains the subquery (T2.b IN (SELECT b FROM T3)) and substituting it with
     * a true BoolLiteral, producing the following stmt Q2:
     * <p>
     * SELECT a FROM T2 WHERE TRUE : Q2
     * <p>
     * The predicate T2.b IN (SELECT b FROM T3) is then merged with Q2,
     * producing the following unnested query Q3:
     * <p>
     * SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b : Q3
     * <p>
     * The extracted IN predicate becomes:
     * <p>
     * T1.a IN (SELECT a FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1 ON T2.b = $a$1.b)
     * <p>
     * Finally, the rewritten IN predicate is merged with query block Q1,
     * producing the following unnested query (WHERE clauses that contain only
     * conjunctions of true BoolLiterals are eliminated):
     * <p>
     * SELECT *
     * FROM T1 LEFT SEMI JOIN (SELECT a
     * FROM T2 LEFT SEMI JOIN (SELECT b FROM T3) $a$1
     * ON T2.b = $a$1.b) $a$1
     * ON $a$1.a = T1.a
     * WHERE T1.c < 10;
     */
    private static void rewriteWhereClauseSubqueries(SelectStmt stmt, Analyzer analyzer)
            throws AnalysisException {
        int numTableRefs = stmt.fromClause.size();
        // we must use two same set structs to process conjuncts and disjuncts
        // because the same sub-query could appear in both at the same time.
        // if we use one ExprSubstitutionMap, the sub-query will be replaced by wrong expr.
        ArrayList<Expr> exprsWithSubqueriesInConjuncts = Lists.newArrayList();
        ArrayList<Expr> exprsWithSubqueriesInDisjuncts = Lists.newArrayList();
        ExprSubstitutionMap conjunctsSmap = new ExprSubstitutionMap();
        ExprSubstitutionMap disjunctsSmap = new ExprSubstitutionMap();
        List<TupleDescriptor> markTuples = Lists.newArrayList();
        List<Expr> subqueryInConjunct = Lists.newArrayList();
        List<Expr> subqueryInDisjunct = Lists.newArrayList();
        // Check if all the conjuncts in the WHERE clause that contain subqueries
        // can currently be rewritten as a join.
        extractExprWithSubquery(false, stmt.whereClause, subqueryInConjunct, subqueryInDisjunct);
        for (Expr conjunct : subqueryInConjunct) {
            processOneSubquery(stmt, exprsWithSubqueriesInConjuncts,
                    conjunctsSmap, markTuples, conjunct, analyzer, false);
        }
        for (Expr conjunct : subqueryInDisjunct) {
            processOneSubquery(stmt, exprsWithSubqueriesInDisjuncts,
                    disjunctsSmap, markTuples, conjunct, analyzer, true);
        }
        stmt.whereClause = stmt.whereClause.substitute(conjunctsSmap, disjunctsSmap, analyzer, false);

        boolean hasNewVisibleTuple = false;
        // Recursively equal all the exprs that contain subqueries and merge them with 'stmt'.
        for (Expr expr : exprsWithSubqueriesInConjuncts) {
            if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, null)) {
                hasNewVisibleTuple = true;
            }
        }
        for (int i = 0; i < exprsWithSubqueriesInDisjuncts.size(); i++) {
            Expr expr = exprsWithSubqueriesInDisjuncts.get(i);
            if (mergeExpr(stmt, rewriteExpr(expr, analyzer), analyzer, markTuples.get(i))) {
                hasNewVisibleTuple = true;
            }
        }
        if (canEliminate(stmt.whereClause)) {
            stmt.whereClause = null;
        }
        if (hasNewVisibleTuple) {
            replaceUnqualifiedStarItems(stmt, numTableRefs);
        }
    }

    private static void processOneSubquery(SelectStmt stmt,
            List<Expr> exprsWithSubqueries, ExprSubstitutionMap smap, List<TupleDescriptor> markTuples,
            Expr exprWithSubquery, Analyzer analyzer, boolean isMark) throws AnalysisException {
        List<Subquery> subqueries = Lists.newArrayList();
        exprWithSubquery.collectAll(Predicates.instanceOf(Subquery.class), subqueries);
        if (subqueries.size() == 0) {
            return;
        }
        if (subqueries.size() > 1) {
            throw new AnalysisException("Multiple subqueries are not supported in "
                    + "expression: " + exprWithSubquery.toSql());
        }
        if (!(exprWithSubquery instanceof InPredicate)
                && !(exprWithSubquery instanceof ExistsPredicate)
                && !(exprWithSubquery instanceof BinaryPredicate)
                && !exprWithSubquery.contains(Expr.IS_SCALAR_SUBQUERY)) {
            throw new AnalysisException("Non-scalar subquery is not supported in "
                    + "expression: "
                    + exprWithSubquery.toSql());
        }
        if (exprWithSubquery instanceof BinaryPredicate && (childrenContainInOrExists(exprWithSubquery))) {
            throw new AnalysisException("Not support binaryOperator children at least one is in or exists subquery"
                    + exprWithSubquery.toSql());
        }

        if (exprWithSubquery instanceof ExistsPredicate) {
            // Check if we can determine the result of an ExistsPredicate during analysis.
            // If so, replace the predicate with a BoolLiteral predicate and remove it from
            // the list of predicates to be rewritten.
            BoolLiteral boolLiteral = replaceExistsPredicate((ExistsPredicate) exprWithSubquery);
            if (boolLiteral != null) {
                boolLiteral.analyze(analyzer);
                smap.put(exprWithSubquery, boolLiteral);
                return;
            }
        }

        // Replace all the supported exprs with subqueries with true BoolLiterals
        // using a smap.
        if (isMark) {
            TupleDescriptor markTuple = analyzer.getDescTbl().createTupleDescriptor();
            markTuple.setAliases(new String[]{stmt.getTableAliasGenerator().getNextAlias()}, true);
            SlotDescriptor markSlot = analyzer.addSlotDescriptor(markTuple);
            String slotName = stmt.getColumnAliasGenerator().getNextAlias();
            markSlot.setType(ScalarType.BOOLEAN);
            markSlot.setIsMaterialized(true);
            markSlot.setIsNullable(true);
            markSlot.setColumn(new Column(slotName, ScalarType.BOOLEAN));
            SlotRef markRef = new SlotRef(markSlot);
            markRef.setTblName(new TableName(null, null, markTuple.getAlias()));
            markRef.setLabel(slotName);
            smap.put(exprWithSubquery, markRef);
            markTuples.add(markTuple);
            exprsWithSubqueries.add(exprWithSubquery);
        } else {
            BoolLiteral boolLiteral = new BoolLiteral(true);
            boolLiteral.analyze(analyzer);
            smap.put(exprWithSubquery, boolLiteral);
            exprsWithSubqueries.add(exprWithSubquery);
        }
    }

    private static boolean childrenContainInOrExists(Expr expr) {
        boolean contain = false;
        for (Expr child : expr.getChildren()) {
            contain = contain || child instanceof InPredicate || child instanceof ExistsPredicate;
            if (contain) {
                break;
            }
        }
        return contain;
    }

    /**
     * Replace an ExistsPredicate that contains a subquery with a BoolLiteral if we
     * can determine its result without evaluating it. Return null if the result of the
     * ExistsPredicate can only be determined at run-time.
     */
    private static BoolLiteral replaceExistsPredicate(ExistsPredicate predicate) {
        Subquery subquery = predicate.getSubquery();
        Preconditions.checkNotNull(subquery);
        SelectStmt subqueryStmt = (SelectStmt) subquery.getStatement();
        BoolLiteral boolLiteral = null;
        if (subqueryStmt.getAnalyzer().hasEmptyResultSet()) {
            boolLiteral = new BoolLiteral(predicate.isNotExists());
        } else if (subqueryStmt.hasAggInfo() && subqueryStmt.getAggInfo().hasAggregateExprs()
                && !subqueryStmt.hasAnalyticInfo() && subqueryStmt.getHavingPred() == null) {
            boolLiteral = new BoolLiteral(!predicate.isNotExists());
        }
        return boolLiteral;
    }

    /**
     * Return true if the Expr tree rooted at 'expr' can be safely
     * eliminated, i.e. it only consists of conjunctions of true BoolLiterals.
     */
    private static boolean canEliminate(Expr expr) {
        for (Expr conjunct : expr.getConjuncts()) {
            if (!Expr.IS_TRUE_LITERAL.apply(conjunct)) {
                return false;
            }
        }
        return true;
    }

    /*
     * Modifies in place an expr that contains a subquery by rewriting its
     * subquery stmt. The modified analyzed expr is returned.
     */
    private static Expr rewriteExpr(Expr expr, Analyzer analyzer)
            throws AnalysisException {
        // Extract the subquery and equal it.
        Subquery subquery = expr.getSubquery();
        Preconditions.checkNotNull(subquery);
        QueryStmt rewrittenStmt = rewriteSelectStatement((SelectStmt) subquery.getStatement(), subquery.getAnalyzer());
        // Create a new Subquery with the rewritten stmt and use a substitution map
        // to replace the original subquery from the expr.

        rewrittenStmt = rewrittenStmt.clone();
        rewrittenStmt.reset();
        Subquery newSubquery = new Subquery(rewrittenStmt);
        newSubquery.analyze(analyzer);

        ExprSubstitutionMap smap = new ExprSubstitutionMap();
        smap.put(subquery, newSubquery);
        return expr.substitute(smap, analyzer, false);
    }

    private static void canRewriteScalarFunction(Expr expr, Expr conjunct)
            throws AnalysisException {
        if (expr.getSubquery().isScalarSubquery()) {
            if (conjunct instanceof BinaryPredicate
                    && ((BinaryPredicate) conjunct).getOp() == BinaryPredicate.Operator.EQ) {
                return;
            }
            throw new AnalysisException("scalar subquery's correlatedPredicates's operator must be EQ");
        }
    }

    /**
     * origin stmt: select * from t1 where t1=(select k1 from t2);
     *
     * @param stmt     select * from t1 where true;
     * @param expr     t1=(select k1 from t2). The expr has already be rewritten.
     * @param analyzer
     * @return
     * @throws AnalysisException
     */
    private static boolean mergeExpr(SelectStmt stmt, Expr expr,
            Analyzer analyzer, TupleDescriptor markTuple) throws AnalysisException {
        // LOG.warn("dhc mergeExpr stmt={} expr={}", stmt, expr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("SUBQUERY mergeExpr stmt={} expr={}", stmt.toSql(), expr.toSql());
        }
        Preconditions.checkNotNull(expr);
        Preconditions.checkNotNull(analyzer);
        Preconditions.checkState(expr.getSubquery().getAnalyzer() != null,
                "subquery must be analyze address=" + System.identityHashCode(expr.getSubquery()));
        boolean updateSelectList = false;

        SelectStmt subqueryStmt = (SelectStmt) expr.getSubquery().getStatement();
        // Create a new inline view from the subquery stmt. The inline view will be added
        // to the stmt's table refs later. Explicitly set the inline view's column labels
        // to eliminate any chance that column aliases from the parent query could reference
        // select items from the inline view after the equal.
        List<String> colLabels = Lists.newArrayList();
        // add a new alias for all columns in subquery
        for (int i = 0; i < subqueryStmt.getColLabels().size(); ++i) {
            colLabels.add(subqueryStmt.getColumnAliasGenerator().getNextAlias());
        }
        // (select k1 $a from t2) $b
        InlineViewRef inlineView = new InlineViewRef(
                stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt, colLabels);

        // Extract all correlated predicates from the subquery.
        List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
        if (!onClauseConjuncts.isEmpty()) {
            canRewriteCorrelatedSubquery(expr, onClauseConjuncts);
            // For correlated subqueries that are eligible for equal by transforming
            // into a join, a LIMIT clause has no effect on the results, so we can
            // safely remove it.
            // subqueryStmt.limitElement = null;
            subqueryStmt.limitElement = new LimitElement();
        }

        // Update the subquery's select list and/or its GROUP BY clause by adding
        // exprs from the extracted correlated predicates.
        boolean updateGroupBy = expr.getSubquery().isScalarSubquery()
                || (expr instanceof ExistsPredicate && subqueryStmt.hasAggInfo());
        // boolean updateGroupBy = false;
        List<Expr> lhsExprs = Lists.newArrayList();
        List<Expr> rhsExprs = Lists.newArrayList();

        for (Expr conjunct : onClauseConjuncts) {
            canRewriteScalarFunction(expr, conjunct);
            updateInlineView(inlineView, conjunct, stmt.getTableRefIds(),
                    lhsExprs, rhsExprs, updateGroupBy);
        }

        /*
         * Situation: The expr is an uncorrelated subquery for outer stmt.
         * Rewrite: Add a limit 1 for subquery.
         * origin stmt: select * from t1 where exists (select * from table2);
         * expr: exists (select * from table2)
         * outer stmt: select * from t1
         * onClauseConjuncts: empty.
         */
        if (expr instanceof ExistsPredicate && onClauseConjuncts.isEmpty()) {
            // For uncorrelated subqueries, we limit the number of rows returned by the
            // subquery.
            subqueryStmt.setLimit(1);
        }

        // Analyzing the inline view trigger reanalysis of the subquery's select statement.
        // However, the statement is already analyzed and since statement analysis is not
        // idempotent, the analysis needs to be reset (by a call to clone()).
        // inlineView = (InlineViewRef) inlineView.clone();
        inlineView.reset();
        try {
            inlineView.analyze(analyzer);
        } catch (UserException e) {
            throw new AnalysisException(e.getMessage());
        }
        inlineView.setLeftTblRef(stmt.fromClause.get(stmt.fromClause.size() - 1));

        stmt.fromClause.add(inlineView);
        JoinOperator joinOp = JoinOperator.LEFT_SEMI_JOIN;

        // Create a join conjunct from the expr that contains a subquery.
        Expr joinConjunct = createJoinConjunct(expr, inlineView, analyzer, !onClauseConjuncts.isEmpty());
        if (joinConjunct != null) {
            SelectListItem firstItem =
                    ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
            if (!onClauseConjuncts.isEmpty()
                    && firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
                // Correlated subqueries with an aggregate function that returns non-null on
                // an empty input are rewritten using a LEFT OUTER JOIN because we
                // need to ensure that there is one agg value for every tuple of 'stmt'
                // (parent select block), even for those tuples of 'stmt' that get rejected
                // by the subquery due to some predicate. The new join conjunct is added to
                // stmt's WHERE clause because it needs to be applied to the result of the
                // LEFT OUTER JOIN (both matched and unmatched tuples).
                if (markTuple != null) {
                    // replace
                    ExprSubstitutionMap smap = new ExprSubstitutionMap();
                    smap.put(new SlotRef(markTuple.getSlots().get(0)), joinConjunct);
                    stmt.whereClause.substitute(smap);
                    markTuple = null;
                } else {
                    stmt.whereClause =
                            CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause);
                }
                joinConjunct = null;
                joinOp = JoinOperator.LEFT_OUTER_JOIN;
                updateSelectList = true;
            }

            if (joinConjunct != null) {
                onClauseConjuncts.add(joinConjunct);
            }
        }

        // Create the ON clause from the extracted correlated predicates.
        Expr onClausePredicate =
                CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
        if (onClausePredicate == null) {
            Preconditions.checkState(expr instanceof ExistsPredicate);
            if (((ExistsPredicate) expr).isNotExists()) {
                throw new AnalysisException("Unsupported uncorrelated NOT EXISTS subquery: "
                        + subqueryStmt.toSql());
            }
            // We don't have an ON clause predicate to create an equi-join. Rewrite the
            // subquery using a CROSS JOIN.
            // TODO This is very expensive. Remove it when we implement independent
            // subquery evaluation.
            joinOp = JoinOperator.CROSS_JOIN;
            inlineView.setMark(markTuple);
            inlineView.setJoinOp(joinOp);
            LOG.warn("uncorrelated subquery rewritten using a cross join");
            // Indicate that new visible tuples may be added in stmt's select list.
            return true;
        }

        // Create an smap from the original select-list exprs of the select list to
        // the corresponding inline-view columns.
        ExprSubstitutionMap smap = new ExprSubstitutionMap();
        Preconditions.checkState(lhsExprs.size() == rhsExprs.size());
        for (int i = 0; i < lhsExprs.size(); ++i) {
            Expr lhsExpr = lhsExprs.get(i);
            Expr rhsExpr = rhsExprs.get(i);
            rhsExpr.analyze(analyzer);
            smap.put(lhsExpr, rhsExpr);
        }

        onClausePredicate = onClausePredicate.substitute(smap, analyzer, false);

        // Check for references to ancestor query blocks (cycles in the dependency
        // graph of query blocks are not supported).
        if (!onClausePredicate.isBoundByTupleIds(stmt.getTableRefIds())) {
            throw new AnalysisException("Unsupported correlated subquery: "
                    + subqueryStmt.toSql());
        }

        // Check if we have a valid ON clause for an equi-join.
        boolean hasEqJoinPred = false;
        for (Expr conjunct : onClausePredicate.getConjuncts()) {
            if (!(conjunct instanceof BinaryPredicate)) {
                continue;
            }
            BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp();
            if (!operator.isEquivalence()) {
                continue;
            }
            List<TupleId> lhsTupleIds = Lists.newArrayList();
            conjunct.getChild(0).getIds(lhsTupleIds, null);
            if (lhsTupleIds.isEmpty()) {
                continue;
            }
            List<TupleId> rhsTupleIds = Lists.newArrayList();
            conjunct.getChild(1).getIds(rhsTupleIds, null);
            if (rhsTupleIds.isEmpty()) {
                continue;
            }
            // Check if columns from the outer query block (stmt) appear in both sides
            // of the binary predicate.
            if ((lhsTupleIds.contains(inlineView.getDesc().getId()) && lhsTupleIds.size() > 1)
                    || (rhsTupleIds.contains(inlineView.getDesc().getId())
                    && rhsTupleIds.size() > 1)) {
                continue;
            }
            hasEqJoinPred = true;
            break;
        }

        boolean isInBitmap = false;
        if (!hasEqJoinPred && !inlineView.isCorrelated()) {
            // Join with InPredicate is actually an equal join, so we choose HashJoin.
            if (expr instanceof ExistsPredicate) {
                joinOp = ((ExistsPredicate) expr).isNotExists() ? JoinOperator.LEFT_ANTI_JOIN :
                        JoinOperator.LEFT_SEMI_JOIN;
            } else if (expr instanceof InPredicate && !(joinConjunct instanceof BitmapFilterPredicate)) {
                joinOp = ((InPredicate) expr).isNotIn() ? JoinOperator.LEFT_ANTI_JOIN : JoinOperator.LEFT_SEMI_JOIN;
                if ((joinConjunct instanceof FunctionCallExpr
                        && (((FunctionCallExpr) joinConjunct).getFnName().getFunction()
                        .equalsIgnoreCase(BITMAP_CONTAINS)))) {
                    isInBitmap = true;
                }
            } else {
                joinOp = JoinOperator.CROSS_JOIN;
                // We can equal the aggregate subquery using a cross join. All conjuncts
                // that were extracted from the subquery are added to stmt's WHERE clause.
                if (markTuple != null) {
                    // replace
                    ExprSubstitutionMap markSmap = new ExprSubstitutionMap();
                    markSmap.put(new SlotRef(markTuple.getSlots().get(0)), onClausePredicate);
                    stmt.whereClause.substitute(markSmap);
                    markTuple = null;
                } else {
                    stmt.whereClause =
                            CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause);
                }
            }

            inlineView.setMark(markTuple);
            inlineView.setJoinOp(joinOp);
            inlineView.setInBitmap(isInBitmap);
            if (joinOp != JoinOperator.CROSS_JOIN) {
                inlineView.setOnClause(onClausePredicate);
            }
            // Indicate that the CROSS JOIN may add a new visible tuple to stmt's
            // select list (if the latter contains an unqualified star item '*')
            return true;
        }

        // We have a valid equi-join conjunct.
        if (expr instanceof InPredicate
                && ((InPredicate) expr).isNotIn()
                || expr instanceof ExistsPredicate
                && ((ExistsPredicate) expr).isNotExists()) {
            // For the case of a NOT IN with an eq join conjunct, replace the join
            // conjunct with a conjunct that uses the null-matching eq operator.
            if (expr instanceof InPredicate && markTuple == null) {
                joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
                List<TupleId> tIds = Lists.newArrayList();
                joinConjunct.getIds(tIds, null);
                if (tIds.size() <= 1 || !tIds.contains(inlineView.getDesc().getId())) {
                    throw new AnalysisException("Unsupported NOT IN predicate with subquery: "
                            + expr.toSql());
                }
                // Replace the EQ operator in the generated join conjunct with a
                // null-matching EQ operator.
                for (Expr conjunct : onClausePredicate.getConjuncts()) {
                    if (conjunct.equals(joinConjunct)) {
                        Preconditions.checkState(conjunct instanceof BinaryPredicate);
                        Preconditions.checkState(((BinaryPredicate) conjunct).getOp()
                                == BinaryPredicate.Operator.EQ);
                        // ((BinaryPredicate)conjunct).setOp(BinaryPredicate.Operator.NULL_MATCHING_EQ);
                        break;
                    }
                }
            } else {
                joinOp = expr instanceof InPredicate ? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN
                        : JoinOperator.LEFT_ANTI_JOIN;
            }
        }

        inlineView.setMark(markTuple);
        inlineView.setJoinOp(joinOp);
        inlineView.setOnClause(onClausePredicate);
        return updateSelectList;
    }

    /**
     * Replace all unqualified star exprs ('*') from stmt's select list with qualified
     * ones, i.e. tbl_1.*,...,tbl_n.*, where tbl_1,...,tbl_n are the visible tablerefs
     * in stmt. 'tableIndx' indicates the maximum tableRef ordinal to consider when
     * replacing an unqualified star item.
     */
    private static void replaceUnqualifiedStarItems(SelectStmt stmt, int tableIdx) {
        Preconditions.checkState(tableIdx < stmt.fromClause.size());
        ArrayList<SelectListItem> newItems = Lists.newArrayList();
        for (int i = 0; i < stmt.selectList.getItems().size(); ++i) {
            SelectListItem item = stmt.selectList.getItems().get(i);
            if (!item.isStar() || item.getTblName() != null) {
                newItems.add(item);
                continue;
            }
            // '*' needs to be replaced by tbl1.*,...,tbln.*, where
            // tbl1,...,tbln are the visible tableRefs in stmt.
            for (int j = 0; j < tableIdx; ++j) {
                TableRef tableRef = stmt.fromClause.get(j);
                if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN
                        || tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN
                        || tableRef.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
                    continue;
                }
                newItems.add(SelectListItem.createStarItem(tableRef.getAliasAsName()));
            }
        }
        Preconditions.checkState(!newItems.isEmpty());
        boolean isDistinct = stmt.selectList.isDistinct();
        stmt.selectList = new SelectList(newItems, isDistinct);
    }

    /**
     * Return true if the expr tree rooted at 'root' contains a correlated
     * predicate.
     */
    private static boolean containsCorrelatedPredicate(Expr root, List<TupleId> tupleIds) {
        if (isCorrelatedPredicate(root, tupleIds)) {
            return true;
        }
        for (Expr child : root.getChildren()) {
            if (containsCorrelatedPredicate(child, tupleIds)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns true if 'expr' is a correlated predicate. A predicate is
     * correlated if at least one of its SlotRefs belongs to an ancestor
     * query block (i.e. is not bound by the given 'tupleIds').
     */
    private static boolean isCorrelatedPredicate(Expr expr, List<TupleId> tupleIds) {
        return (expr instanceof BinaryPredicate || expr instanceof SlotRef
                || expr instanceof IsNullPredicate) && !expr.isBoundByTupleIds(tupleIds);
    }

    /**
     * Extract all correlated predicates of a subquery.
     * <p>
     * TODO Handle correlated predicates in a HAVING clause.
     */
    private static ArrayList<Expr> extractCorrelatedPredicates(SelectStmt subqueryStmt)
            throws AnalysisException {
        List<TupleId> subqueryTupleIds = subqueryStmt.getTableRefIds();
        ArrayList<Expr> correlatedPredicates = Lists.newArrayList();

        if (subqueryStmt.hasWhereClause()) {
            if (!canExtractCorrelatedPredicates(subqueryStmt.getWhereClause(), subqueryTupleIds)) {
                throw new AnalysisException("Disjunctions with correlated predicates "
                        + "are not supported: " + subqueryStmt.getWhereClause().toSql());
            }
            // Extract the correlated predicates from the subquery's WHERE clause and
            // replace them with true BoolLiterals.
            Expr newWhereClause = extractCorrelatedPredicates(subqueryStmt.getWhereClause(),
                    subqueryTupleIds, correlatedPredicates);
            if (canEliminate(newWhereClause)) {
                newWhereClause = null;
            }
            subqueryStmt.setWhereClause(newWhereClause);
        }

        // Process all correlated predicates from subquery's ON clauses.
        for (TableRef tableRef : subqueryStmt.getTableRefs()) {
            if (tableRef.getOnClause() == null) {
                continue;
            }

            ArrayList<Expr> onClauseCorrelatedPreds = Lists.newArrayList();
            Expr newOnClause = extractCorrelatedPredicates(tableRef.getOnClause(),
                    subqueryTupleIds, onClauseCorrelatedPreds);
            if (onClauseCorrelatedPreds.isEmpty()) {
                continue;
            }

            correlatedPredicates.addAll(onClauseCorrelatedPreds);
            if (canEliminate(newOnClause)) {
                // After the extraction of correlated predicates from an ON clause,
                // the latter may only contain conjunctions of True BoolLiterals. In
                // this case, we can eliminate the ON clause and set the join type to
                // CROSS JOIN.
                tableRef.setJoinOp(JoinOperator.CROSS_JOIN);
                tableRef.setOnClause(null);
            } else {
                tableRef.setOnClause(newOnClause);
            }
        }
        return correlatedPredicates;
    }

    /**
     * Extract all correlated predicates from the expr tree rooted at 'root' and
     * replace them with true BoolLiterals. The modified expr tree is returned
     * and the extracted correlated predicates are added to 'matches'.
     */
    private static Expr extractCorrelatedPredicates(Expr root, List<TupleId> tupleIds, ArrayList<Expr> matches) {
        if (isCorrelatedPredicate(root, tupleIds)) {
            matches.add(root);
            return new BoolLiteral(true);
        }
        for (int i = 0; i < root.getChildren().size(); ++i) {
            root.getChildren().set(i, extractCorrelatedPredicates(root.getChild(i), tupleIds, matches));
        }
        return root;
    }

    /**
     * Returns true if we can extract the correlated predicates from 'expr'. A
     * correlated predicate cannot be extracted if it is part of a disjunction.
     */
    private static boolean canExtractCorrelatedPredicates(Expr expr,
                                                          List<TupleId> subqueryTupleIds) {
        if (!(expr instanceof CompoundPredicate)) {
            return true;
        }
        if (Expr.IS_OR_PREDICATE.apply(expr)) {
            return !containsCorrelatedPredicate(expr, subqueryTupleIds);
        }
        for (Expr child : expr.getChildren()) {
            if (!canExtractCorrelatedPredicates(child, subqueryTupleIds)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks if an expr containing a correlated subquery is eligible for equal by
     * transforming into a join. 'correlatedPredicates' contains the correlated
     * predicates identified in the subquery. Throws an AnalysisException if 'expr'
     * is not eligible for equal.
     * TODO: Merge all the equal eligibility tests into a single function.
     */
    private static void canRewriteCorrelatedSubquery(
            Expr expr, List<Expr> correlatedPredicates)
            throws AnalysisException {
        Preconditions.checkNotNull(expr);
        Preconditions.checkState(expr.contains(Subquery.class));
        SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
        Preconditions.checkNotNull(stmt);
        // Only specified function could be supported in correlated subquery of binary predicate.
        if (expr instanceof BinaryPredicate) {
            if (stmt.getSelectList().getItems().size() != 1) {
                throw new AnalysisException("The subquery only support one item in select clause");
            }
            SelectListItem item = stmt.getSelectList().getItems().get(0);
            if (!item.getExpr().contains(Expr.CORRELATED_SUBQUERY_SUPPORT_AGG_FN)) {
                throw new AnalysisException("The select item in correlated subquery of binary predicate should only "
                        + "be sum, min, max, avg and count. Current subquery:" + stmt.toSql());
            }
        }
        // Grouping and/or aggregation (including analytic functions) is forbidden in correlated subquery of in
        // predicate.
        if (expr instanceof InPredicate && (stmt.hasAggInfo() || stmt.hasAnalyticInfo())) {
            LOG.warn("canRewriteCorrelatedSubquery fail, expr={} subquery={}", expr.toSql(), stmt.toSql());
            throw new AnalysisException("Unsupported correlated subquery"
                    + " with grouping and/or aggregation: " + stmt.toSql());
        }

        final com.google.common.base.Predicate<Expr> isSingleSlotRef =
                arg -> arg.unwrapSlotRef(false) != null;

        // A HAVING clause is only allowed on correlated EXISTS subqueries with
        // correlated binary predicates of the form Slot = Slot (see IMPALA-2734)
        // TODO Handle binary predicates with IS NOT DISTINCT op
        if (expr instanceof ExistsPredicate && stmt.hasHavingClause()
                && !correlatedPredicates.isEmpty()
                && (!stmt.hasAggInfo()
                || !Iterables.all(correlatedPredicates,
                Predicates.or(Expr.IS_EQ_BINARY_PREDICATE, isSingleSlotRef)))) {
            throw new AnalysisException(
                    "Unsupported correlated EXISTS subquery with a " + "HAVING clause: " + stmt.toSql());
        }

        // The following correlated subqueries with a limit clause are supported:
        // 1. EXISTS subqueries
        // 2. Scalar subqueries with aggregation
        if (stmt.hasLimitClause()
                && (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo()
                || stmt.selectList.isDistinct())
                && !(expr instanceof ExistsPredicate)) {
            throw new AnalysisException("Unsupported correlated subquery with a "
                    + "LIMIT clause: " + stmt.toSql());
        }
    }

    /**
     * Update the subquery within an inline view by expanding its select list with exprs
     * from a correlated predicate 'expr' that will be 'moved' to an ON clause in the
     * subquery's parent query block. We need to make sure that every expr extracted from
     * the subquery references an item in the subquery's select list. If 'updateGroupBy'
     * is true, the exprs extracted from 'expr' are also added in stmt's GROUP BY clause.
     * Throws an AnalysisException if we need to update the GROUP BY clause but
     * both the lhs and rhs of 'expr' reference a tuple of the subquery stmt.
     */
    private static void updateInlineView(
            InlineViewRef inlineView, Expr expr, List<TupleId> parentQueryTids,
            List<Expr> lhsExprs, List<Expr> rhsExprs, boolean updateGroupBy)
            throws AnalysisException {
        SelectStmt stmt = (SelectStmt) inlineView.getViewStmt();
        List<TupleId> subqueryTblIds = stmt.getTableRefIds();
        ArrayList<Expr> groupByExprs = null;
        if (updateGroupBy) {
            groupByExprs = Lists.newArrayList();
        }

        List<SelectListItem> items = stmt.selectList.getItems();
        // Collect all the SlotRefs from 'expr' and identify those that are bound by
        // subquery tuple ids.
        ArrayList<Expr> slotRefs = Lists.newArrayList();
        expr.collectAll(Predicates.instanceOf(SlotRef.class), slotRefs);
        List<Expr> exprsBoundBySubqueryTids = Lists.newArrayList();
        for (Expr slotRef : slotRefs) {
            if (slotRef.isBoundByTupleIds(subqueryTblIds)) {
                exprsBoundBySubqueryTids.add(slotRef);
            }
        }
        // The correlated predicate only references slots from a parent block,
        // no need to update the subquery's select or group by list.
        if (exprsBoundBySubqueryTids.isEmpty()) {
            return;
        }
        if (updateGroupBy) {
            Preconditions.checkState(expr instanceof BinaryPredicate);
            Expr exprBoundBySubqueryTids = null;
            if (exprsBoundBySubqueryTids.size() > 1) {
                // If the predicate contains multiple SlotRefs bound by subquery tuple
                // ids, they must all be on the same side of that predicate.
                if (expr.getChild(0).isBoundByTupleIds(subqueryTblIds)
                        && expr.getChild(1).isBoundByTupleIds(parentQueryTids)) {
                    exprBoundBySubqueryTids = expr.getChild(0);
                } else if (expr.getChild(0).isBoundByTupleIds(parentQueryTids)
                        && expr.getChild(1).isBoundByTupleIds(subqueryTblIds)) {
                    exprBoundBySubqueryTids = expr.getChild(1);
                } else {
                    throw new AnalysisException("All subquery columns "
                            + "that participate in a predicate must be on the same side of "
                            + "that predicate: " + expr.toSql());
                }
            } else {
                Preconditions.checkState(exprsBoundBySubqueryTids.size() == 1);
                exprBoundBySubqueryTids = exprsBoundBySubqueryTids.get(0);
            }
            exprsBoundBySubqueryTids.clear();
            exprsBoundBySubqueryTids.add(exprBoundBySubqueryTids);
        }

        // Add the exprs bound by subquery tuple ids to the select list and
        // register it for substitution. We use a temporary substitution map
        // because we cannot at this point analyze the new select list expr. Once
        // the new inline view is analyzed, the entries from this map will be
        // added to an ExprSubstitutionMap.
        for (Expr boundExpr : exprsBoundBySubqueryTids) {
            String colAlias = stmt.getColumnAliasGenerator().getNextAlias();
            items.add(new SelectListItem(boundExpr, null));
            inlineView.getExplicitColLabels().add(colAlias);
            lhsExprs.add(boundExpr);
            rhsExprs.add(new SlotRef(inlineView.getAliasAsName(), colAlias));
            if (groupByExprs != null) {
                groupByExprs.add(boundExpr);
            }
        }

        // Update the subquery's select list.
        boolean isDistinct = stmt.selectList.isDistinct();
        Preconditions.checkState(!isDistinct);
        stmt.selectList = new SelectList(items, isDistinct);
        // Update subquery's GROUP BY clause
        if (groupByExprs != null && !groupByExprs.isEmpty()) {
            if (stmt.hasGroupByClause()) {
                stmt.groupByClause.getGroupingExprs().addAll(groupByExprs);
                stmt.groupByClause.getOriGroupingExprs().addAll(groupByExprs);
            } else {
                stmt.groupByClause = new GroupByClause(groupByExprs, GroupByClause.GroupingType.GROUP_BY);
            }
        }
    }

    // If left expr of in predicate is a constant and a bitmap filter cannot be used, then the normal nested loop join
    // process is used, with the join Conjunct being `bitmap_contains`,
    // e.g. 'select k1, k2 from (select 2 k1, 11 k2) t where k1 in (select bitmap_col from bitmap_tbl)'.
    private static Expr createInBitmapConjunct(Expr exprWithSubquery, SlotRef bitmapSlotRef, Analyzer analyzer,
            boolean isCorrelated) throws AnalysisException {
        if (isCorrelated) {
            throw new AnalysisException("In bitmap does not support correlated subquery: " + exprWithSubquery.toSql());
        }

        boolean useBitmapFilter = false;
        List<SlotRef> slotRefs = Lists.newArrayList();
        exprWithSubquery.getChild(0).collect(SlotRef.class, slotRefs);
        for (SlotRef slotRef : slotRefs) {
            List<Expr> sourceExprs = slotRef.getDesc().getSourceExprs();
            if (sourceExprs.isEmpty() || sourceExprs.stream().anyMatch(expr -> !expr.isConstant())) {
                useBitmapFilter = true;
            }
        }

        Expr pred = useBitmapFilter ? new BitmapFilterPredicate(exprWithSubquery.getChild(0), bitmapSlotRef,
                ((InPredicate) exprWithSubquery).isNotIn()) : new FunctionCallExpr(new FunctionName(BITMAP_CONTAINS),
                Lists.newArrayList(bitmapSlotRef, exprWithSubquery.getChild(0)));
        pred.analyze(analyzer);
        return pred;
    }

    /**
     * Converts an expr containing a subquery into an analyzed conjunct to be
     * used in a join. The conversion is performed in place by replacing the
     * subquery with the first expr from the select list of 'inlineView'.
     * If 'isCorrelated' is true and the first expr from the inline view contains
     * an aggregate function that returns non-null on an empty input,
     * the aggregate function is wrapped into a 'zeroifnull' function.
     */
    private static Expr createJoinConjunct(Expr exprWithSubquery, InlineViewRef inlineView,
            Analyzer analyzer, boolean isCorrelated) throws AnalysisException {
        Preconditions.checkNotNull(exprWithSubquery);
        Preconditions.checkNotNull(inlineView);
        Preconditions.checkState(exprWithSubquery.contains(Subquery.class));
        if (exprWithSubquery instanceof ExistsPredicate) {
            return null;
        }
        // Create a SlotRef from the first item of inlineView's select list
        SlotRef slotRef = new SlotRef(new TableName(null, null, inlineView.getAlias()),
                inlineView.getColLabels().get(0));
        slotRef.analyze(analyzer);
        Expr subquerySubstitute = slotRef;
        if (exprWithSubquery instanceof InPredicate) {
            if (slotRef.getType().isBitmapType()) {
                return createInBitmapConjunct(exprWithSubquery, slotRef, analyzer, isCorrelated);
            }
            BinaryPredicate pred = new BinaryPredicate(BinaryPredicate.Operator.EQ,
                    exprWithSubquery.getChild(0), slotRef);
            pred.analyze(analyzer);
            return pred;
        }

        Subquery subquery = exprWithSubquery.getSubquery();
        ExprSubstitutionMap smap = new ExprSubstitutionMap();
        SelectListItem item =
                ((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
        if (isCorrelated && item.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
            // TODO: Add support for multiple agg functions that return non-null on an
            //   empty input, by wrapping them with zeroifnull functions before the inline
            //   view is analyzed.
            if (!Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr())
                    && (!(item.getExpr() instanceof CastExpr)
                    || !Expr.NON_NULL_EMPTY_AGG.apply(item.getExpr().getChild(0)))) {
                throw new AnalysisException("Aggregate function that returns non-null on "
                        + "an empty input cannot be used in an expression in a "
                        + "correlated subquery's select list: " + subquery.toSql());
            }

            List<Expr> aggFns = Lists.newArrayList();
            item.getExpr().collectAll(Expr.NON_NULL_EMPTY_AGG, aggFns);
            // TODO Generalize this by making the aggregate functions aware of the
            // literal expr that they return on empty input, e.g. max returns a
            // NullLiteral whereas count returns a NumericLiteral.
            if (aggFns.get(0).getFn().getReturnType().isNumericType()) {
                FunctionCallExpr zeroIfNull = new FunctionCallExpr("ifnull",
                        Lists.newArrayList((Expr) slotRef, new IntLiteral(0, Type.BIGINT)));
                zeroIfNull.analyze(analyzer);
                subquerySubstitute = zeroIfNull;
            } else if (aggFns.get(0).getFn().getReturnType().isStringType()) {
                List<Expr> params = Lists.newArrayList();
                params.add(slotRef);
                params.add(new StringLiteral(""));
                FunctionCallExpr ifnull = new FunctionCallExpr("ifnull", params);
                ifnull.analyze(analyzer);
                subquerySubstitute = ifnull;
            } else {
                throw new AnalysisException("Unsupported aggregate function used in "
                        + "a correlated subquery's select list: " + subquery.toSql());
            }
        }
        smap.put(subquery, subquerySubstitute);
        return exprWithSubquery.substitute(smap, analyzer, false);
    }

    public static boolean rewriteByPolicy(StatementBase statementBase, Analyzer analyzer) throws UserException {
        // old planner no use
        return false;
    }

    /**
     *
     * @param column the column of SlotRef
     * @param selectList new selectList for selectStmt
     * @param groupByExprs group by Exprs for selectStmt
     * @return true if ref can be rewritten
     */
    private static boolean rewriteSelectList(Column column, SelectList selectList, ArrayList<Expr> groupByExprs) {
        SlotRef slot = new SlotRef(null, column.getName());
        if (column.isKey()) {
            selectList.addItem(new SelectListItem(slot, column.getName()));
            groupByExprs.add(slot);
            return true;
        } else if (column.isAggregated()) {
            FunctionCallExpr func = generateAggFunction(slot, column);
            if (func != null) {
                selectList.addItem(new SelectListItem(func, column.getName()));
                return true;
            }
        }
        return false;
    }

    /**
     * rewrite stmt for querying random distributed table, construct an aggregation node for pre-agg
     * * CREATE TABLE `tbl` (
     *   `k1` BIGINT NULL DEFAULT "10",
     *   `k3` SMALLINT NULL,
     *   `a` BIGINT SUM NULL DEFAULT "0"
     * ) ENGINE=OLAP
     * AGGREGATE KEY(`k1`, `k2`)
     * DISTRIBUTED BY RANDOM BUCKETS 1
     * PROPERTIES (
     * "replication_allocation" = "tag.location.default: 1"
     * )
     * e.g.,
     * original: select * from tbl
     * rewrite: select * from (select k1, k2, sum(pv) from tbl group by k1, k2) t
     * do not rewrite if no need two phase agg:
     * e.g.,
     *     1. select max(k1) from tbl
     *     2. select sum(a) from tbl
     *
     * @param statementBase stmt to rewrite
     * @param analyzer the analyzer
     * @return true if rewritten
     * @throws UserException
     */
    public static boolean rewriteForRandomDistribution(StatementBase statementBase, Analyzer analyzer)
            throws UserException {
        boolean reAnalyze = false;
        if (!(statementBase instanceof SelectStmt)) {
            return false;
        }
        SelectStmt selectStmt = (SelectStmt) statementBase;
        for (int i = 0; i < selectStmt.fromClause.size(); i++) {
            TableRef tableRef = selectStmt.fromClause.get(i);
            // Recursively rewrite subquery
            if (tableRef instanceof InlineViewRef) {
                InlineViewRef viewRef = (InlineViewRef) tableRef;
                if (rewriteForRandomDistribution(viewRef.getQueryStmt(), viewRef.getAnalyzer())) {
                    reAnalyze = true;
                }
                continue;
            }
            TableIf table = tableRef.getTable();
            if (!(table instanceof OlapTable)) {
                continue;
            }
            // only rewrite random distributed AGG_KEY table
            OlapTable olapTable = (OlapTable) table;
            if (olapTable.getKeysType() != KeysType.AGG_KEYS) {
                continue;
            }
            DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
            if (distributionInfo.getType() != DistributionInfo.DistributionInfoType.RANDOM) {
                continue;
            }

            // check agg function and column agg type
            AggregateInfo aggInfo = selectStmt.getAggInfo();
            GroupByClause groupByClause = selectStmt.getGroupByClause();
            boolean aggTypeMatch = true;
            if (aggInfo != null || groupByClause != null) {
                if (aggInfo != null) {
                    ArrayList<FunctionCallExpr> aggExprs = aggInfo.getAggregateExprs();
                    if (aggExprs.stream().anyMatch(expr -> !aggTypeMatch(expr.getFnName().getFunction(), expr))) {
                        aggTypeMatch = false;
                    }
                    List<Expr> groupExprs = aggInfo.getGroupingExprs();
                    if (groupExprs.stream().anyMatch(expr -> !isKeyOrConstantExpr(expr))) {
                        aggTypeMatch = false;
                    }
                }
                if (groupByClause != null) {
                    List<Expr> groupByExprs = groupByClause.getGroupingExprs();
                    if (groupByExprs.stream().anyMatch(expr -> !isKeyOrConstantExpr(expr))) {
                        aggTypeMatch = false;
                    }
                }
                if (aggTypeMatch) {
                    continue;
                }
            }
            // construct a new InlineViewRef for pre-agg
            boolean canRewrite = true;
            SelectList selectList = new SelectList();
            ArrayList<Expr> groupingExprs = new ArrayList<>();
            List<Column> columns = olapTable.getBaseSchema();
            for (Column col : columns) {
                if (!rewriteSelectList(col, selectList, groupingExprs)) {
                    canRewrite = false;
                    break;
                }
            }
            if (!canRewrite) {
                continue;
            }
            Expr whereClause = selectStmt.getWhereClause() == null ? null : selectStmt.getWhereClause().clone();
            SelectStmt newSelectSmt = new SelectStmt(selectList,
                    new FromClause(Lists.newArrayList(tableRef)),
                    whereClause,
                    new GroupByClause(groupingExprs, GroupByClause.GroupingType.GROUP_BY),
                    null,
                    null,
                    LimitElement.NO_LIMIT);
            InlineViewRef inlineViewRef = new InlineViewRef(tableRef.getAliasAsName().getTbl(), newSelectSmt);
            inlineViewRef.setJoinOp(tableRef.getJoinOp());
            inlineViewRef.setLeftTblRef(tableRef.getLeftTblRef());
            inlineViewRef.setOnClause(tableRef.getOnClause());
            tableRef.setOnClause(null);
            tableRef.setLeftTblRef(null);
            tableRef.setOnClause(null);
            if (selectStmt.fromClause.size() > i + 1) {
                selectStmt.fromClause.get(i + 1).setLeftTblRef(inlineViewRef);
            }
            selectStmt.fromClause.set(i, inlineViewRef);
            selectStmt.analyze(analyzer);
            reAnalyze = true;
        }
        return reAnalyze;
    }

    /**
     * check if the agg type of functionCall match the agg type of column
     * @param functionName the functionName of functionCall
     * @param expr FunctionCallExpr
     * @return true if agg type match
     */
    private static boolean aggTypeMatch(String functionName, Expr expr) {
        if (expr.getChildren().isEmpty()) {
            if (expr instanceof SlotRef) {
                Column col = ((SlotRef) expr).getDesc().getColumn();
                if (col.isKey()) {
                    return functionName.equalsIgnoreCase("MAX")
                            || functionName.equalsIgnoreCase("MIN");
                }
                if (col.isAggregated()) {
                    AggregateType aggType = col.getAggregationType();
                    // agg type not mach
                    if (aggType == AggregateType.GENERIC) {
                        return col.getType().isAggStateType();
                    }
                    if (aggType == AggregateType.HLL_UNION) {
                        return functionName.equalsIgnoreCase(FunctionSet.HLL_UNION)
                                || functionName.equalsIgnoreCase(FunctionSet.HLL_UNION_AGG);
                    }
                    if (aggType == AggregateType.BITMAP_UNION) {
                        return functionName.equalsIgnoreCase(FunctionSet.BITMAP_UNION)
                                || functionName.equalsIgnoreCase(FunctionSet.BITMAP_UNION_COUNT)
                                || functionName.equalsIgnoreCase(FunctionSet.BITMAP_INTERSECT);
                    }
                    return functionName.equalsIgnoreCase(aggType.name());
                }
            }
            return false;
        }
        List<Expr> children = expr.getChildren();
        return children.stream().allMatch(child -> aggTypeMatch(functionName, child));
    }

    /**
     * check if the columns in expr is key column or constant, if group by clause contains value column, need rewrite
     *
     * @param expr expr to check
     * @return true if all columns is key column or constant
     */
    private static boolean isKeyOrConstantExpr(Expr expr) {
        if (expr instanceof SlotRef) {
            Column col = ((SlotRef) expr).getDesc().getColumn();
            return col.isKey();
        } else if (expr.isConstant()) {
            return true;
        }
        List<Expr> children = expr.getChildren();
        return children.stream().allMatch(StmtRewriter::isKeyOrConstantExpr);
    }

    /**
     * generate aggregation function according to the aggType of column
     *
     * @param slot slot of column
     * @return aggFunction generated
     */
    private static FunctionCallExpr generateAggFunction(SlotRef slot, Column column) {
        AggregateType aggregateType = column.getAggregationType();
        switch (aggregateType) {
            case SUM:
            case MAX:
            case MIN:
            case HLL_UNION:
            case BITMAP_UNION:
            case QUANTILE_UNION:
                FunctionName funcName = new FunctionName(aggregateType.toString().toLowerCase());
                return new FunctionCallExpr(funcName, new FunctionParams(false, Lists.newArrayList(slot)));
            case GENERIC:
                Type type = column.getType();
                if (!type.isAggStateType()) {
                    return null;
                }
                AggStateType aggState = (AggStateType) type;
                // use AGGREGATE_FUNCTION_UNION to aggregate multiple agg_state into one
                FunctionName functionName = new FunctionName(aggState.getFunctionName() + "_union");
                return new FunctionCallExpr(functionName, new FunctionParams(false, Lists.newArrayList(slot)));
            default:
                return null;
        }
    }
}