be/src/exec/operator/scan_operator.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstdint> |
21 | | #include <set> |
22 | | #include <string> |
23 | | |
24 | | #include "common/status.h" |
25 | | #include "common/thread_safety_annotations.h" |
26 | | #include "core/field.h" |
27 | | #include "exec/common/util.hpp" |
28 | | #include "exec/operator/operator.h" |
29 | | #include "exec/pipeline/dependency.h" |
30 | | #include "exec/runtime_filter/runtime_filter_consumer_helper.h" |
31 | | #include "exec/runtime_filter/runtime_filter_partition_pruner.h" |
32 | | #include "exec/scan/scan_node.h" |
33 | | #include "exec/scan/scanner_context.h" |
34 | | #include "exprs/function_filter.h" |
35 | | #include "exprs/vectorized_fn_call.h" |
36 | | #include "exprs/vin_predicate.h" |
37 | | #include "runtime/descriptors.h" |
38 | | #include "storage/predicate/filter_olap_param.h" |
39 | | |
40 | | namespace doris { |
41 | | class ScannerDelegate; |
42 | | class OlapScanner; |
43 | | } // namespace doris |
44 | | |
45 | | namespace doris { |
46 | | |
47 | | enum class PushDownType { |
48 | | // The predicate can not be pushed down to data source |
49 | | UNACCEPTABLE, |
50 | | // The predicate can be pushed down to data source |
51 | | // and the data source can fully evaluate it |
52 | | ACCEPTABLE, |
53 | | // The predicate can be pushed down to data source |
54 | | // but the data source can not fully evaluate it. |
55 | | PARTIAL_ACCEPTABLE |
56 | | }; |
57 | | |
58 | | class ScanLocalStateBase : public PipelineXLocalState<> { |
59 | | public: |
60 | | ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) |
61 | 125 | : PipelineXLocalState<>(state, parent), _helper(parent->runtime_filter_descs()) {} |
62 | 125 | ~ScanLocalStateBase() override = default; |
63 | | |
64 | | [[nodiscard]] virtual bool should_run_serial() const = 0; |
65 | | |
66 | | virtual RuntimeProfile* scanner_profile() = 0; |
67 | | |
68 | | [[nodiscard]] virtual const TupleDescriptor* input_tuple_desc() const = 0; |
69 | | [[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0; |
70 | | |
71 | | virtual int64_t limit_per_scanner() = 0; |
72 | | virtual std::atomic<int64_t>* shared_scan_limit_ptr() = 0; |
73 | | |
74 | | virtual void set_scan_ranges(RuntimeState* state, |
75 | | const std::vector<TScanRangeParams>& scan_ranges) = 0; |
76 | | virtual TPushAggOp::type get_push_down_agg_type() = 0; |
77 | | |
78 | | // If the scan operator is a serial operator (like topn), its real parallelism is 1. |
79 | | // Otherwise, its real parallelism is query_parallel_instance_num. |
80 | | // query_parallel_instance_num of an olap table is usually equal to the session var parallel_pipeline_task_num. |
81 | | // For a file scan operator, its real parallelism will be 1 if it is in batch mode. |
82 | | // Related pr: |
83 | | // https://github.com/apache/doris/pull/42460 |
84 | | // https://github.com/apache/doris/pull/44635 |
85 | | [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const; |
86 | | [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const; |
87 | | [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const; |
88 | | |
89 | | // Thread-safe check whether a partition has been pruned by runtime filter. |
90 | | // Callable from any scan type's scanner in scheduling threads. |
91 | | bool is_partition_pruned(int64_t partition_id) const; |
92 | | |
93 | 0 | [[nodiscard]] std::string get_name() { return _parent->get_name(); } |
94 | | |
95 | 15 | uint64_t get_condition_cache_digest() const { return _condition_cache_digest; } |
96 | | |
97 | | Status update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num); |
98 | | |
99 | | Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts); |
100 | | |
101 | | protected: |
102 | | friend class ScannerContext; |
103 | | friend class Scanner; |
104 | | |
105 | | virtual Status _init_profile() = 0; |
106 | | |
107 | | // Hook for subclasses to react after new runtime filters are appended. |
108 | | // Called inside update_late_arrival_runtime_filter() while _conjuncts_lock is held. |
109 | | // Default implementation runs partition pruning on the newly appended RFs. |
110 | | virtual void _on_runtime_filter_update(); |
111 | | |
112 | | void _do_partition_pruning_by_rf(); |
113 | | |
114 | | std::atomic<bool> _opened {false}; |
115 | | |
116 | | DependencySPtr _scan_dependency = nullptr; |
117 | | |
118 | | std::shared_ptr<RuntimeProfile> _scanner_profile; |
119 | | RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; |
120 | | // Num of newly created free blocks when running query |
121 | | RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; |
122 | | // Max num of scanner thread |
123 | | RuntimeProfile::Counter* _max_scan_concurrency = nullptr; |
124 | | RuntimeProfile::Counter* _min_scan_concurrency = nullptr; |
125 | | RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr; |
126 | | // time spent getting blocks from the scanner |
127 | | RuntimeProfile::Counter* _scan_timer = nullptr; |
128 | | RuntimeProfile::Counter* _scan_cpu_timer = nullptr; |
129 | | // time spent filtering output blocks from the scanner |
130 | | RuntimeProfile::Counter* _filter_timer = nullptr; |
131 | | // rows read from the scanner (including those discarded by (pre)filters) |
132 | | RuntimeProfile::Counter* _rows_read_counter = nullptr; |
133 | | |
134 | | RuntimeProfile::Counter* _num_scanners = nullptr; |
135 | | |
136 | | RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; |
137 | | |
138 | | RuntimeProfile::Counter* _scan_rows = nullptr; |
139 | | RuntimeProfile::Counter* _scan_bytes = nullptr; |
140 | | |
141 | | AnnotatedMutex _conjuncts_lock; |
142 | | RuntimeFilterConsumerHelper _helper; |
143 | | // magic number as seed to generate hash value for condition cache |
144 | | uint64_t _condition_cache_digest = 0; |
145 | | // condition cache filter stats |
146 | | RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr; |
147 | | RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr; |
148 | | |
149 | | // ---- Runtime-filter partition pruning (scan-agnostic) ---- |
150 | | RuntimeFilterPartitionPruner _rf_partition_pruner; |
151 | | RuntimeProfile::Counter* _partitions_pruned_by_rf_counter = nullptr; |
152 | | RuntimeProfile::Counter* _total_partitions_rf_counter = nullptr; |
153 | | |
154 | | // Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type. |
155 | | std::atomic<bool> _eos = false; |
156 | | int _max_pushdown_conditions_per_column = 1024; |
157 | | // Save all function predicates which may be pushed down to data source. |
158 | | std::vector<FunctionFilter> _push_down_functions; |
159 | | |
160 | | // Virtual methods with default implementations; overridden by subclasses when supported. |
161 | | // Declared here so that the normalize methods below (non-Derived-template) can call them. |
162 | 0 | virtual bool _push_down_topn(const RuntimePredicate& predicate) { return false; } |
163 | 0 | virtual PushDownType _should_push_down_bloom_filter() const { |
164 | 0 | return PushDownType::UNACCEPTABLE; |
165 | 0 | } |
166 | 0 | virtual PushDownType _should_push_down_topn_filter() const { |
167 | 0 | return PushDownType::UNACCEPTABLE; |
168 | 0 | } |
169 | 0 | virtual PushDownType _should_push_down_bitmap_filter() const { |
170 | 0 | return PushDownType::UNACCEPTABLE; |
171 | 0 | } |
172 | 0 | virtual PushDownType _should_push_down_is_null_predicate(VectorizedFnCall* fn_call) const { |
173 | 0 | return PushDownType::UNACCEPTABLE; |
174 | 0 | } |
175 | 0 | virtual PushDownType _should_push_down_in_predicate() const { |
176 | 0 | return PushDownType::UNACCEPTABLE; |
177 | 0 | } |
178 | | virtual PushDownType _should_push_down_binary_predicate( |
179 | | VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val, |
180 | 0 | const std::set<std::string> fn_name) const { |
181 | 0 | return PushDownType::UNACCEPTABLE; |
182 | 0 | } |
183 | | virtual Status _should_push_down_function_filter(VectorizedFnCall* fn_call, |
184 | | VExprContext* expr_ctx, |
185 | | StringRef* constant_str, |
186 | | doris::FunctionContext** fn_ctx, |
187 | 0 | PushDownType& pdt) { |
188 | 0 | pdt = PushDownType::UNACCEPTABLE; |
189 | 0 | return Status::OK(); |
190 | 0 | } |
191 | | |
192 | | // Non-templated normalize methods, moved here to avoid re-compilation per Derived type. |
193 | | Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt); |
194 | | Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
195 | | SlotDescriptor* slot, |
196 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
197 | | PushDownType* pdt); |
198 | | Status _normalize_topn_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
199 | | SlotDescriptor* slot, |
200 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
201 | | PushDownType* pdt); |
202 | | Status _normalize_bitmap_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
203 | | SlotDescriptor* slot, |
204 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
205 | | PushDownType* pdt); |
206 | | Status _normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot, |
207 | | PushDownType* pdt); |
208 | | |
209 | | // Inner PrimitiveType-template methods. Moved to base to avoid N(Derived)×M(PrimitiveType) |
210 | | // instantiation blowup: now instantiated M times total instead of N×M times. |
211 | | template <PrimitiveType T> |
212 | | Status _normalize_in_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
213 | | SlotDescriptor* slot, |
214 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
215 | | ColumnValueRange<T>& range, PushDownType* pdt); |
216 | | template <PrimitiveType T> |
217 | | Status _normalize_binary_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
218 | | SlotDescriptor* slot, |
219 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
220 | | ColumnValueRange<T>& range, PushDownType* pdt); |
221 | | template <PrimitiveType T> |
222 | | Status _normalize_is_null_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
223 | | SlotDescriptor* slot, |
224 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
225 | | ColumnValueRange<T>& range, PushDownType* pdt); |
226 | | template <PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc> |
227 | | Status _change_value_range(bool is_equal_op, ColumnValueRange<PrimitiveType>& range, |
228 | | const Field& value, const ChangeFixedValueRangeFunc& func, |
229 | | const std::string& fn_name); |
230 | | }; |
231 | | |
232 | | template <typename LocalStateType> |
233 | | class ScanOperatorX; |
234 | | template <typename Derived> |
235 | | class ScanLocalState : public ScanLocalStateBase { |
236 | | ENABLE_FACTORY_CREATOR(ScanLocalState); |
237 | | ScanLocalState(RuntimeState* state, OperatorXBase* parent) |
238 | 125 | : ScanLocalStateBase(state, parent) {}_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 238 | 107 | : ScanLocalStateBase(state, parent) {} |
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 238 | 15 | : ScanLocalStateBase(state, parent) {} |
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 238 | 3 | : ScanLocalStateBase(state, parent) {} |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE |
239 | 125 | ~ScanLocalState() override = default; _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEED2Ev Line | Count | Source | 239 | 107 | ~ScanLocalState() override = default; |
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEED2Ev Line | Count | Source | 239 | 15 | ~ScanLocalState() override = default; |
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEED2Ev Line | Count | Source | 239 | 3 | ~ScanLocalState() override = default; |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEED2Ev |
240 | | |
241 | | virtual Status init(RuntimeState* state, LocalStateInfo& info) override; |
242 | | |
243 | | virtual Status open(RuntimeState* state) override; |
244 | | |
245 | | Status close(RuntimeState* state) override; |
246 | | std::string debug_string(int indentation_level) const final; |
247 | | |
248 | | [[nodiscard]] bool should_run_serial() const override; |
249 | | |
250 | 37 | RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15scanner_profileEv _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15scanner_profileEv Line | Count | Source | 250 | 37 | RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); } |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15scanner_profileEv |
251 | | |
252 | | [[nodiscard]] const TupleDescriptor* input_tuple_desc() const override; |
253 | | [[nodiscard]] const TupleDescriptor* output_tuple_desc() const override; |
254 | | |
255 | | int64_t limit_per_scanner() override; |
256 | | std::atomic<int64_t>* shared_scan_limit_ptr() override; |
257 | | |
258 | | void set_scan_ranges(RuntimeState* state, |
259 | 1 | const std::vector<TScanRangeParams>& scan_ranges) override {}_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Line | Count | Source | 259 | 1 | const std::vector<TScanRangeParams>& scan_ranges) override {} |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE |
260 | | |
261 | | TPushAggOp::type get_push_down_agg_type() override; |
262 | | |
263 | 0 | std::vector<Dependency*> execution_dependencies() override { |
264 | 0 | if (_filter_dependencies.empty()) { |
265 | 0 | return {}; |
266 | 0 | } |
267 | 0 | std::vector<Dependency*> res(_filter_dependencies.size()); |
268 | 0 | std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(), |
269 | 0 | [](DependencySPtr dep) { return dep.get(); });Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ |
270 | 0 | return res; |
271 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEv |
272 | | |
273 | 0 | std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MockScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18OlapScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18FileScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_21GroupCommitLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18JDBCScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MetaScanLocalStateEE12dependenciesEv |
274 | | |
275 | 0 | std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) { |
276 | 0 | std::vector<int> result; |
277 | 0 | for (int id : _parent->cast<typename Derived::Parent>()._topn_filter_source_node_ids) { |
278 | 0 | if (!state->get_query_ctx()->has_runtime_predicate(id)) { |
279 | | // compatible with older versions fe |
280 | 0 | continue; |
281 | 0 | } |
282 | | |
283 | 0 | const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); |
284 | 0 | if (!pred.enable()) { |
285 | 0 | continue; |
286 | 0 | } |
287 | 0 | if (_push_down_topn(pred) == push_down) { |
288 | 0 | result.push_back(id); |
289 | 0 | } |
290 | 0 | } |
291 | 0 | return result; |
292 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb |
293 | | |
294 | | protected: |
295 | | template <typename LocalStateType> |
296 | | friend class ScanOperatorX; |
297 | | friend class ScannerContext; |
298 | | friend class Scanner; |
299 | | |
300 | | Status _init_profile() override; |
301 | 0 | virtual Status _process_conjuncts(RuntimeState* state) { |
302 | 0 | _do_partition_pruning_by_rf(); |
303 | 0 | return _normalize_conjuncts(state); |
304 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE |
305 | 0 | virtual bool _should_push_down_common_expr(const VExprSPtr&) { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE29_should_push_down_common_exprERKSt10shared_ptrINS_5VExprEE |
306 | | |
307 | 0 | virtual bool _storage_no_merge() { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE17_storage_no_mergeEv |
308 | 0 | virtual bool _is_key_column(const std::string& col_name) { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
309 | | |
310 | | // Create a list of scanners. |
311 | | // The number of scanners is related to the implementation of the data source, |
312 | | // predicate conditions, and scheduling strategy. |
313 | | // So this method needs to be implemented separately by the subclass of ScanNode. |
314 | | // Finally, a set of scanners that have been prepared are returned. |
315 | 0 | virtual Status _init_scanners(std::list<ScannerSPtr>* scanners) { return Status::OK(); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE |
316 | | |
317 | | Status _normalize_conjuncts(RuntimeState* state); |
318 | | // Normalize a conjunct and try to convert it to column predicate recursively. |
319 | | Status _normalize_predicate(VExprContext* context, const VExprSPtr& root, |
320 | | VExprSPtr& output_expr); |
321 | | bool _is_predicate_acting_on_slot(const VExprSPtrs& children, SlotDescriptor** slot_desc, |
322 | | ColumnValueRangeType** range); |
323 | | Status _prepare_scanners(); |
324 | | |
325 | | // Submit the scanner to the thread pool and start execution |
326 | | Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners); |
327 | | |
328 | | // For some conjuncts there is a chance to eliminate the cast operator. |
329 | | // E.g. a Variant's sub column could eliminate the cast in the storage layer if |
330 | | // the cast dst column type equals the storage column type. |
331 | | void get_cast_types_for_variants(); |
332 | | void _filter_and_collect_cast_type_for_variant( |
333 | | const VExpr* expr, |
334 | | std::unordered_map<std::string, std::vector<DataTypePtr>>& colname_to_cast_types); |
335 | | |
336 | | Status _get_topn_filters(RuntimeState* state); |
337 | | |
338 | | // Stores conjuncts that have been fully pushed down to the storage layer as predicate columns. |
339 | | // These expr contexts are kept alive to prevent their FunctionContext and constant strings |
340 | | // from being freed prematurely. |
341 | | VExprContextSPtrs _stale_expr_ctxs; |
342 | | VExprContextSPtrs _common_expr_ctxs_push_down; |
343 | | |
344 | | atomic_shared_ptr<ScannerContext> _scanner_ctx; |
345 | | |
346 | | // colname -> cast dst type |
347 | | std::map<std::string, DataTypePtr> _cast_types_for_variants; |
348 | | |
349 | | // slot id -> ColumnValueRange |
350 | | // Parsed from conjuncts |
351 | | phmap::flat_hash_map<int, ColumnValueRangeType> _slot_id_to_value_range; |
352 | | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates; |
353 | | std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates; |
354 | | |
355 | | std::vector<std::shared_ptr<Dependency>> _filter_dependencies; |
356 | | |
357 | | // ScanLocalState owns the scanners; the scanner context only holds weak pointers to them |
358 | | std::list<std::shared_ptr<ScannerDelegate>> _scanners; |
359 | | Arena _arena; |
360 | | int _instance_idx = 0; |
361 | | }; |
362 | | |
363 | | template <typename LocalStateType> |
364 | | class ScanOperatorX : public OperatorX<LocalStateType> { |
365 | | public: |
366 | | Status init(const TPlanNode& tnode, RuntimeState* state) override; |
367 | | Status prepare(RuntimeState* state) override; |
368 | | Status get_block(RuntimeState* state, Block* block, bool* eos) override; |
369 | 1 | Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override { |
370 | 1 | Status status = get_block(state, block, eos); |
371 | 1 | if (status.ok()) { |
372 | 1 | state->get_local_state(operator_id())->update_output_block_counters(*block); |
373 | 1 | } |
374 | 1 | return status; |
375 | 1 | } _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Line | Count | Source | 369 | 1 | Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override { | 370 | 1 | Status status = get_block(state, block, eos); | 371 | 1 | if (status.ok()) { | 372 | 1 | state->get_local_state(operator_id())->update_output_block_counters(*block); | 373 | 1 | } | 374 | 1 | return status; | 375 | 1 | } |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb |
376 | 0 | [[nodiscard]] bool is_source() const override { return true; }Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE9is_sourceEv |
377 | | |
378 | | [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override; |
379 | | |
380 | 125 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { |
381 | 125 | return _runtime_filter_descs; |
382 | 125 | } _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 380 | 107 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 381 | 107 | return _runtime_filter_descs; | 382 | 107 | } |
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 380 | 15 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 381 | 15 | return _runtime_filter_descs; | 382 | 15 | } |
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 380 | 3 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 381 | 3 | return _runtime_filter_descs; | 382 | 3 | } |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE20runtime_filter_descsEv |
383 | | |
384 | | // Expose this operator's per-fragment shared partition-boundary parse |
385 | | // result to the non-templated ScanLocalStateBase so it can drive runtime |
386 | | // filter partition pruning without down-casting to a specific scan type. |
387 | | // Subclasses are expected to populate `_parsed_partition_boundaries` from |
388 | | // their own partition-boundary thrift field inside their `prepare()` |
389 | | // override before any LocalState observes the result. |
390 | 0 | const ParsedPartitionBoundaries* parsed_partition_boundaries() const override { |
391 | 0 | return &_parsed_partition_boundaries; |
392 | 0 | } Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE27parsed_partition_boundariesEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE27parsed_partition_boundariesEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE27parsed_partition_boundariesEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE27parsed_partition_boundariesEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE27parsed_partition_boundariesEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE27parsed_partition_boundariesEv |
393 | | |
394 | 0 | [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
395 | | |
396 | 0 | TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE22get_push_down_agg_typeEv |
397 | | |
398 | 0 | DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { |
399 | 0 | if (OperatorX<LocalStateType>::is_serial_operator()) { |
400 | | // `is_serial_operator()` returns true means we ignore the distribution. |
401 | 0 | return {ExchangeType::NOOP}; |
402 | 0 | } |
403 | 0 | return {ExchangeType::BUCKET_HASH_SHUFFLE}; |
404 | 0 | } Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE |
405 | | |
406 | 0 | void set_low_memory_mode(RuntimeState* state) override { |
407 | 0 | auto& local_state = get_local_state(state); |
408 | |
|
409 | 0 | if (auto ctx = local_state._scanner_ctx.load()) { |
410 | 0 | ctx->clear_free_blocks(); |
411 | 0 | } |
412 | 0 | } Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE |
413 | | |
414 | | using OperatorX<LocalStateType>::node_id; |
415 | | using OperatorX<LocalStateType>::operator_id; |
416 | | using OperatorX<LocalStateType>::get_local_state; |
417 | | |
#ifdef BE_TEST
// Default constructor compiled only when BE_TEST is defined. Members keep
// their in-class default initializers.
ScanOperatorX() = default;
#endif
421 | | |
422 | | protected: |
423 | | using LocalState = LocalStateType; |
424 | | friend class OlapScanner; |
425 | | ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
426 | | const DescriptorTbl& descs, int parallel_tasks = 0); |
427 | 43 | virtual ~ScanOperatorX() = default; _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEED2Ev Line | Count | Source | 427 | 23 | virtual ~ScanOperatorX() = default; |
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEED2Ev Line | Count | Source | 427 | 17 | virtual ~ScanOperatorX() = default; |
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEED2Ev Line | Count | Source | 427 | 3 | virtual ~ScanOperatorX() = default; |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEED2Ev |
428 | | template <typename Derived> |
429 | | friend class ScanLocalState; |
430 | | friend class OlapScanLocalState; |
431 | | |
432 | | // For load scan node, there should be both input and output tuple descriptor. |
433 | | // For query scan node, there is only output_tuple_desc. |
434 | | TupleId _input_tuple_id = -1; |
435 | | TupleId _output_tuple_id = -1; |
436 | | const TupleDescriptor* _input_tuple_desc = nullptr; |
437 | | const TupleDescriptor* _output_tuple_desc = nullptr; |
438 | | |
439 | | phmap::flat_hash_map<int, SlotDescriptor*> _slot_id_to_slot_desc; |
440 | | std::unordered_map<std::string, int> _colname_to_slot_id; |
441 | | |
442 | | // These two values are from query_options |
443 | | int _max_scan_key_num = 48; |
444 | | int _max_pushdown_conditions_per_column = 1024; |
445 | | |
446 | | // If the query like select * from table limit 10; then the query should run in |
447 | | // single scanner to avoid too many scanners which will cause lots of useless read. |
448 | | bool _should_run_serial = false; |
449 | | |
450 | | VExprContextSPtrs _common_expr_ctxs_push_down; |
451 | | |
452 | | // If sort info is set, push limit to each scanner; |
453 | | int64_t _limit_per_scanner = -1; |
454 | | |
455 | | // Shared remaining limit across all parallel instances and their scanners. |
456 | | // Initialized to _limit (SQL LIMIT); -1 means no limit. |
457 | | std::atomic<int64_t> _shared_scan_limit {-1}; |
458 | | |
459 | | std::vector<TRuntimeFilterDesc> _runtime_filter_descs; |
460 | | |
461 | | TPushAggOp::type _push_down_agg_type; |
462 | | |
463 | | // Record the value of the aggregate function 'count' from doris's be |
464 | | int64_t _push_down_count = -1; |
465 | | const int _parallel_tasks = 0; |
466 | | |
467 | | std::vector<int> _topn_filter_source_node_ids; |
468 | | |
469 | | std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr; |
470 | | std::shared_ptr<MemLimiter> _mem_limiter = nullptr; |
471 | | |
472 | | // Shared parse result of partition boundaries for runtime-filter partition |
473 | | // pruning. Lives here (rather than on the Olap-specific subclass) so any |
474 | | // future scan type can populate it in its `prepare()` override and reuse |
475 | | // the generic pruning machinery in ScanLocalStateBase. |
476 | | ParsedPartitionBoundaries _parsed_partition_boundaries; |
477 | | }; |
478 | | |
479 | | } // namespace doris |