OriginalPlanner.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/Planner.java
// and modified by Doris

package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectListItem;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.CommonResultSet;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRuntimeFilterMode;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
 * The planner is responsible for turning parse trees into plan fragments that can be shipped off to backends for
 * execution.
 */
public class OriginalPlanner extends Planner {
    private static final Logger LOG = LogManager.getLogger(OriginalPlanner.class);

    private PlannerContext plannerContext;
    private SingleNodePlanner singleNodePlanner;
    private DistributedPlanner distributedPlanner;
    private Analyzer analyzer;

    public OriginalPlanner(Analyzer analyzer) {
        this.analyzer = analyzer;
    }

    public PlannerContext getPlannerContext() {
        return plannerContext;
    }

    public List<ScanNode> getScanNodes() {
        if (singleNodePlanner == null) {
            return Lists.newArrayList();
        }
        return singleNodePlanner.getScanNodes();
    }

    public void plan(StatementBase queryStmt, TQueryOptions queryOptions)
            throws UserException {
        this.queryOptions = queryOptions;
        createPlanFragments(queryStmt, analyzer, queryOptions);

        // update scan nodes visible version at the end of plan phase.
        ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
    }

    @Override
    public List<RuntimeFilter> getRuntimeFilters() {
        return analyzer.getAssignedRuntimeFilter();
    }

    private void setResultExprScale(Analyzer analyzer, ArrayList<Expr> outputExprs) {
        for (TupleDescriptor tupleDesc : analyzer.getDescTbl().getTupleDescs()) {
            for (SlotDescriptor slotDesc : tupleDesc.getSlots()) {
                for (Expr expr : outputExprs) {
                    List<SlotId> slotList = Lists.newArrayList();
                    expr.getIds(null, slotList);
                    if ((!expr.getType().getPrimitiveType().isDecimalV2Type()
                            && expr.getType().getPrimitiveType().isDecimalV3Type())) {
                        continue;
                    }

                    if (!slotDesc.getType().getPrimitiveType().isDecimalV2Type()
                            && !slotDesc.getType().getPrimitiveType().isDecimalV3Type()) {
                        continue;
                    }

                    if (slotList.contains(slotDesc.getId()) && null != slotDesc.getColumn()) {
                        int outputScale = slotDesc.getColumn().getScale();
                        if (outputScale >= 0) {
                            if (outputScale > expr.getOutputScale()) {
                                expr.setOutputScale(outputScale);
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Return combined explain string for all plan fragments.
     */
    @Override
    public void appendTupleInfo(StringBuilder str) {
        str.append(plannerContext.getRootAnalyzer().getDescTbl().getExplainString());
    }

    /**
     * Return hint information.
     */
    @Override
    public void appendHintInfo(StringBuilder str) {
    }

    /**
     * Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in
     * a list such that element i of that list can only consume output of the following fragments j > i.
     */
    public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions)
            throws UserException {
        QueryStmt queryStmt;
        if (statement instanceof InsertStmt) {
            queryStmt = ((InsertStmt) statement).getQueryStmt();
        } else {
            queryStmt = (QueryStmt) statement;
        }
        plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement);
        singleNodePlanner = new SingleNodePlanner(plannerContext);
        PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
        ProjectPlanner projectPlanner = new ProjectPlanner(analyzer);
        projectPlanner.projectSingleNodePlan(queryStmt.getResultExprs(), singleNodePlan);
        if (statement instanceof InsertStmt) {
            InsertStmt insertStmt = (InsertStmt) statement;
            insertStmt.prepareExpressions();
        }

        // TODO chenhao16 , no used materialization work
        // compute referenced slots before calling computeMemLayout()
        //analyzer.markRefdSlots(analyzer, singleNodePlan, resultExprs, null);

        setResultExprScale(analyzer, queryStmt.getResultExprs());

        // materialized view selector
        boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
        if (selectFailed) {
            throw new MVSelectFailedException("Failed to select materialize view");
        }

        /**
         * - Under normal circumstances, computeMemLayout() will be executed
         *     at the end of the init function of the plan node.
         * Such as :
         * OlapScanNode {
         *     init () {
         *         analyzer.materializeSlots(conjuncts);
         *         computeTupleStatAndMemLayout(analyzer);
         *         computeStat();
         *     }
         * }
         * - However Doris is currently unable to determine
         *     whether it is possible to cut or increase the columns in the tuple after PlanNode.init().
         * - Therefore, for the time being, computeMemLayout() can only be placed
         *     after the completion of the entire single node planner.
         */
        analyzer.getDescTbl().computeMemLayout();
        singleNodePlan.finalize(analyzer);
        if (Config.enable_query_hit_stats && plannerContext.getStatement() != null
                && plannerContext.getStatement().getExplainOptions() == null) {
            collectQueryStat(singleNodePlan);
        }
        checkAndSetTopnOpt(singleNodePlan);

        if ((queryOptions.num_nodes == 1 || queryStmt.isPointQuery()) && !(statement instanceof InsertStmt)) {
            // single-node execution; we're almost done
            singleNodePlan = addUnassignedConjuncts(analyzer, singleNodePlan);
            fragments.add(new PlanFragment(plannerContext.getNextFragmentId(), singleNodePlan,
                    DataPartition.UNPARTITIONED));
        } else {
            // all select query are unpartitioned.
            distributedPlanner = new DistributedPlanner(plannerContext);
            fragments = distributedPlanner.createPlanFragments(singleNodePlan);
        }

        // Push sort node down to the bottom of olapscan.
        // Because the olapscan must be in the end. So get the last two nodes.
        pushSortToOlapScan();

        // Optimize the transfer of query statistic when query doesn't contain limit.
        PlanFragment rootFragment = fragments.get(fragments.size() - 1);
        QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer
                = new QueryStatisticsTransferOptimizer(rootFragment);
        queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();

        // Create runtime filters.
        if (!ConnectContext.get().getSessionVariable().getRuntimeFilterMode().toUpperCase()
                .equals(TRuntimeFilterMode.OFF.name())) {
            RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
        }

        if (statement instanceof InsertStmt && !analyzer.getContext().isTxnModel()) {
            InsertStmt insertStmt = (InsertStmt) statement;
            rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
            rootFragment.setSink(insertStmt.getDataSink());
            insertStmt.complete();
            List<Expr> exprs = statement.getResultExprs();
            rootFragment.setOutputExprs(
                    Expr.substituteList(exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true));
        } else {
            List<Expr> resExprs = Expr.substituteList(queryStmt.getResultExprs(),
                    rootFragment.getPlanRoot().getOutputSmap(), analyzer, false);
            if (LOG.isDebugEnabled()) {
                LOG.debug("result Exprs {}", queryStmt.getResultExprs());
                LOG.debug("substitute result Exprs {}", resExprs);
            }
            rootFragment.setOutputExprs(resExprs);
        }
        rootFragment.setResultSinkType(ConnectContext.get().getResultSinkType());
        if (LOG.isDebugEnabled()) {
            LOG.debug("finalize plan fragments");
        }
        for (PlanFragment fragment : fragments) {
            fragment.finalize(queryStmt);
        }

        Collections.reverse(fragments);

        pushDownResultFileSink(analyzer);

        pushOutColumnUniqueIdsToOlapScan(rootFragment, analyzer);

        if (queryStmt instanceof SelectStmt) {
            SelectStmt selectStmt = (SelectStmt) queryStmt;
            if (selectStmt.isTwoPhaseReadOptEnabled()) {
                // Optimize query like `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...`
                if (singleNodePlan instanceof SortNode
                        && singleNodePlan.getChildren().size() == 1
                        && ((SortNode) singleNodePlan).getChild(0) instanceof OlapScanNode) {
                    // Double check this plan to ensure it's a general topn query
                    injectRowIdColumnSlot();
                    ((SortNode) singleNodePlan).setUseTwoPhaseReadOpt(true);
                } else if (singleNodePlan instanceof OlapScanNode &&  singleNodePlan.getChildren().size() == 0) {
                    // Optimize query like `SELECT ... FROM <tbl> WHERE ... LIMIT ...`.
                    // This typically used when row store enabled, to reduce scan cost
                    injectRowIdColumnSlot();
                } else {
                    // This is not a general topn query, rollback needMaterialize flag
                    for (SlotDescriptor slot : analyzer.getDescTbl().getSlotDescs().values()) {
                        slot.setNeedMaterialize(true);
                    }
                }
            }
        }
    }

    /**
     * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise
     * returns root unchanged.
     */
    private PlanNode addUnassignedConjuncts(Analyzer analyzer, PlanNode root)
            throws UserException {
        Preconditions.checkNotNull(root);
        // List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root.getTupleIds());

        List<Expr> conjuncts = analyzer.getUnassignedConjuncts(root);
        if (conjuncts.isEmpty()) {
            return root;
        }
        // evaluate conjuncts in SelectNode
        SelectNode selectNode = new SelectNode(plannerContext.getNextNodeId(), root, conjuncts);
        selectNode.init(analyzer);
        Preconditions.checkState(selectNode.hasValidStats());
        return selectNode;
    }

    /**
     * This function is mainly used to try to push the top-level result file sink down one layer.
     * The result file sink after the pushdown can realize the function of concurrently exporting the result set.
     * Push down needs to meet the following conditions:
     * 1. The query enables the session variable of the concurrent export result set
     * 2. The top-level fragment is not a merge change node
     * 3. The export method uses the s3 method
     *
     * After satisfying the above three conditions,
     * the result file sink and the associated output expr will be pushed down to the next layer.
     * The second plan fragment performs expression calculation and derives the result set.
     * The top plan fragment will only summarize the status of the exported result set and return it to fe.
     */
    private void pushDownResultFileSink(Analyzer analyzer) {
        if (fragments.size() < 1) {
            return;
        }
        if (!(fragments.get(0).getSink() instanceof ResultFileSink)) {
            return;
        }
        if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) {
            return;
        }
        if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) {
            return;
        }
        PlanFragment topPlanFragment = fragments.get(0);
        ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot();
        // try to push down result file sink
        if (topPlanNode.isFunctionalExchange()) {
            return;
        }
        PlanFragment secondPlanFragment = fragments.get(1);
        ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink();
        if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) {
            return;
        }
        if (secondPlanFragment.getOutputExprs() != null) {
            return;
        }
        // create result file sink desc
        TupleDescriptor fileStatusDesc = ResultFileSink.constructFileStatusTupleDesc(analyzer.getDescTbl());
        resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
        resultFileSink.setOutputTupleId(fileStatusDesc.getId());
        secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
        secondPlanFragment.resetSink(resultFileSink);
        ResultSink resultSink = new ResultSink(topPlanNode.getId());
        topPlanFragment.resetSink(resultSink);
        topPlanFragment.resetOutputExprs(fileStatusDesc);
        topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
    }

    private SlotDescriptor injectRowIdColumnSlot(Analyzer analyzer, TupleDescriptor tupleDesc) {
        SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(tupleDesc);
        if (LOG.isDebugEnabled()) {
            LOG.debug("inject slot {}", slotDesc);
        }
        String name = Column.ROWID_COL;
        Column col = new Column(name, Type.STRING, false, null, false, "",
                                        "rowid column");
        slotDesc.setType(Type.STRING);
        slotDesc.setColumn(col);
        slotDesc.setIsNullable(false);
        slotDesc.setIsMaterialized(true);
        return slotDesc;
    }

    // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
    // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
    // in the second phase, we have n rows, we do a fetch rpc to get all rowids date for the n rows
    // and reconconstruct the final block
    private void injectRowIdColumnSlot() {
        boolean injected = false;
        OlapTable olapTable = null;
        OlapScanNode scanNode = null;
        for (PlanFragment fragment : fragments) {
            PlanNode node = fragment.getPlanRoot();
            PlanNode parent = null;
            // OlapScanNode is the last node.
            // So, just get the last two node and check if they are SortNode and OlapScan.
            while (node.getChildren().size() != 0) {
                parent = node;
                node = node.getChildren().get(0);
            }

            // case1
            if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
                SortNode sortNode = (SortNode) parent;
                scanNode = (OlapScanNode) node;
                SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
                injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
                SlotRef extSlot = new SlotRef(slot);
                sortNode.getResolvedTupleExprs().add(extSlot);
                sortNode.getSortInfo().setUseTwoPhaseRead();
                injected = true;
                olapTable = scanNode.getOlapTable();
                break;
            }
            // case2
            if ((node instanceof OlapScanNode) && parent == null) {
                scanNode = (OlapScanNode) node;
                injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
                injected = true;
                olapTable = scanNode.getOlapTable();
                break;
            }
        }
        for (PlanFragment fragment : fragments) {
            if (injected && fragment.getSink() instanceof ResultSink) {
                TFetchOption fetchOption = olapTable.generateTwoPhaseReadOption(scanNode.getSelectedIndexId());
                ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
                break;
            }
        }
    }

    /**
     * Push sort down to olap scan.
     */
    private void pushSortToOlapScan() {
        for (PlanFragment fragment : fragments) {
            PlanNode node = fragment.getPlanRoot();
            PlanNode parent = null;

            // OlapScanNode is the last node.
            // So, just get the last two node and check if they are SortNode and OlapScan.
            while (node.getChildren().size() != 0) {
                parent = node;
                node = node.getChildren().get(0);
            }

            if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) {
                continue;
            }
            SortNode sortNode = (SortNode) parent;
            OlapScanNode scanNode = (OlapScanNode) node;

            if (!scanNode.checkPushSort(sortNode)) {
                continue;
            }
            // If offset > 0, we push down (limit: limit + offset) to olap scan.
            if (sortNode.getOffset() > 0) {
                scanNode.setSortLimit(sortNode.getLimit() + sortNode.getOffset());
            } else {
                scanNode.setSortLimit(sortNode.getLimit());
            }
            scanNode.setSortInfo(sortNode.getSortInfo());
            scanNode.getSortInfo().setSortTupleSlotExprs(sortNode.resolvedTupleExprs);
        }
    }

    /**
     * outputColumnUniqueIds contain columns in OrderByExprs and outputExprs,
     * push output column unique id set to olap scan.
     *
     * when query to storage layer, there are need read raw data
     * for columns which in outputColumnUniqueIds
     *
     * for example:
     * select A from tb where B = 1 and C > 'hello' order by B;
     *
     * column unique id for `A` and `B` will put into outputColumnUniqueIds.
     *
    */
    // this opt will only work with nereidsPlanner
    private void pushOutColumnUniqueIdsToOlapScan(PlanFragment rootFragment, Analyzer analyzer) {
        Set<Integer> outputColumnUniqueIds = new HashSet<>();
        // add '-1' to avoid the optimization incorrect work with OriginalPlanner,
        // because in the storage layer will skip this optimization if outputColumnUniqueIds contains '-1',
        // to ensure the optimization only correct work with nereidsPlanner
        outputColumnUniqueIds.add(-1);

        for (PlanFragment fragment : fragments) {
            PlanNode node = fragment.getPlanRoot();
            if (!(node instanceof OlapScanNode)) {
                continue;
            }

            OlapScanNode scanNode = (OlapScanNode) node;
            scanNode.setOutputColumnUniqueIds(outputColumnUniqueIds);
        }
    }

    /**
     * optimize for topn query like: SELECT * FROM t1 WHERE a>100 ORDER BY b,c LIMIT 100
     * the pre-requirement is as follows:
     * 1. only contains SortNode + OlapScanNode
     * 2. limit > 0
     * 3. first expression of order by is a table column
     */
    private void checkAndSetTopnOpt(PlanNode node) {
        if (node instanceof SortNode && node.getChildren().size() == 1) {
            SortNode sortNode = (SortNode) node;
            PlanNode child = sortNode.getChild(0);
            if (child instanceof OlapScanNode && sortNode.getLimit() > 0
                    && ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null
                    && sortNode.getLimit() <= ConnectContext.get().getSessionVariable().topnFilterLimitThreshold
                    && sortNode.getSortInfo().getOrigOrderingExprs().size() > 0) {
                Expr firstSortExpr = sortNode.getSortInfo().getOrigOrderingExprs().get(0);
                if (firstSortExpr instanceof SlotRef && !firstSortExpr.getType().isFloatingPointType()) {
                    OlapScanNode scanNode = (OlapScanNode) child;
                    if (scanNode.isDupKeysOrMergeOnWrite()) {
                        sortNode.setUseTopnOpt(true);
                        // scanNode.setUseTopnOpt(true);
                    }
                }
            }
        }
    }

    private static class QueryStatisticsTransferOptimizer {
        private final PlanFragment root;

        public QueryStatisticsTransferOptimizer(PlanFragment root) {
            Preconditions.checkNotNull(root);
            this.root = root;
        }

        public void optimizeQueryStatisticsTransfer() {
            optimizeQueryStatisticsTransfer(root, null);
        }

        private void optimizeQueryStatisticsTransfer(PlanFragment fragment, PlanFragment parent) {
            if (parent != null && hasLimit(parent.getPlanRoot(), fragment.getPlanRoot())) {
                fragment.setTransferQueryStatisticsWithEveryBatch(true);
            }
            for (PlanFragment child : fragment.getChildren()) {
                optimizeQueryStatisticsTransfer(child, fragment);
            }
        }

        // Check whether leaf node contains limit.
        private boolean hasLimit(PlanNode ancestor, PlanNode successor) {
            final List<PlanNode> exchangeNodes = Lists.newArrayList();
            collectExchangeNode(ancestor, exchangeNodes);
            for (PlanNode leaf : exchangeNodes) {
                if (leaf.getChild(0) == successor
                        && leaf.hasLimit()) {
                    return true;
                }
            }
            return false;
        }

        private void collectExchangeNode(PlanNode planNode, List<PlanNode> exchangeNodes) {
            if (planNode instanceof ExchangeNode) {
                exchangeNodes.add(planNode);
            }

            for (PlanNode child : planNode.getChildren()) {
                if (child instanceof ExchangeNode) {
                    exchangeNodes.add(child);
                } else {
                    collectExchangeNode(child, exchangeNodes);
                }
            }
        }
    }

    @Override
    public DescriptorTable getDescTable() {
        return analyzer.getDescTbl();
    }

    private void collectQueryStat(PlanNode root) {
        try {
            if (root instanceof ScanNode) {
                StatsDelta delta = ((ScanNode) root).genQueryStats();
                if (delta != null && !delta.empty()) {
                    Env.getCurrentEnv().getQueryStats().addStats(delta);
                    if (!delta.getTabletStats().isEmpty()) {
                        Env.getCurrentEnv().getQueryStats().addStats(delta.getTabletStats());
                    }
                }
            }
            for (PlanNode child : root.getChildren()) {
                collectQueryStat(child);
            }
        } catch (UserException e) {
            LOG.info("failed to collect query stat: {}", e.getMessage());
        }
    }

    @Override
    public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
        if (!(parsedStmt instanceof SelectStmt)) {
            return Optional.empty();
        }
        SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
        if (!parsedSelectStmt.getTableRefs().isEmpty()) {
            return Optional.empty();
        }
        List<SelectListItem> selectItems = parsedSelectStmt.getSelectList().getItems();
        List<Column> columns = new ArrayList<>(selectItems.size());
        List<String> columnLabels = parsedSelectStmt.getColLabels();
        List<String> data = new ArrayList<>();
        FormatOptions options = FormatOptions.getDefault();
        for (int i = 0; i < selectItems.size(); i++) {
            SelectListItem item = selectItems.get(i);
            Expr expr = item.getExpr();
            String columnName = columnLabels.get(i);
            if (expr instanceof LiteralExpr) {
                columns.add(new Column(columnName, expr.getType()));
                data.add(((LiteralExpr) expr).getStringValueForQuery(options));
            } else {
                return Optional.empty();
            }
        }
        ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
        ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
        return Optional.of(resultSet);
    }
}