be/src/exec/operator/scan_operator.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstdint> |
21 | | #include <mutex> |
22 | | #include <set> |
23 | | #include <string> |
24 | | |
25 | | #include "common/status.h" |
26 | | #include "core/field.h" |
27 | | #include "exec/common/util.hpp" |
28 | | #include "exec/operator/operator.h" |
29 | | #include "exec/pipeline/dependency.h" |
30 | | #include "exec/runtime_filter/runtime_filter_consumer_helper.h" |
31 | | #include "exec/scan/scan_node.h" |
32 | | #include "exec/scan/scanner_context.h" |
33 | | #include "exprs/function_filter.h" |
34 | | #include "exprs/vectorized_fn_call.h" |
35 | | #include "exprs/vin_predicate.h" |
36 | | #include "runtime/descriptors.h" |
37 | | #include "storage/predicate/filter_olap_param.h" |
38 | | |
39 | | namespace doris { |
40 | | class ScannerDelegate; |
41 | | class OlapScanner; |
42 | | } // namespace doris |
43 | | |
44 | | namespace doris { |
45 | | |
46 | | enum class PushDownType { |
47 | | // The predicate cannot be pushed down to the data source. |
48 | | UNACCEPTABLE, |
49 | | // The predicate can be pushed down to the data source, |
50 | | // and the data source can fully evaluate it. |
51 | | ACCEPTABLE, |
52 | | // The predicate can be pushed down to the data source, |
53 | | // but the data source cannot fully evaluate it. |
54 | | PARTIAL_ACCEPTABLE |
55 | | }; |
56 | | |
57 | | class ScanLocalStateBase : public PipelineXLocalState<> { |
58 | | public: |
59 | | ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) |
60 | 123 | : PipelineXLocalState<>(state, parent), _helper(parent->runtime_filter_descs()) {} |
61 | 123 | ~ScanLocalStateBase() override = default; |
62 | | |
63 | | [[nodiscard]] virtual bool should_run_serial() const = 0; |
64 | | |
65 | | virtual RuntimeProfile* scanner_profile() = 0; |
66 | | |
67 | | [[nodiscard]] virtual const TupleDescriptor* input_tuple_desc() const = 0; |
68 | | [[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0; |
69 | | |
70 | | virtual int64_t limit_per_scanner() = 0; |
71 | | |
72 | | virtual void set_scan_ranges(RuntimeState* state, |
73 | | const std::vector<TScanRangeParams>& scan_ranges) = 0; |
74 | | virtual TPushAggOp::type get_push_down_agg_type() = 0; |
75 | | |
76 | | // If the scan operator is a serial operator (like topn), its real parallelism is 1. |
77 | | // Otherwise, its real parallelism is query_parallel_instance_num. |
78 | | // For an olap table, query_parallel_instance_num is usually equal to the session variable parallel_pipeline_task_num. |
79 | | // For a file scan operator, the real parallelism will be 1 if it is in batch mode. |
80 | | // Related PRs: |
81 | | // https://github.com/apache/doris/pull/42460 |
82 | | // https://github.com/apache/doris/pull/44635 |
83 | | [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const; |
84 | | [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const; |
85 | | [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const; |
86 | | |
87 | 0 | [[nodiscard]] std::string get_name() { return _parent->get_name(); } |
88 | | |
89 | 15 | uint64_t get_condition_cache_digest() const { return _condition_cache_digest; } |
90 | | |
91 | | Status update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num); |
92 | | |
93 | | Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts); |
94 | | |
95 | | protected: |
96 | | friend class ScannerContext; |
97 | | friend class Scanner; |
98 | | |
99 | | virtual Status _init_profile() = 0; |
100 | | |
101 | | std::atomic<bool> _opened {false}; |
102 | | |
103 | | DependencySPtr _scan_dependency = nullptr; |
104 | | |
105 | | std::shared_ptr<RuntimeProfile> _scanner_profile; |
106 | | RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; |
107 | | // Number of newly created free blocks while running the query |
108 | | RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; |
109 | | // Max number of scanner threads |
110 | | RuntimeProfile::Counter* _max_scan_concurrency = nullptr; |
111 | | RuntimeProfile::Counter* _min_scan_concurrency = nullptr; |
112 | | RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr; |
113 | | // Time spent getting blocks from the scanner |
114 | | RuntimeProfile::Counter* _scan_timer = nullptr; |
115 | | RuntimeProfile::Counter* _scan_cpu_timer = nullptr; |
116 | | // Time spent filtering output blocks from the scanner |
117 | | RuntimeProfile::Counter* _filter_timer = nullptr; |
118 | | // Rows read from the scanner (including those discarded by (pre)filters) |
119 | | RuntimeProfile::Counter* _rows_read_counter = nullptr; |
120 | | |
121 | | RuntimeProfile::Counter* _num_scanners = nullptr; |
122 | | |
123 | | RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; |
124 | | |
125 | | RuntimeProfile::Counter* _scan_rows = nullptr; |
126 | | RuntimeProfile::Counter* _scan_bytes = nullptr; |
127 | | |
128 | | std::mutex _conjuncts_lock; |
129 | | RuntimeFilterConsumerHelper _helper; |
130 | | // Magic number used as the seed to generate the hash value for the condition cache |
131 | | uint64_t _condition_cache_digest = 0; |
132 | | // Condition cache filter stats |
133 | | RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr; |
134 | | RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr; |
135 | | |
136 | | // Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type. |
137 | | std::atomic<bool> _eos = false; |
138 | | int _max_pushdown_conditions_per_column = 1024; |
139 | | // Saves all function predicates which may be pushed down to the data source. |
140 | | std::vector<FunctionFilter> _push_down_functions; |
141 | | |
142 | | // Virtual methods with default implementations; overridden by subclasses when supported. |
143 | | // Declared here so that the normalize methods below (non-Derived-template) can call them. |
144 | 0 | virtual bool _push_down_topn(const RuntimePredicate& predicate) { return false; } |
145 | 0 | virtual PushDownType _should_push_down_bloom_filter() const { |
146 | 0 | return PushDownType::UNACCEPTABLE; |
147 | 0 | } |
148 | 0 | virtual PushDownType _should_push_down_topn_filter() const { |
149 | 0 | return PushDownType::UNACCEPTABLE; |
150 | 0 | } |
151 | 0 | virtual PushDownType _should_push_down_bitmap_filter() const { |
152 | 0 | return PushDownType::UNACCEPTABLE; |
153 | 0 | } |
154 | 0 | virtual PushDownType _should_push_down_is_null_predicate(VectorizedFnCall* fn_call) const { |
155 | 0 | return PushDownType::UNACCEPTABLE; |
156 | 0 | } |
157 | 0 | virtual PushDownType _should_push_down_in_predicate() const { |
158 | 0 | return PushDownType::UNACCEPTABLE; |
159 | 0 | } |
160 | | virtual PushDownType _should_push_down_binary_predicate( |
161 | | VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val, |
162 | 0 | const std::set<std::string> fn_name) const { |
163 | 0 | return PushDownType::UNACCEPTABLE; |
164 | 0 | } |
165 | | virtual Status _should_push_down_function_filter(VectorizedFnCall* fn_call, |
166 | | VExprContext* expr_ctx, |
167 | | StringRef* constant_str, |
168 | | doris::FunctionContext** fn_ctx, |
169 | 0 | PushDownType& pdt) { |
170 | 0 | pdt = PushDownType::UNACCEPTABLE; |
171 | 0 | return Status::OK(); |
172 | 0 | } |
173 | | |
174 | | // Non-templated normalize methods, moved here to avoid re-compilation per Derived type. |
175 | | Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt); |
176 | | Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
177 | | SlotDescriptor* slot, |
178 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
179 | | PushDownType* pdt); |
180 | | Status _normalize_topn_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
181 | | SlotDescriptor* slot, |
182 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
183 | | PushDownType* pdt); |
184 | | Status _normalize_bitmap_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
185 | | SlotDescriptor* slot, |
186 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
187 | | PushDownType* pdt); |
188 | | Status _normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot, |
189 | | PushDownType* pdt); |
190 | | |
191 | | // Inner PrimitiveType-template methods. Moved to the base class to avoid an N(Derived)×M(PrimitiveType) |
192 | | // instantiation blowup: they are now instantiated M times in total instead of N×M times. |
193 | | template <PrimitiveType T> |
194 | | Status _normalize_in_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
195 | | SlotDescriptor* slot, |
196 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
197 | | ColumnValueRange<T>& range, PushDownType* pdt); |
198 | | template <PrimitiveType T> |
199 | | Status _normalize_binary_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
200 | | SlotDescriptor* slot, |
201 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
202 | | ColumnValueRange<T>& range, PushDownType* pdt); |
203 | | template <PrimitiveType T> |
204 | | Status _normalize_is_null_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
205 | | SlotDescriptor* slot, |
206 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
207 | | ColumnValueRange<T>& range, PushDownType* pdt); |
208 | | template <PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc> |
209 | | Status _change_value_range(bool is_equal_op, ColumnValueRange<PrimitiveType>& range, |
210 | | const Field& value, const ChangeFixedValueRangeFunc& func, |
211 | | const std::string& fn_name); |
212 | | }; |
213 | | |
214 | | template <typename LocalStateType> |
215 | | class ScanOperatorX; |
216 | | template <typename Derived> |
217 | | class ScanLocalState : public ScanLocalStateBase { |
218 | | ENABLE_FACTORY_CREATOR(ScanLocalState); |
219 | | ScanLocalState(RuntimeState* state, OperatorXBase* parent) |
220 | 123 | : ScanLocalStateBase(state, parent) {}_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 220 | 105 | : ScanLocalStateBase(state, parent) {} |
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 220 | 15 | : ScanLocalStateBase(state, parent) {} |
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 220 | 3 | : ScanLocalStateBase(state, parent) {} |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE |
221 | 123 | ~ScanLocalState() override = default; _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEED2Ev Line | Count | Source | 221 | 105 | ~ScanLocalState() override = default; |
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEED2Ev Line | Count | Source | 221 | 15 | ~ScanLocalState() override = default; |
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEED2Ev Line | Count | Source | 221 | 3 | ~ScanLocalState() override = default; |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEED2Ev |
222 | | |
223 | | virtual Status init(RuntimeState* state, LocalStateInfo& info) override; |
224 | | |
225 | | virtual Status open(RuntimeState* state) override; |
226 | | |
227 | | Status close(RuntimeState* state) override; |
228 | | std::string debug_string(int indentation_level) const final; |
229 | | |
230 | | [[nodiscard]] bool should_run_serial() const override; |
231 | | |
232 | 29 | RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15scanner_profileEv _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15scanner_profileEv Line | Count | Source | 232 | 29 | RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); } |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15scanner_profileEv |
233 | | |
234 | | [[nodiscard]] const TupleDescriptor* input_tuple_desc() const override; |
235 | | [[nodiscard]] const TupleDescriptor* output_tuple_desc() const override; |
236 | | |
237 | | int64_t limit_per_scanner() override; |
238 | | |
239 | | void set_scan_ranges(RuntimeState* state, |
240 | 0 | const std::vector<TScanRangeParams>& scan_ranges) override {}Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE |
241 | | |
242 | | TPushAggOp::type get_push_down_agg_type() override; |
243 | | |
244 | 0 | std::vector<Dependency*> execution_dependencies() override { |
245 | 0 | if (_filter_dependencies.empty()) { |
246 | 0 | return {}; |
247 | 0 | } |
248 | 0 | std::vector<Dependency*> res(_filter_dependencies.size()); |
249 | 0 | std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(), |
250 | 0 | [](DependencySPtr dep) { return dep.get(); });Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_16EsScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ |
251 | 0 | return res; |
252 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEv |
253 | | |
254 | 0 | std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MockScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18OlapScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18FileScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_21GroupCommitLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18JDBCScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_16EsScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MetaScanLocalStateEE12dependenciesEv |
255 | | |
256 | 0 | std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) { |
257 | 0 | std::vector<int> result; |
258 | 0 | for (int id : _parent->cast<typename Derived::Parent>()._topn_filter_source_node_ids) { |
259 | 0 | if (!state->get_query_ctx()->has_runtime_predicate(id)) { |
260 | | // Compatible with older FE versions: skip ids that have no runtime predicate |
261 | 0 | continue; |
262 | 0 | } |
263 | | |
264 | 0 | const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); |
265 | 0 | if (!pred.enable()) { |
266 | 0 | continue; |
267 | 0 | } |
268 | 0 | if (_push_down_topn(pred) == push_down) { |
269 | 0 | result.push_back(id); |
270 | 0 | } |
271 | 0 | } |
272 | 0 | return result; |
273 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb |
274 | | |
275 | | protected: |
276 | | template <typename LocalStateType> |
277 | | friend class ScanOperatorX; |
278 | | friend class ScannerContext; |
279 | | friend class Scanner; |
280 | | |
281 | | Status _init_profile() override; |
282 | 0 | virtual Status _process_conjuncts(RuntimeState* state) { return _normalize_conjuncts(state); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE |
283 | 0 | virtual bool _should_push_down_common_expr() { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE29_should_push_down_common_exprEv |
284 | | |
285 | 0 | virtual bool _storage_no_merge() { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE17_storage_no_mergeEv |
286 | 0 | virtual bool _is_key_column(const std::string& col_name) { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
287 | | |
288 | | // Create a list of scanners. |
289 | | // The number of scanners is related to the implementation of the data source, |
290 | | // predicate conditions, and scheduling strategy. |
291 | | // So this method needs to be implemented separately by each subclass of ScanNode. |
292 | | // Finally, a set of scanners that have been prepared is returned. |
293 | 0 | virtual Status _init_scanners(std::list<ScannerSPtr>* scanners) { return Status::OK(); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE |
294 | | |
295 | | Status _normalize_conjuncts(RuntimeState* state); |
296 | | // Normalize a conjunct and try to convert it to a column predicate recursively. |
297 | | Status _normalize_predicate(VExprContext* context, const VExprSPtr& root, |
298 | | VExprSPtr& output_expr); |
299 | | bool _is_predicate_acting_on_slot(const VExprSPtrs& children, SlotDescriptor** slot_desc, |
300 | | ColumnValueRangeType** range); |
301 | | Status _prepare_scanners(); |
302 | | |
303 | | // Submit the scanners to the thread pool and start execution |
304 | | Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners); |
305 | | |
306 | | // For some conjuncts there is a chance to eliminate the cast operator. |
307 | | // E.g. a Variant's sub column could eliminate the cast in the storage layer if |
308 | | // the cast destination column type equals the storage column type. |
309 | | void get_cast_types_for_variants(); |
310 | | void _filter_and_collect_cast_type_for_variant( |
311 | | const VExpr* expr, |
312 | | std::unordered_map<std::string, std::vector<DataTypePtr>>& colname_to_cast_types); |
313 | | |
314 | | Status _get_topn_filters(RuntimeState* state); |
315 | | |
316 | | // Stores conjuncts that have been fully pushed down to the storage layer as predicate columns. |
317 | | // These expr contexts are kept alive to prevent their FunctionContext and constant strings |
318 | | // from being freed prematurely. |
319 | | VExprContextSPtrs _stale_expr_ctxs; |
320 | | VExprContextSPtrs _common_expr_ctxs_push_down; |
321 | | |
322 | | atomic_shared_ptr<ScannerContext> _scanner_ctx; |
323 | | |
324 | | // colname -> cast dst type |
325 | | std::map<std::string, DataTypePtr> _cast_types_for_variants; |
326 | | |
327 | | // slot id -> ColumnValueRange |
328 | | // Parsed from conjuncts |
329 | | phmap::flat_hash_map<int, ColumnValueRangeType> _slot_id_to_value_range; |
330 | | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates; |
331 | | std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates; |
332 | | |
333 | | std::vector<std::shared_ptr<Dependency>> _filter_dependencies; |
334 | | |
335 | | // ScanLocalState holds the ownership of the scanners; the scanner context only has weak pointers to them |
336 | | std::list<std::shared_ptr<ScannerDelegate>> _scanners; |
337 | | Arena _arena; |
338 | | int _instance_idx = 0; |
339 | | }; |
340 | | |
341 | | template <typename LocalStateType> |
342 | | class ScanOperatorX : public OperatorX<LocalStateType> { |
343 | | public: |
344 | | Status init(const TPlanNode& tnode, RuntimeState* state) override; |
345 | | Status prepare(RuntimeState* state) override; |
346 | | Status get_block(RuntimeState* state, Block* block, bool* eos) override; |
347 | 0 | Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override { |
348 | 0 | Status status = get_block(state, block, eos); |
349 | 0 | if (status.ok()) { |
350 | 0 | if (auto rows = block->rows()) { |
351 | 0 | auto* local_state = state->get_local_state(operator_id()); |
352 | 0 | COUNTER_UPDATE(local_state->_rows_returned_counter, rows); |
353 | 0 | COUNTER_UPDATE(local_state->_blocks_returned_counter, 1); |
354 | 0 | } |
355 | 0 | } |
356 | 0 | return status; |
357 | 0 | } Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb |
358 | 0 | [[nodiscard]] bool is_source() const override { return true; }Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_16EsScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE9is_sourceEv |
359 | | |
360 | | [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override; |
361 | | |
362 | 123 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { |
363 | 123 | return _runtime_filter_descs; |
364 | 123 | } _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 362 | 105 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 363 | 105 | return _runtime_filter_descs; | 364 | 105 | } |
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 362 | 15 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 363 | 15 | return _runtime_filter_descs; | 364 | 15 | } |
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 362 | 3 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 363 | 3 | return _runtime_filter_descs; | 364 | 3 | } |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE20runtime_filter_descsEv |
365 | | |
366 | 0 | [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_16EsScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
367 | | |
368 | 0 | TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE22get_push_down_agg_typeEv |
369 | | |
370 | 0 | DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { |
371 | 0 | if (OperatorX<LocalStateType>::is_serial_operator()) { |
372 | | // When `is_serial_operator()` returns true, the data distribution is ignored. |
373 | 0 | return {ExchangeType::NOOP}; |
374 | 0 | } |
375 | 0 | return {ExchangeType::BUCKET_HASH_SHUFFLE}; |
376 | 0 | } Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_16EsScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE |
377 | | |
378 | 0 | void set_low_memory_mode(RuntimeState* state) override { |
379 | 0 | auto& local_state = get_local_state(state); |
380 | |
|
381 | 0 | if (auto ctx = local_state._scanner_ctx.load()) { |
382 | 0 | ctx->clear_free_blocks(); |
383 | 0 | } |
384 | 0 | } Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE |
385 | | |
386 | | using OperatorX<LocalStateType>::node_id; |
387 | | using OperatorX<LocalStateType>::operator_id; |
388 | | using OperatorX<LocalStateType>::get_local_state; |
389 | | |
390 | | #ifdef BE_TEST |
391 | 21 | ScanOperatorX() = default; |
392 | | #endif |
393 | | |
394 | | protected: |
395 | | using LocalState = LocalStateType; |
396 | | friend class OlapScanner; |
397 | | ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
398 | | const DescriptorTbl& descs, int parallel_tasks = 0); |
399 | 41 | virtual ~ScanOperatorX() = default; _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEED2Ev Line | Count | Source | 399 | 21 | virtual ~ScanOperatorX() = default; |
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEED2Ev Line | Count | Source | 399 | 17 | virtual ~ScanOperatorX() = default; |
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEED2Ev Line | Count | Source | 399 | 3 | virtual ~ScanOperatorX() = default; |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEED2Ev |
400 | | template <typename Derived> |
401 | | friend class ScanLocalState; |
402 | | friend class OlapScanLocalState; |
403 | | |
404 | | // For a load scan node, there should be both an input and an output tuple descriptor. |
405 | | // For a query scan node, there is only output_tuple_desc. |
406 | | TupleId _input_tuple_id = -1; |
407 | | TupleId _output_tuple_id = -1; |
408 | | const TupleDescriptor* _input_tuple_desc = nullptr; |
409 | | const TupleDescriptor* _output_tuple_desc = nullptr; |
410 | | |
411 | | phmap::flat_hash_map<int, SlotDescriptor*> _slot_id_to_slot_desc; |
412 | | std::unordered_map<std::string, int> _colname_to_slot_id; |
413 | | |
414 | | // These two values are from query_options |
415 | | int _max_scan_key_num = 48; |
416 | | int _max_pushdown_conditions_per_column = 1024; |
417 | | |
418 | | // If the query looks like `select * from table limit 10;`, it should run in |
419 | | // a single scanner, to avoid creating too many scanners that would cause lots of useless reads. |
420 | | bool _should_run_serial = false; |
421 | | |
422 | | VExprContextSPtrs _common_expr_ctxs_push_down; |
423 | | |
424 | | // If sort info is set, push the limit to each scanner. |
425 | | int64_t _limit_per_scanner = -1; |
426 | | |
427 | | // Shared remaining limit across all parallel instances and their scanners. |
428 | | // Initialized to _limit (SQL LIMIT); -1 means no limit. |
429 | | std::atomic<int64_t> _shared_scan_limit {-1}; |
430 | | |
431 | | std::vector<TRuntimeFilterDesc> _runtime_filter_descs; |
432 | | |
433 | | TPushAggOp::type _push_down_agg_type; |
434 | | |
435 | | // Records the value of the aggregate function 'count' from the Doris BE. |
436 | | int64_t _push_down_count = -1; |
437 | | const int _parallel_tasks = 0; |
438 | | |
439 | | std::vector<int> _topn_filter_source_node_ids; |
440 | | |
441 | | std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr; |
442 | | std::shared_ptr<MemLimiter> _mem_limiter = nullptr; |
443 | | }; |
444 | | |
445 | | } // namespace doris |