Coverage Report

Created: 2026-06-12 10:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/scan_operator.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <cstdint>
21
#include <set>
22
#include <string>
23
24
#include "common/status.h"
25
#include "common/thread_safety_annotations.h"
26
#include "core/field.h"
27
#include "exec/common/util.hpp"
28
#include "exec/operator/operator.h"
29
#include "exec/pipeline/dependency.h"
30
#include "exec/runtime_filter/runtime_filter_consumer_helper.h"
31
#include "exec/runtime_filter/runtime_filter_partition_pruner.h"
32
#include "exec/scan/scan_node.h"
33
#include "exec/scan/scanner_context.h"
34
#include "exprs/function_filter.h"
35
#include "exprs/vectorized_fn_call.h"
36
#include "exprs/vin_predicate.h"
37
#include "runtime/descriptors.h"
38
#include "storage/predicate/filter_olap_param.h"
39
40
namespace doris {
41
class ScannerDelegate;
42
class OlapScanner;
43
} // namespace doris
44
45
namespace doris {
46
47
enum class PushDownType {
48
    // The predicate can not be pushed down to data source
49
    UNACCEPTABLE,
50
    // The predicate can be pushed down to data source
51
    // and the data source can fully evaluate it
52
    ACCEPTABLE,
53
    // The predicate can be pushed down to data source
54
    // but the data source can not fully evaluate it.
55
    PARTIAL_ACCEPTABLE
56
};
57
58
class ScanLocalStateBase : public PipelineXLocalState<> {
59
public:
60
    ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
61
126
            : PipelineXLocalState<>(state, parent), _helper(parent->runtime_filter_descs()) {}
62
126
    ~ScanLocalStateBase() override = default;
63
64
    [[nodiscard]] virtual bool should_run_serial() const = 0;
65
66
    virtual RuntimeProfile* scanner_profile() = 0;
67
68
    [[nodiscard]] virtual const TupleDescriptor* input_tuple_desc() const = 0;
69
    [[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0;
70
71
    virtual int64_t limit_per_scanner() = 0;
72
    virtual std::atomic<int64_t>* shared_scan_limit_ptr() = 0;
73
74
    virtual void set_scan_ranges(RuntimeState* state,
75
                                 const std::vector<TScanRangeParams>& scan_ranges) = 0;
76
    virtual TPushAggOp::type get_push_down_agg_type() = 0;
77
78
    // If scan operator is serial operator(like topn), its real parallelism is 1.
79
    // Otherwise, its real parallelism is query_parallel_instance_num.
80
    // query_parallel_instance_num of olap table is usually equal to session var parallel_pipeline_task_num.
81
    // for file scan operator, its real parallelism will be 1 if it is in batch mode.
82
    // Related pr:
83
    // https://github.com/apache/doris/pull/42460
84
    // https://github.com/apache/doris/pull/44635
85
    [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const;
86
    [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const;
87
    [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const;
88
89
    // Thread-safe check whether a partition has been pruned by runtime filter.
90
    // Callable from any scan type's scanner in scheduling threads.
91
    bool is_partition_pruned(int64_t partition_id) const;
92
93
0
    [[nodiscard]] std::string get_name() { return _parent->get_name(); }
94
95
15
    uint64_t get_condition_cache_digest() const { return _condition_cache_digest; }
96
97
    Status update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num);
98
99
    Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts);
100
101
protected:
102
    friend class ScannerContext;
103
    friend class Scanner;
104
105
    virtual Status _init_profile() = 0;
106
107
    // Hook for subclasses to react after new runtime filters are appended.
108
    // Called inside update_late_arrival_runtime_filter() while _conjuncts_lock is held.
109
    // Default implementation runs partition pruning on the newly appended RFs.
110
    virtual Status _on_runtime_filter_update();
111
112
    Status _do_partition_pruning_by_rf();
113
114
    std::atomic<bool> _opened {false};
115
116
    DependencySPtr _scan_dependency = nullptr;
117
118
    std::shared_ptr<RuntimeProfile> _scanner_profile;
119
    RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr;
120
    // Num of newly created free blocks when running query
121
    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
122
    // Max num of scanner thread
123
    RuntimeProfile::Counter* _max_scan_concurrency = nullptr;
124
    RuntimeProfile::Counter* _min_scan_concurrency = nullptr;
125
    RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr;
126
    // time of get block from scanner
127
    RuntimeProfile::Counter* _scan_timer = nullptr;
128
    RuntimeProfile::Counter* _scan_cpu_timer = nullptr;
129
    // time of filter output block from scanner
130
    RuntimeProfile::Counter* _filter_timer = nullptr;
131
    // rows read from the scanner (including those discarded by (pre)filters)
132
    RuntimeProfile::Counter* _rows_read_counter = nullptr;
133
134
    RuntimeProfile::Counter* _num_scanners = nullptr;
135
136
    RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
137
138
    RuntimeProfile::Counter* _scan_rows = nullptr;
139
    RuntimeProfile::Counter* _scan_bytes = nullptr;
140
141
    AnnotatedMutex _conjuncts_lock;
142
    RuntimeFilterConsumerHelper _helper;
143
    // magic number as seed to generate hash value for condition cache
144
    uint64_t _condition_cache_digest = 0;
145
    // condition cache filter stats
146
    RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr;
147
    RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr;
148
149
    // ---- Runtime-filter partition pruning (scan-agnostic) ----
150
    RuntimeFilterPartitionPruner _rf_partition_pruner;
151
    RuntimeProfile::Counter* _partitions_pruned_by_rf_counter = nullptr;
152
    RuntimeProfile::Counter* _total_partitions_rf_counter = nullptr;
153
154
    // Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type.
155
    std::atomic<bool> _eos = false;
156
    int _max_pushdown_conditions_per_column = 1024;
157
    // Save all function predicates which may be pushed down to data source.
158
    std::vector<FunctionFilter> _push_down_functions;
159
160
    // Virtual methods with default implementations; overridden by subclasses when supported.
161
    // Declared here so that the normalize methods below (non-Derived-template) can call them.
162
0
    virtual bool _push_down_topn(const RuntimePredicate& predicate) { return false; }
163
0
    virtual PushDownType _should_push_down_bloom_filter() const {
164
0
        return PushDownType::UNACCEPTABLE;
165
0
    }
166
0
    virtual PushDownType _should_push_down_topn_filter() const {
167
0
        return PushDownType::UNACCEPTABLE;
168
0
    }
169
0
    virtual PushDownType _should_push_down_is_null_predicate(VectorizedFnCall* fn_call) const {
170
0
        return PushDownType::UNACCEPTABLE;
171
0
    }
172
0
    virtual PushDownType _should_push_down_in_predicate() const {
173
0
        return PushDownType::UNACCEPTABLE;
174
0
    }
175
    virtual PushDownType _should_push_down_binary_predicate(
176
            VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val,
177
0
            const std::set<std::string> fn_name) const {
178
0
        return PushDownType::UNACCEPTABLE;
179
0
    }
180
    virtual Status _should_push_down_function_filter(VectorizedFnCall* fn_call,
181
                                                     VExprContext* expr_ctx,
182
                                                     StringRef* constant_str,
183
                                                     doris::FunctionContext** fn_ctx,
184
0
                                                     PushDownType& pdt) {
185
0
        pdt = PushDownType::UNACCEPTABLE;
186
0
        return Status::OK();
187
0
    }
188
189
    // Non-templated normalize methods, moved here to avoid re-compilation per Derived type.
190
    Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt);
191
    Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root,
192
                                   SlotDescriptor* slot,
193
                                   std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
194
                                   PushDownType* pdt);
195
    Status _normalize_topn_filter(VExprContext* expr_ctx, const VExprSPtr& root,
196
                                  SlotDescriptor* slot,
197
                                  std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
198
                                  PushDownType* pdt);
199
    Status _normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot,
200
                                       PushDownType* pdt);
201
202
    // Inner PrimitiveType-template methods. Moved to base to avoid N(Derived)×M(PrimitiveType)
203
    // instantiation blowup: now instantiated M times total instead of N×M times.
204
    template <PrimitiveType T>
205
    Status _normalize_in_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
206
                                   SlotDescriptor* slot,
207
                                   std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
208
                                   ColumnValueRange<T>& range, PushDownType* pdt);
209
    template <PrimitiveType T>
210
    Status _normalize_binary_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
211
                                       SlotDescriptor* slot,
212
                                       std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
213
                                       ColumnValueRange<T>& range, PushDownType* pdt);
214
    template <PrimitiveType T>
215
    Status _normalize_is_null_predicate(VExprContext* expr_ctx, const VExprSPtr& root,
216
                                        SlotDescriptor* slot,
217
                                        std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
218
                                        ColumnValueRange<T>& range, PushDownType* pdt);
219
    template <PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc>
220
    Status _change_value_range(bool is_equal_op, ColumnValueRange<PrimitiveType>& range,
221
                               const Field& value, const ChangeFixedValueRangeFunc& func,
222
                               const std::string& fn_name);
223
};
224
225
template <typename LocalStateType>
226
class ScanOperatorX;
227
template <typename Derived>
228
class ScanLocalState : public ScanLocalStateBase {
229
    ENABLE_FACTORY_CREATOR(ScanLocalState);
230
    ScanLocalState(RuntimeState* state, OperatorXBase* parent)
231
126
            : ScanLocalStateBase(state, parent) {}
_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE
Line
Count
Source
231
108
            : ScanLocalStateBase(state, parent) {}
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE
Line
Count
Source
231
15
            : ScanLocalStateBase(state, parent) {}
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE
Line
Count
Source
231
3
            : ScanLocalStateBase(state, parent) {}
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE
232
126
    ~ScanLocalState() override = default;
_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEED2Ev
Line
Count
Source
232
108
    ~ScanLocalState() override = default;
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEED2Ev
Line
Count
Source
232
15
    ~ScanLocalState() override = default;
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEED2Ev
Line
Count
Source
232
3
    ~ScanLocalState() override = default;
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEED2Ev
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEED2Ev
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEED2Ev
233
234
    Status init(RuntimeState* state, LocalStateInfo& info) override;
235
236
    Status open(RuntimeState* state) override;
237
238
    Status close(RuntimeState* state) override;
239
    std::string debug_string(int indentation_level) const final;
240
241
    [[nodiscard]] bool should_run_serial() const override;
242
243
37
    RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15scanner_profileEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15scanner_profileEv
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15scanner_profileEv
Line
Count
Source
243
37
    RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15scanner_profileEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15scanner_profileEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15scanner_profileEv
244
245
    [[nodiscard]] const TupleDescriptor* input_tuple_desc() const override;
246
    [[nodiscard]] const TupleDescriptor* output_tuple_desc() const override;
247
248
    int64_t limit_per_scanner() override;
249
    std::atomic<int64_t>* shared_scan_limit_ptr() override;
250
251
    void set_scan_ranges(RuntimeState* state,
252
1
                         const std::vector<TScanRangeParams>& scan_ranges) override {}
_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE
Line
Count
Source
252
1
                         const std::vector<TScanRangeParams>& scan_ranges) override {}
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE
253
254
    TPushAggOp::type get_push_down_agg_type() override;
255
256
0
    std::vector<Dependency*> execution_dependencies() override {
257
0
        if (_filter_dependencies.empty()) {
258
0
            return {};
259
0
        }
260
0
        std::vector<Dependency*> res(_filter_dependencies.size());
261
0
        std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(),
262
0
                       [](DependencySPtr dep) { return dep.get(); });
Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_
Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_
Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_
Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_
Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_
Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_
263
0
        return res;
264
0
    }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEv
265
266
0
    std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }
Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MockScanLocalStateEE12dependenciesEv
Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18OlapScanLocalStateEE12dependenciesEv
Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18FileScanLocalStateEE12dependenciesEv
Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_21GroupCommitLocalStateEE12dependenciesEv
Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18JDBCScanLocalStateEE12dependenciesEv
Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MetaScanLocalStateEE12dependenciesEv
267
268
0
    std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) {
269
0
        std::vector<int> result;
270
0
        for (int id : _parent->cast<typename Derived::Parent>()._topn_filter_source_node_ids) {
271
0
            const auto& pred = state->get_query_ctx()->get_runtime_predicate(id);
272
0
            if (!pred.enable()) {
273
0
                continue;
274
0
            }
275
0
            if (_push_down_topn(pred) == push_down) {
276
0
                result.push_back(id);
277
0
            }
278
0
        }
279
0
        return result;
280
0
    }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb
281
282
protected:
283
    template <typename LocalStateType>
284
    friend class ScanOperatorX;
285
    friend class ScannerContext;
286
    friend class Scanner;
287
288
    Status _init_profile() override;
289
0
    virtual Status _process_conjuncts(RuntimeState* state) {
290
0
        RETURN_IF_ERROR(_do_partition_pruning_by_rf());
291
0
        return _normalize_conjuncts(state);
292
0
    }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE
293
0
    virtual bool _should_push_down_common_expr(const VExprSPtr&) { return false; }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE
294
295
0
    virtual bool _storage_no_merge() { return false; }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE17_storage_no_mergeEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE17_storage_no_mergeEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE17_storage_no_mergeEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE17_storage_no_mergeEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE17_storage_no_mergeEv
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE17_storage_no_mergeEv
296
0
    virtual bool _is_key_column(const std::string& col_name) { return false; }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
297
298
    // Create a list of scanners.
299
    // The number of scanners is related to the implementation of the data source,
300
    // predicate conditions, and scheduling strategy.
301
    // So this method needs to be implemented separately by the subclass of ScanNode.
302
    // Finally, a set of scanners that have been prepared are returned.
303
0
    virtual Status _init_scanners(std::list<ScannerSPtr>* scanners) { return Status::OK(); }
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE
304
305
    Status _normalize_conjuncts(RuntimeState* state);
306
    // Normalize a conjunct and try to convert it to column predicate recursively.
307
    Status _normalize_predicate(VExprContext* context, const VExprSPtr& root,
308
                                VExprSPtr& output_expr);
309
    bool _is_predicate_acting_on_slot(const VExprSPtrs& children, SlotDescriptor** slot_desc,
310
                                      ColumnValueRangeType** range);
311
    Status _prepare_scanners();
312
313
    // Submit the scanner to the thread pool and start execution
314
    Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners);
315
316
    // For some conjuncts there is a chance to eliminate the cast operator
317
    // Eg. Variant's sub column could eliminate cast in storage layer if
318
    // cast dst column type equals storage column type
319
    void get_cast_types_for_variants();
320
    void _filter_and_collect_cast_type_for_variant(
321
            const VExpr* expr,
322
            std::unordered_map<std::string, std::vector<DataTypePtr>>& colname_to_cast_types);
323
324
    Status _get_topn_filters(RuntimeState* state);
325
326
    // Stores conjuncts that have been fully pushed down to the storage layer as predicate columns.
327
    // These expr contexts are kept alive to prevent their FunctionContext and constant strings
328
    // from being freed prematurely.
329
    VExprContextSPtrs _stale_expr_ctxs;
330
    VExprContextSPtrs _common_expr_ctxs_push_down;
331
332
    atomic_shared_ptr<ScannerContext> _scanner_ctx;
333
334
    // colname -> cast dst type
335
    std::map<std::string, DataTypePtr> _cast_types_for_variants;
336
337
    // slot id -> ColumnValueRange
338
    // Parsed from conjuncts
339
    phmap::flat_hash_map<int, ColumnValueRangeType> _slot_id_to_value_range;
340
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates;
341
    std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates;
342
343
    std::vector<std::shared_ptr<Dependency>> _filter_dependencies;
344
345
    // ScanLocalState owns the scanners; the scanner context only holds weak pointers to them
346
    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
347
    Arena _arena;
348
    int _instance_idx = 0;
349
};
350
351
template <typename LocalStateType>
352
class ScanOperatorX : public OperatorX<LocalStateType> {
353
public:
354
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
355
    Status prepare(RuntimeState* state) override;
356
    Status get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
357
1
    Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override {
358
1
        Status status = OperatorX<LocalStateType>::get_block(state, block, eos);
359
1
        if (status.ok()) {
360
1
            state->get_local_state(operator_id())->update_output_block_counters(*block);
361
1
        }
362
1
        return status;
363
1
    }
_ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb
Line
Count
Source
357
1
    Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override {
358
1
        Status status = OperatorX<LocalStateType>::get_block(state, block, eos);
359
1
        if (status.ok()) {
360
1
            state->get_local_state(operator_id())->update_output_block_counters(*block);
361
1
        }
362
1
        return status;
363
1
    }
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb
364
0
    [[nodiscard]] bool is_source() const override { return true; }
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE9is_sourceEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE9is_sourceEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE9is_sourceEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE9is_sourceEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE9is_sourceEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE9is_sourceEv
365
366
    [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;
367
368
126
    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
369
126
        return _runtime_filter_descs;
370
126
    }
_ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE20runtime_filter_descsEv
Line
Count
Source
368
108
    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
369
108
        return _runtime_filter_descs;
370
108
    }
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE20runtime_filter_descsEv
Line
Count
Source
368
15
    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
369
15
        return _runtime_filter_descs;
370
15
    }
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE20runtime_filter_descsEv
Line
Count
Source
368
3
    const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
369
3
        return _runtime_filter_descs;
370
3
    }
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE20runtime_filter_descsEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE20runtime_filter_descsEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE20runtime_filter_descsEv
371
372
    // Expose this operator's per-fragment shared partition-boundary parse
373
    // result to the non-templated ScanLocalStateBase so it can drive runtime
374
    // filter partition pruning without down-casting to a specific scan type.
375
    // Subclasses are expected to populate `_parsed_partition_boundaries` from
376
    // their own partition-boundary thrift field inside their `prepare()`
377
    // override before any LocalState observes the result.
378
0
    const ParsedPartitionBoundaries* parsed_partition_boundaries() const override {
379
0
        return &_parsed_partition_boundaries;
380
0
    }
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE27parsed_partition_boundariesEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE27parsed_partition_boundariesEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE27parsed_partition_boundariesEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE27parsed_partition_boundariesEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE27parsed_partition_boundariesEv
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE27parsed_partition_boundariesEv
381
382
0
    [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
383
384
99
    [[nodiscard]] virtual bool can_push_down_column_predicate(const SlotDescriptor*) const {
385
99
        return true;
386
99
    }
_ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE30can_push_down_column_predicateEPKNS_14SlotDescriptorE
Line
Count
Source
384
99
    [[nodiscard]] virtual bool can_push_down_column_predicate(const SlotDescriptor*) const {
385
99
        return true;
386
99
    }
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE30can_push_down_column_predicateEPKNS_14SlotDescriptorE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE30can_push_down_column_predicateEPKNS_14SlotDescriptorE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE30can_push_down_column_predicateEPKNS_14SlotDescriptorE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE30can_push_down_column_predicateEPKNS_14SlotDescriptorE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE30can_push_down_column_predicateEPKNS_14SlotDescriptorE
387
388
0
    TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE22get_push_down_agg_typeEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE22get_push_down_agg_typeEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE22get_push_down_agg_typeEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE22get_push_down_agg_typeEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE22get_push_down_agg_typeEv
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE22get_push_down_agg_typeEv
389
390
0
    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
391
0
        if (OperatorX<LocalStateType>::is_serial_operator()) {
392
            // `is_serial_operator()` returns true means we ignore the distribution.
393
0
            return {ExchangeType::NOOP};
394
0
        }
395
0
        return {ExchangeType::BUCKET_HASH_SHUFFLE};
396
0
    }
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE26required_data_distributionEPNS_12RuntimeStateE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE
Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE
397
398
0
    void set_low_memory_mode(RuntimeState* state) override {
399
0
        auto& local_state = get_local_state(state);
400
401
0
        if (auto ctx = local_state._scanner_ctx.load()) {
402
0
            ctx->clear_free_blocks();
403
0
        }
404
0
    }
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE
405
406
    using OperatorX<LocalStateType>::node_id;
407
    using OperatorX<LocalStateType>::operator_id;
408
    using OperatorX<LocalStateType>::get_local_state;
409
410
#ifdef BE_TEST
411
24
    ScanOperatorX() = default;
412
#endif
413
414
protected:
415
    using LocalState = LocalStateType;
416
    friend class OlapScanner;
417
    ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
418
                  const DescriptorTbl& descs, int parallel_tasks = 0);
419
44
    virtual ~ScanOperatorX() = default;
_ZN5doris13ScanOperatorXINS_18MockScanLocalStateEED2Ev
Line
Count
Source
419
24
    virtual ~ScanOperatorX() = default;
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEED2Ev
Line
Count
Source
419
17
    virtual ~ScanOperatorX() = default;
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEED2Ev
Line
Count
Source
419
3
    virtual ~ScanOperatorX() = default;
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEED2Ev
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEED2Ev
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEED2Ev
420
    template <typename Derived>
421
    friend class ScanLocalState;
422
    friend class OlapScanLocalState;
423
424
    // For load scan node, there should be both input and output tuple descriptor.
425
    // For query scan node, there is only output_tuple_desc.
426
    TupleId _input_tuple_id = -1;
427
    TupleId _output_tuple_id = -1;
428
    const TupleDescriptor* _input_tuple_desc = nullptr;
429
    const TupleDescriptor* _output_tuple_desc = nullptr;
430
431
    phmap::flat_hash_map<int, SlotDescriptor*> _slot_id_to_slot_desc;
432
    std::unordered_map<std::string, int> _colname_to_slot_id;
433
434
    // These two values are from query_options
435
    int _max_scan_key_num = 48;
436
    int _max_pushdown_conditions_per_column = 1024;
437
438
    // If the query like select * from table limit 10; then the query should run in
439
    // single scanner to avoid too many scanners which will cause lots of useless read.
440
    bool _should_run_serial = false;
441
442
    VExprContextSPtrs _common_expr_ctxs_push_down;
443
444
    // If sort info is set, push limit to each scanner;
445
    int64_t _limit_per_scanner = -1;
446
447
    // Shared remaining limit across all parallel instances and their scanners.
448
    // Initialized to _limit (SQL LIMIT); -1 means no limit.
449
    std::atomic<int64_t> _shared_scan_limit {-1};
450
451
    std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
452
453
    TPushAggOp::type _push_down_agg_type;
454
455
    // Record the value of the aggregate function 'count' from doris's be
456
    int64_t _push_down_count = -1;
457
    const int _parallel_tasks = 0;
458
459
    std::vector<int> _topn_filter_source_node_ids;
460
461
    std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
462
    std::shared_ptr<MemLimiter> _mem_limiter = nullptr;
463
464
    // Shared parse result of partition boundaries for runtime-filter partition
465
    // pruning. Lives here (rather than on the Olap-specific subclass) so any
466
    // future scan type can populate it in its `prepare()` override and reuse
467
    // the generic pruning machinery in ScanLocalStateBase.
468
    ParsedPartitionBoundaries _parsed_partition_boundaries;
469
};
470
471
} // namespace doris