Coverage Report

Created: 2025-10-30 15:07

/root/doris/be/src/runtime/runtime_predicate.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <functional>
21
#include <memory>
22
#include <mutex>
23
#include <shared_mutex>
24
#include <string>
25
26
#include "common/status.h"
27
#include "exec/olap_common.h"
28
#include "olap/shared_predicate.h"
29
#include "olap/tablet_schema.h"
30
#include "runtime/define_primitive_type.h"
31
#include "runtime/primitive_type.h"
32
#include "util/binary_cast.hpp"
33
#include "vec/common/arena.h"
34
#include "vec/core/field.h"
35
#include "vec/core/types.h"
36
#include "vec/runtime/vdatetime_value.h"
37
38
namespace doris {
39
class ColumnPredicate;
40
41
namespace vectorized {
42
43
class RuntimePredicate {
44
public:
45
    RuntimePredicate(const TTopnFilterDesc& desc);
46
47
    void init_target(int32_t target_node_id,
48
                     phmap::flat_hash_map<int, SlotDescriptor*> slot_id_to_slot_desc);
49
50
0
    bool enable() const {
51
        // when sort node and scan node are not in the same fragment, predicate will be disabled
52
0
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
53
0
        return _detected_source && _detected_target;
  Branch (53:16): [True: 0, False: 0]
  Branch (53:36): [True: 0, False: 0]
54
0
    }
55
56
0
    void set_detected_source() {
57
0
        std::unique_lock<std::shared_mutex> wlock(_rwlock);
58
0
        _detected_source = true;
59
0
    }
60
61
0
    Status set_tablet_schema(int32_t target_node_id, TabletSchemaSPtr tablet_schema) {
62
0
        std::unique_lock<std::shared_mutex> wlock(_rwlock);
63
0
        check_target_node_id(target_node_id);
64
0
        if (_contexts[target_node_id].tablet_schema) {
  Branch (64:13): [True: 0, False: 0]
65
0
            return Status::OK();
66
0
        }
67
0
        RETURN_IF_ERROR(tablet_schema->have_column(_contexts[target_node_id].col_name));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
68
0
        _contexts[target_node_id].tablet_schema = tablet_schema;
69
0
        int64_t index = DORIS_TRY(_contexts[target_node_id].get_field_index())
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
70
0
                                _contexts[target_node_id]
71
0
                                        .predicate = SharedPredicate::create_shared(index);
72
0
        return Status::OK();
73
0
    }
74
75
0
    std::shared_ptr<ColumnPredicate> get_predicate(int32_t target_node_id) {
76
0
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
77
0
        check_target_node_id(target_node_id);
78
0
        return _contexts.find(target_node_id)->second.predicate;
79
0
    }
80
81
    Status update(const Field& value);
82
83
0
    bool has_value() const {
84
0
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
85
0
        return _has_value;
86
0
    }
87
88
0
    Field get_value() const {
89
0
        std::shared_lock<std::shared_mutex> rlock(_rwlock);
90
0
        return _orderby_extrem;
91
0
    }
92
93
0
    std::string get_col_name(int32_t target_node_id) const {
94
0
        check_target_node_id(target_node_id);
95
0
        return _contexts.find(target_node_id)->second.col_name;
96
0
    }
97
98
0
    bool is_asc() const { return _is_asc; }
99
100
0
    bool nulls_first() const { return _nulls_first; }
101
102
0
    bool target_is_slot(int32_t target_node_id) const {
103
0
        check_target_node_id(target_node_id);
104
0
        return _contexts.find(target_node_id)->second.target_is_slot();
105
0
    }
106
107
0
    const TExpr& get_texpr(int32_t target_node_id) const {
108
0
        check_target_node_id(target_node_id);
109
0
        return _contexts.find(target_node_id)->second.expr;
110
0
    }
111
112
private:
113
0
    void check_target_node_id(int32_t target_node_id) const {
114
0
        if (!_contexts.contains(target_node_id)) {
  Branch (114:13): [True: 0, False: 0]
115
0
            std::string msg = "context target node ids: [";
116
0
            bool first = true;
117
0
            for (auto p : _contexts) {
  Branch (117:25): [True: 0, False: 0]
118
0
                if (first) {
  Branch (118:21): [True: 0, False: 0]
119
0
                    first = false;
120
0
                } else {
121
0
                    msg += ',';
122
0
                }
123
0
                msg += std::to_string(p.first);
124
0
            }
125
0
            msg += "], input target node is: " + std::to_string(target_node_id);
126
0
            DCHECK(false) << msg;
127
0
        }
128
0
    }
129
    struct TargetContext {
130
        TExpr expr;
131
        std::string col_name;
132
        TabletSchemaSPtr tablet_schema;
133
        std::shared_ptr<ColumnPredicate> predicate;
134
135
0
        Result<int32_t> get_field_index() {
136
0
            const auto& column = *DORIS_TRY(tablet_schema->column(col_name));
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
137
0
            return tablet_schema->field_index(column.unique_id());
138
0
        }
139
140
0
        bool target_is_slot() const { return expr.nodes[0].node_type == TExprNodeType::SLOT_REF; }
141
    };
142
143
    bool _init(PrimitiveType type);
144
145
    mutable std::shared_mutex _rwlock;
146
147
    bool _nulls_first;
148
    bool _is_asc;
149
    std::map<int32_t, TargetContext> _contexts;
150
151
    Field _orderby_extrem {Field::Types::Null};
152
    Arena _predicate_arena;
153
    std::function<std::string(const Field&)> _get_value_fn;
154
    std::function<ColumnPredicate*(const TabletColumn&, int, const std::string&, bool,
155
                                   vectorized::Arena*)>
156
            _pred_constructor;
157
    bool _detected_source = false;
158
    bool _detected_target = false;
159
    bool _has_value = false;
160
};
161
162
} // namespace vectorized
163
} // namespace doris