Coverage Report

Created: 2026-07-03 22:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/vtopn_pred.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/types.pb.h>
21
22
#include <utility>
23
24
#include "core/block/column_numbers.h"
25
#include "core/data_type/data_type.h"
26
#include "exec/common/util.hpp"
27
#include "exprs/function/simple_function_factory.h"
28
#include "exprs/vectorized_fn_call.h"
29
#include "exprs/vexpr.h"
30
#include "exprs/vslot_ref.h"
31
#include "runtime/query_context.h"
32
#include "runtime/runtime_predicate.h"
33
#include "runtime/runtime_state.h"
34
35
namespace doris {
36
37
// only used for dynamic topn filter
38
class VTopNPred : public VExpr {
39
    ENABLE_FACTORY_CREATOR(VTopNPred);
40
41
public:
42
    VTopNPred(const TExprNode& node, int source_node_id, VExprContextSPtr target_ctx)
43
19.1k
            : VExpr(node),
44
19.1k
              _source_node_id(source_node_id),
45
19.1k
              _expr_name(fmt::format("VTopNPred(source_node_id={})", _source_node_id)),
46
19.1k
              _target_ctx(std::move(target_ctx)) {}
47
9.55k
    // Identifies this expression as a top-n filter; unconditionally true for VTopNPred.
    bool is_topn_filter() const override {
        return true;
    }
48
49
5.45k
    // Builds a VTopNPred for `target_expr` and stores it in `expr`.
    //
    // The expression tree of `target_expr` is created first. Its root becomes the only child of
    // the new predicate, and the predicate node takes over the root's nullability.
    //
    // Returns the error reported by VExpr::create_expr_tree if tree creation fails, or
    // InternalError if no root expression was produced. `expr` is not assigned in either case.
    static Status create_vtopn_pred(const TExpr& target_expr, int source_node_id, VExprSPtr& expr) {
        VExprContextSPtr target_ctx;
        RETURN_IF_ERROR(VExpr::create_expr_tree(target_expr, target_ctx));

        // Validate before the first dereference so a missing root is reported as an error
        // instead of being accessed through a null pointer.
        if (target_ctx == nullptr || target_ctx->root() == nullptr) {
            return Status::InternalError("topn filter target expr has no root");
        }
        const auto& root = target_ctx->root();

        TExprNode node;
        node.__set_node_type(TExprNodeType::FUNCTION_CALL);
        node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
        node.__set_is_nullable(root->is_nullable());

        expr = VTopNPred::create_shared(node, source_node_id, target_ctx);
        expr->add_child(root);

        return Status::OK();
    }
64
65
4.76k
    // Returns the source node id this predicate was constructed with.
    int source_node_id() const {
        return _source_node_id;
    }
66
13.7k
    // Writes a new VTopNPred into `*cloned_expr`, built from clone_texpr_node() and this node's
    // source node id. The copy is created with a null target context and no children are
    // attached here. `cloned_expr` must not be null.
    Status clone_node(VExprSPtr* cloned_expr) const override {
        DORIS_CHECK(cloned_expr != nullptr);
        auto replica = VTopNPred::create_shared(clone_texpr_node(), _source_node_id, nullptr);
        *cloned_expr = std::move(replica);
        return Status::OK();
    }
71
72
10.0k
    // Resolves the runtime predicate for `_source_node_id`, prepares the base expression and
    // looks up the comparison function used at execution time: "le" when the predicate is
    // ascending, "ge" otherwise. Both arguments of that function use the child's data type.
    //
    // Returns InternalError("get function failed") when the function factory yields nothing.
    Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override {
        // The runtime predicate is bound before VExpr::prepare runs.
        _predicate = &state->get_query_ctx()->get_runtime_predicate(_source_node_id);
        RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));

        const auto& child = _children[0];

        // Signature template: (child expression, top-n value), both of the child's type.
        ColumnsWithTypeAndName signature;
        signature.emplace_back(nullptr, child->data_type(), child->expr_name());
        signature.emplace_back(nullptr, child->data_type(), "topn value");

        const char* comparator = _predicate->is_asc() ? "le" : "ge";
        _function = SimpleFunctionFactory::instance().get_function(
                comparator, signature, _data_type, {}, state->be_exec_version());
        if (_function == nullptr) {
            return Status::InternalError("get function failed");
        }
        return Status::OK();
    }
89
90
    // Evaluates the filter for `count` rows and stores the boolean column in `result_column`.
    //
    // While the runtime predicate has no value, an always-true column is returned. Otherwise
    // `_function` is applied to the child's column and a constant column holding the current
    // top-n value. When this expression is nullable and the predicate reports nulls_first(),
    // null results are turned into true so that null rows are never filtered.
    Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector,
                               size_t count, ColumnPtr& result_column) const override {
        // No top-n value available yet: keep every row.
        if (!_predicate->has_value()) {
            result_column = create_always_true_column(count, _data_type->is_nullable());
            return Status::OK();
        }

        const auto& child = _children[0];
        Block scratch;

        // Argument 0: the evaluated child expression; it is the first column of the block.
        ColumnPtr child_column;
        RETURN_IF_ERROR(child->execute_column(context, block, selector, count, child_column));
        scratch.insert({child_column, child->execute_type(block), child->expr_name()});
        constexpr uint32_t child_position = 0;

        // Argument 1: the current top-n value wrapped in a constant column.
        Field threshold = _predicate->get_value();
        auto threshold_column = child->data_type()->create_column_const(1, threshold);
        const int threshold_position =
                VExpr::insert_param(&scratch, {threshold_column, child->data_type(), _expr_name},
                                    std::max(count, threshold_column->size()));

        ColumnNumbers arguments = {child_position, static_cast<uint32_t>(threshold_position)};

        // The result column is appended last, so its index is the column count before insertion.
        uint32_t result_position = scratch.columns();
        scratch.insert({nullptr, _data_type, _expr_name});

        RETURN_IF_ERROR(_function->execute(nullptr, scratch, arguments, result_position,
                                           scratch.rows()));
        result_column = std::move(scratch.get_by_position(result_position).column);

        // Null values must never be filtered out in this case.
        if (is_nullable() && _predicate->nulls_first()) {
            result_column = change_null_to_true(std::move(result_column));
        }
        DCHECK_EQ(result_column->size(), count);
        return Status::OK();
    }
131
132
790
    // Returns the name assembled in the constructor, "VTopNPred(source_node_id=<id>)".
    const std::string& expr_name() const override {
        return _expr_name;
    }
133
134
    // only used in external table (for min-max filter). get `slot > xxx`, not `function(slot) > xxx`.
135
4.45k
    bool get_binary_expr(VExprSPtr& new_root) const {
136
4.45k
        if (!get_child(0)->is_slot_ref()) {
137
            // top rf maybe is `xxx order by abs(column) limit xxx`.
138
440
            return false;
139
440
        }
140
141
4.01k
        if (!_predicate->has_value()) {
142
1.99k
            return false;
143
1.99k
        }
144
145
2.02k
        auto* slot_ref = assert_cast<VSlotRef*>(get_child(0).get());
146
2.02k
        auto slot_data_type = remove_nullable(slot_ref->data_type());
147
2.02k
        {
148
2.02k
            TFunction fn;
149
2.02k
            TFunctionName fn_name;
150
2.02k
            fn_name.__set_db_name("");
151
2.02k
            fn_name.__set_function_name(_predicate->is_asc() ? "le" : "ge");
152
2.02k
            fn.__set_name(fn_name);
153
2.02k
            fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
154
2.02k
            std::vector<TTypeDesc> arg_types;
155
2.02k
            arg_types.push_back(create_type_desc(slot_data_type->get_primitive_type(),
156
2.02k
                                                 slot_data_type->get_precision(),
157
2.02k
                                                 slot_data_type->get_scale()));
158
159
2.02k
            arg_types.push_back(create_type_desc(slot_data_type->get_primitive_type(),
160
2.02k
                                                 slot_data_type->get_precision(),
161
2.02k
                                                 slot_data_type->get_scale()));
162
2.02k
            fn.__set_arg_types(arg_types);
163
2.02k
            fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
164
2.02k
            fn.__set_has_var_args(false);
165
166
2.02k
            TExprNode texpr_node;
167
2.02k
            texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
168
2.02k
            texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
169
2.02k
            texpr_node.__set_opcode(_predicate->is_asc() ? TExprOpcode::LE : TExprOpcode::GE);
170
2.02k
            texpr_node.__set_fn(fn);
171
2.02k
            texpr_node.__set_num_children(2);
172
2.02k
            texpr_node.__set_is_nullable(is_nullable());
173
2.02k
            new_root = VectorizedFnCall::create_shared(texpr_node);
174
2.02k
        }
175
176
2.02k
        {
177
            // add slot
178
2.02k
            new_root->add_child(children().at(0));
179
2.02k
        }
180
        // add Literal
181
2.02k
        {
182
2.02k
            Field field = _predicate->get_value();
183
2.02k
            TExprNode node = create_texpr_node_from(field, slot_data_type->get_primitive_type(),
184
2.02k
                                                    slot_data_type->get_precision(),
185
2.02k
                                                    slot_data_type->get_scale());
186
2.02k
            new_root->add_child(VLiteral::create_shared(node));
187
2.02k
        }
188
189
        // Since the normal greater than or less than relationship does not consider the relationship of null values, the generated `col >=/<= xxx OR col is null.`
190
2.02k
        if (_predicate->nulls_first()) {
191
1.92k
            VExprSPtr col_is_null_node;
192
1.92k
            {
193
1.92k
                TFunction fn;
194
1.92k
                TFunctionName fn_name;
195
1.92k
                fn_name.__set_db_name("");
196
1.92k
                fn_name.__set_function_name("is_null_pred");
197
1.92k
                fn.__set_name(fn_name);
198
1.92k
                fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
199
1.92k
                std::vector<TTypeDesc> arg_types;
200
1.92k
                arg_types.push_back(create_type_desc(slot_data_type->get_primitive_type(),
201
1.92k
                                                     slot_data_type->get_precision(),
202
1.92k
                                                     slot_data_type->get_scale()));
203
1.92k
                fn.__set_arg_types(arg_types);
204
1.92k
                fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
205
1.92k
                fn.__set_has_var_args(false);
206
207
1.92k
                TExprNode texpr_node;
208
1.92k
                texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
209
1.92k
                texpr_node.__set_node_type(TExprNodeType::FUNCTION_CALL);
210
1.92k
                texpr_node.__set_fn(fn);
211
1.92k
                texpr_node.__set_num_children(1);
212
1.92k
                col_is_null_node = VectorizedFnCall::create_shared(texpr_node);
213
214
                // add slot.
215
1.92k
                col_is_null_node->add_child(children().at(0));
216
1.92k
            }
217
218
1.92k
            VExprSPtr or_node;
219
1.92k
            {
220
1.92k
                TExprNode texpr_node;
221
1.92k
                texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
222
1.92k
                texpr_node.__set_node_type(TExprNodeType::COMPOUND_PRED);
223
1.92k
                texpr_node.__set_opcode(TExprOpcode::COMPOUND_OR);
224
1.92k
                texpr_node.__set_num_children(2);
225
1.92k
                or_node = VectorizedFnCall::create_shared(texpr_node);
226
1.92k
            }
227
228
1.92k
            or_node->add_child(col_is_null_node);
229
1.92k
            or_node->add_child(new_root);
230
1.92k
            new_root = or_node;
231
1.92k
        }
232
233
2.02k
        return true;
234
4.01k
    }
235
236
private:
237
    int _source_node_id;
238
    std::string _expr_name;
239
    RuntimePredicate* _predicate = nullptr;
240
    FunctionBasePtr _function;
241
    VExprContextSPtr _target_ctx;
242
};
243
244
} // namespace doris