Coverage Report

Created: 2026-06-17 08:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/vexpr_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <glog/logging.h>
21
22
#include <algorithm>
23
#include <cstddef>
24
#include <memory>
25
#include <optional>
26
#include <unordered_map>
27
#include <utility>
28
#include <vector>
29
30
#include "common/factory_creator.h"
31
#include "common/status.h"
32
#include "core/block/block.h"
33
#include "core/block/column_with_type_and_name.h"
34
#include "core/column/column.h"
35
#include "exec/runtime_filter/runtime_filter_selectivity.h"
36
#include "exprs/function_context.h"
37
#include "exprs/vexpr_fwd.h"
38
#include "runtime/runtime_state.h"
39
#include "runtime/scan_filter_profile.h"
40
#include "storage/index/ann/ann_range_search_runtime.h"
41
#include "storage/index/ann/ann_search_params.h"
42
#include "storage/index/inverted/inverted_index_reader.h"
43
#include "storage/segment/column_reader.h"
44
45
namespace doris {
46
class RowDescriptor;
47
class RuntimeState;
48
} // namespace doris
49
50
namespace doris::segment_v2 {
51
class Segment;
52
class ColumnIterator;
53
} // namespace doris::segment_v2
54
55
namespace doris {
56
57
class ScoreRuntime;
58
using ScoreRuntimeSPtr = std::shared_ptr<ScoreRuntime>;
59
60
class IndexExecContext {
61
public:
62
    IndexExecContext(const std::vector<ColumnId>& col_ids,
63
                     const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& index_iterators,
64
                     const std::vector<IndexFieldNameAndTypePair>& storage_name_and_type_vec,
65
                     std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>&
66
                             common_expr_index_status,
67
                     ScoreRuntimeSPtr score_runtime, segment_v2::Segment* segment,
68
                     const segment_v2::ColumnIteratorOptions& column_iter_opts)
69
2.09M
            : _col_ids(col_ids),
70
2.09M
              _index_iterators(index_iterators),
71
2.09M
              _storage_name_and_type(storage_name_and_type_vec),
72
2.09M
              _expr_index_status(common_expr_index_status),
73
2.09M
              _score_runtime(std::move(score_runtime)),
74
2.09M
              _segment(segment),
75
2.09M
              _column_iter_opts(column_iter_opts) {}
76
77
16.8k
    segment_v2::IndexIterator* get_inverted_index_iterator_by_column_id(int column_index) const {
78
16.8k
        if (column_index < 0 || column_index >= _col_ids.size()) {
79
0
            return nullptr;
80
0
        }
81
16.8k
        const auto& column_id = _col_ids[column_index];
82
16.8k
        if (column_id >= _index_iterators.size()) {
83
0
            return nullptr;
84
0
        }
85
16.8k
        if (!_index_iterators[column_id]) {
86
4.75k
            return nullptr;
87
4.75k
        }
88
12.0k
        return _index_iterators[column_id].get();
89
16.8k
    }
90
91
14
    segment_v2::IndexIterator* get_inverted_index_iterator_by_id(ColumnId column_id) const {
92
14
        if (column_id >= _index_iterators.size()) {
93
0
            return nullptr;
94
0
        }
95
14
        if (!_index_iterators[column_id]) {
96
14
            return nullptr;
97
14
        }
98
0
        return _index_iterators[column_id].get();
99
14
    }
100
101
    const IndexFieldNameAndTypePair* get_storage_name_and_type_by_column_id(
102
16.5k
            int column_index) const {
103
16.5k
        if (column_index < 0 || column_index >= _col_ids.size()) {
104
0
            return nullptr;
105
0
        }
106
16.5k
        const auto& column_id = _col_ids[column_index];
107
16.5k
        if (column_id >= _storage_name_and_type.size()) {
108
2
            return nullptr;
109
2
        }
110
16.5k
        return &_storage_name_and_type[column_id];
111
16.5k
    }
112
113
28
    const IndexFieldNameAndTypePair* get_storage_name_and_type_by_id(ColumnId column_id) const {
114
28
        if (column_id >= _storage_name_and_type.size()) {
115
0
            return nullptr;
116
0
        }
117
28
        return &_storage_name_and_type[column_id];
118
28
    }
119
120
0
    int column_index_by_id(ColumnId column_id) const {
121
0
        for (int i = 0; i < _col_ids.size(); ++i) {
122
0
            if (_col_ids[i] == column_id) {
123
0
                return i;
124
0
            }
125
0
        }
126
0
        return -1;
127
0
    }
128
129
0
    bool get_column_id(int column_index, ColumnId* column_id) const {
130
0
        if (column_id == nullptr) {
131
0
            return false;
132
0
        }
133
0
        if (column_index < 0 || column_index >= _col_ids.size()) {
134
0
            return false;
135
0
        }
136
0
        *column_id = _col_ids[column_index];
137
0
        return true;
138
0
    }
139
140
28
    segment_v2::Segment* segment() const { return _segment; }
141
142
0
    const segment_v2::ColumnIteratorOptions& column_iter_opts() const { return _column_iter_opts; }
143
144
27.5k
    bool has_index_result_for_expr(const VExpr* expr) const {
145
27.5k
        return _index_result_bitmap.contains(expr);
146
27.5k
    }
147
148
    void set_index_result_for_expr(const VExpr* expr,
149
11.0k
                                   segment_v2::InvertedIndexResultBitmap bitmap) {
150
11.0k
        _index_result_bitmap[expr] = std::move(bitmap);
151
11.0k
    }
152
153
    std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap>&
154
15.2k
    get_index_result_bitmap() {
155
15.2k
        return _index_result_bitmap;
156
15.2k
    }
157
158
24.0k
    std::unordered_map<const VExpr*, ColumnPtr>& get_index_result_column() {
159
24.0k
        return _index_result_column;
160
24.0k
    }
161
162
13.2k
    const segment_v2::InvertedIndexResultBitmap* get_index_result_for_expr(const VExpr* expr) {
163
13.2k
        auto iter = _index_result_bitmap.find(expr);
164
13.2k
        if (iter == _index_result_bitmap.end()) {
165
1.00k
            return nullptr;
166
1.00k
        }
167
12.2k
        return &iter->second;
168
13.2k
    }
169
170
1.43k
    void set_index_result_column_for_expr(const VExpr* expr, ColumnPtr column) {
171
1.43k
        _index_result_column[expr] = std::move(column);
172
1.43k
    }
173
174
9.61k
    void set_true_for_index_status(const VExpr* expr, int column_index) {
175
9.61k
        if (column_index < 0 || column_index >= _col_ids.size()) {
176
0
            return;
177
0
        }
178
9.61k
        const auto& column_id = _col_ids[column_index];
179
9.67k
        if (_expr_index_status.contains(column_id)) {
180
9.67k
            if (_expr_index_status[column_id].contains(expr)) {
181
9.67k
                _expr_index_status[column_id][expr] = true;
182
9.67k
            }
183
9.67k
        }
184
9.61k
    }
185
186
3.24k
    ScoreRuntimeSPtr get_score_runtime() const { return _score_runtime; }
187
188
5.15k
    void set_analyzer_ctx_for_expr(const VExpr* expr, InvertedIndexAnalyzerCtxSPtr analyzer_ctx) {
189
5.15k
        if (expr == nullptr || analyzer_ctx == nullptr) {
190
0
            return;
191
0
        }
192
5.15k
        _expr_analyzer_ctx[expr] = std::move(analyzer_ctx);
193
5.15k
    }
194
195
9.83k
    const InvertedIndexAnalyzerCtx* get_analyzer_ctx_for_expr(const VExpr* expr) const {
196
9.83k
        auto iter = _expr_analyzer_ctx.find(expr);
197
9.83k
        if (iter == _expr_analyzer_ctx.end()) {
198
5.29k
            return nullptr;
199
5.29k
        }
200
4.53k
        return iter->second.get();
201
9.83k
    }
202
203
2.09M
    void set_index_query_context(segment_v2::IndexQueryContextPtr index_query_context) {
204
2.09M
        _index_query_context = index_query_context;
205
2.09M
    }
206
207
1.33k
    const segment_v2::IndexQueryContextPtr& get_index_query_context() const {
208
1.33k
        return _index_query_context;
209
1.33k
    }
210
211
private:
212
    // A reference to a vector of column IDs for the current expression's output columns.
213
    const std::vector<ColumnId>& _col_ids;
214
215
    // A reference to a vector of unique pointers to index iterators.
216
    const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& _index_iterators;
217
218
    // A reference to a vector of storage name and type pairs related to schema.
219
    const std::vector<IndexFieldNameAndTypePair>& _storage_name_and_type;
220
221
    // A map of expressions to their corresponding inverted index result bitmaps.
222
    std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap> _index_result_bitmap;
223
224
    // A map of expressions to their corresponding result columns.
225
    std::unordered_map<const VExpr*, ColumnPtr> _index_result_column;
226
227
    // Per-expression analyzer context for inverted index evaluation.
228
    std::unordered_map<const VExpr*, InvertedIndexAnalyzerCtxSPtr> _expr_analyzer_ctx;
229
230
    // A reference to a map of common expressions to their inverted index evaluation status.
231
    std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& _expr_index_status;
232
233
    ScoreRuntimeSPtr _score_runtime;
234
235
    segment_v2::Segment* _segment = nullptr; // Ref
236
    segment_v2::ColumnIteratorOptions _column_iter_opts;
237
    segment_v2::IndexQueryContextPtr _index_query_context;
238
};
239
240
class VExprContext {
241
    ENABLE_FACTORY_CREATOR(VExprContext);
242
243
public:
244
29.0M
    VExprContext(VExprSPtr expr) : _root(std::move(expr)) {}
245
    ~VExprContext();
246
    [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc);
247
    [[nodiscard]] Status open(RuntimeState* state);
248
    [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx);
249
    [[nodiscard]] Status execute(Block* block, int* result_column_id);
250
    [[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column);
251
    [[nodiscard]] Status execute(const Block* block, ColumnWithTypeAndName& result_data);
252
    [[nodiscard]] DataTypePtr execute_type(const Block* block);
253
    [[nodiscard]] const std::string& expr_name() const;
254
    [[nodiscard]] bool is_blockable() const;
255
256
    [[nodiscard]] Status execute_const_expr(ColumnWithTypeAndName& result);
257
258
    double execute_cost() const;
259
260
15.4M
    VExprSPtr root() { return _root; }
261
28.6k
    void set_root(const VExprSPtr& expr) { _root = expr; }
262
19.6k
    void set_index_context(std::shared_ptr<IndexExecContext> index_context) {
263
19.6k
        _index_context = std::move(index_context);
264
19.6k
    }
265
266
1.03M
    std::shared_ptr<IndexExecContext> get_index_context() const { return _index_context; }
267
268
2.64k
    void attach_scan_filter(ScanFilterHandle handle) { _scan_filter_handle = std::move(handle); }
269
270
129k
    const ScanFilterHandle& scan_filter_handle() const { return _scan_filter_handle; }
271
272
    /// Creates a FunctionContext, and returns the index that's passed to fn_context() to
273
    /// retrieve the created context. Exprs that need a FunctionContext should call this in
274
    /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the
275
    /// size of the varargs buffer in the created FunctionContext (see udf-internal.h).
276
    int register_function_context(RuntimeState* state, const DataTypePtr& return_type,
277
                                  const std::vector<DataTypePtr>& arg_types);
278
279
    /// Retrieves a registered FunctionContext. 'i' is the index returned by the call to
280
    /// register_function_context(). This should only be called by VExprs.
281
6.77M
    FunctionContext* fn_context(int i) {
282
6.77M
        if (i < 0 || i >= _fn_contexts.size()) {
283
0
            throw Exception(ErrorCode::INTERNAL_ERROR,
284
0
                            "fn_context index invalid, index={}, _fn_contexts.size()={}", i,
285
0
                            _fn_contexts.size());
286
0
        }
287
6.77M
        return _fn_contexts[i].get();
288
6.77M
    }
289
290
    // execute expr with inverted index which column a, b has inverted indexes
291
    //  but some situation although column b has indexes, but apply index is not useful, we should
292
    //  skip this expr, just do not apply index anymore.
293
    [[nodiscard]] Status evaluate_inverted_index(uint32_t segment_num_rows);
294
295
    bool all_expr_inverted_index_evaluated();
296
297
    Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows,
298
                          bool accept_null, bool* can_filter_all);
299
300
    [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block);
301
302
    [[nodiscard]] static Status filter_block(
303
            const VExprContextSPtrs& expr_contexts, Block* block, size_t column_to_keep,
304
            std::optional<ScanFilterStage> scan_filter_stage = std::nullopt);
305
306
    [[nodiscard]] static Status execute_conjuncts(
307
            const VExprContextSPtrs& ctxs, const std::vector<IColumn::Filter*>* filters,
308
            bool accept_null, const Block* block, IColumn::Filter* result_filter,
309
            bool* can_filter_all, std::optional<ScanFilterStage> scan_filter_stage = std::nullopt);
310
311
    [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts,
312
                                                  const Block* block, ColumnUInt8& null_map,
313
                                                  IColumn::Filter& result_filter);
314
315
    static Status execute_conjuncts(
316
            const VExprContextSPtrs& ctxs, const std::vector<IColumn::Filter*>* filters,
317
            Block* block, IColumn::Filter* result_filter, bool* can_filter_all,
318
            std::optional<ScanFilterStage> scan_filter_stage = std::nullopt);
319
320
    [[nodiscard]] static Status execute_conjuncts_and_filter_block(
321
            const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter,
322
            int column_to_keep, std::optional<ScanFilterStage> scan_filter_stage = std::nullopt);
323
324
    static Status execute_conjuncts_and_filter_block(
325
            const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter,
326
            int column_to_keep, IColumn::Filter& filter,
327
            std::optional<ScanFilterStage> scan_filter_stage = std::nullopt);
328
329
    [[nodiscard]] static Status get_output_block_after_execute_exprs(const VExprContextSPtrs&,
330
                                                                     const Block&, Block*,
331
                                                                     bool do_projection = false);
332
333
18.2k
    int get_last_result_column_id() const {
334
18.2k
        DCHECK(_last_result_column_id != -1);
335
18.2k
        return _last_result_column_id;
336
18.2k
    }
337
338
149k
    RuntimeFilterSelectivity& get_runtime_filter_selectivity() {
339
149k
        if (!_rf_selectivity) {
340
0
            throw Exception(ErrorCode::INTERNAL_ERROR, "RuntimeFilterSelectivity is null");
341
0
        }
342
149k
        return *_rf_selectivity;
343
149k
    }
344
345
0
    FunctionContext::FunctionStateScope get_function_state_scope() const {
346
0
        return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL;
347
0
    }
348
349
    void clone_fn_contexts(VExprContext* other);
350
351
0
    VExprContext& operator=(const VExprContext& other) {
352
0
        if (this == &other) {
353
0
            return *this;
354
0
        }
355
0
356
0
        _root = other._root;
357
0
        _is_clone = other._is_clone;
358
0
        _prepared = other._prepared;
359
0
        _opened = other._opened;
360
0
361
0
        for (const auto& fn : other._fn_contexts) {
362
0
            _fn_contexts.emplace_back(fn->clone());
363
0
        }
364
0
365
0
        _last_result_column_id = other._last_result_column_id;
366
0
        _depth_num = other._depth_num;
367
0
        _scan_filter_handle = other._scan_filter_handle;
368
0
        return *this;
369
0
    }
370
371
0
    VExprContext& operator=(VExprContext&& other) {
372
0
        _root = other._root;
373
0
        other._root = nullptr;
374
0
        _is_clone = other._is_clone;
375
0
        _prepared = other._prepared;
376
0
        _opened = other._opened;
377
0
        _fn_contexts = std::move(other._fn_contexts);
378
0
        _last_result_column_id = other._last_result_column_id;
379
0
        _depth_num = other._depth_num;
380
0
        _scan_filter_handle = other._scan_filter_handle;
381
0
        return *this;
382
0
    }
383
384
623k
    [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) {
385
623k
        size_t usage = 0;
386
623k
        std::for_each(contexts.cbegin(), contexts.cend(),
387
623k
                      [&usage](auto&& context) { usage += context->_memory_usage; });
388
623k
        return usage;
389
623k
    }
390
391
0
    [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; }
392
393
    void prepare_ann_range_search(const doris::VectorSearchUserParams& params);
394
395
    Status evaluate_ann_range_search(
396
            const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators,
397
            const std::vector<ColumnId>& idx_to_cid,
398
            const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators,
399
            const std::unordered_map<VExprContext*, std::unordered_map<ColumnId, VExpr*>>&
400
                    common_expr_to_slotref_map,
401
            roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats& ann_index_stats,
402
            bool enable_result_cache, bool* ann_range_search_executed);
403
404
    uint64_t get_digest(uint64_t seed) const;
405
406
private:
407
    // Close method is called in vexpr context dector, not need call expicility
408
    void close();
409
410
    static void _reset_memory_usage(const VExprContextSPtrs& contexts);
411
412
    friend class VExpr;
413
414
    /// The expr tree this context is for.
415
    VExprSPtr _root;
416
417
    /// True if this context came from a Clone() call. Used to manage FunctionStateScope.
418
    bool _is_clone = false;
419
420
    /// Variables keeping track of current state.
421
    bool _prepared = false;
422
    bool _opened = false;
423
424
    /// FunctionContexts for each registered expression. The FunctionContexts are created
425
    /// and owned by this VExprContext.
426
    std::vector<std::unique_ptr<FunctionContext>> _fn_contexts;
427
428
    int _last_result_column_id = -1;
429
430
    /// The depth of expression-tree.
431
    int _depth_num = 0;
432
433
    std::shared_ptr<IndexExecContext> _index_context;
434
    ScanFilterHandle _scan_filter_handle;
435
    size_t _memory_usage = 0;
436
437
    segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime;
438
    bool _suitable_for_ann_index = true;
439
440
    std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity =
441
            std::make_unique<RuntimeFilterSelectivity>();
442
};
443
} // namespace doris