be/src/exec/operator/scan_operator.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <cstdint> |
21 | | #include <mutex> |
22 | | #include <set> |
23 | | #include <string> |
24 | | |
25 | | #include "common/status.h" |
26 | | #include "core/field.h" |
27 | | #include "exec/common/util.hpp" |
28 | | #include "exec/operator/operator.h" |
29 | | #include "exec/pipeline/dependency.h" |
30 | | #include "exec/runtime_filter/runtime_filter_consumer_helper.h" |
31 | | #include "exec/scan/scan_node.h" |
32 | | #include "exec/scan/scanner_context.h" |
33 | | #include "exprs/function_filter.h" |
34 | | #include "exprs/vectorized_fn_call.h" |
35 | | #include "exprs/vin_predicate.h" |
36 | | #include "runtime/descriptors.h" |
37 | | #include "storage/predicate/filter_olap_param.h" |
38 | | |
39 | | namespace doris { |
40 | | #include "common/compile_check_begin.h" |
41 | | class ScannerDelegate; |
42 | | class OlapScanner; |
43 | | } // namespace doris |
44 | | |
45 | | namespace doris { |
46 | | |
47 | | enum class PushDownType { |
48 | | // The predicate cannot be pushed down to the data source. |
49 | | UNACCEPTABLE, |
50 | | // The predicate can be pushed down to the data source, |
51 | | // and the data source can fully evaluate it. |
52 | | ACCEPTABLE, |
53 | | // The predicate can be pushed down to the data source, |
54 | | // but the data source cannot fully evaluate it. |
55 | | PARTIAL_ACCEPTABLE |
56 | | }; |
57 | | |
58 | | class ScanLocalStateBase : public PipelineXLocalState<> { |
59 | | public: |
60 | | ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent) |
61 | 118 | : PipelineXLocalState<>(state, parent), _helper(parent->runtime_filter_descs()) {} |
62 | 118 | ~ScanLocalStateBase() override = default; |
63 | | |
64 | | [[nodiscard]] virtual bool should_run_serial() const = 0; |
65 | | |
66 | | virtual RuntimeProfile* scanner_profile() = 0; |
67 | | |
68 | | [[nodiscard]] virtual const TupleDescriptor* input_tuple_desc() const = 0; |
69 | | [[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0; |
70 | | |
71 | | virtual int64_t limit_per_scanner() = 0; |
72 | | |
73 | | virtual void set_scan_ranges(RuntimeState* state, |
74 | | const std::vector<TScanRangeParams>& scan_ranges) = 0; |
75 | | virtual TPushAggOp::type get_push_down_agg_type() = 0; |
76 | | |
77 | | // If the scan operator is a serial operator (like topn), its real parallelism is 1. |
78 | | // Otherwise, its real parallelism is query_parallel_instance_num. |
79 | | // For an olap table, query_parallel_instance_num is usually equal to the session var parallel_pipeline_task_num. |
80 | | // For the file scan operator, its real parallelism will be 1 if it is in batch mode. |
81 | | // Related PRs: |
82 | | // https://github.com/apache/doris/pull/42460 |
83 | | // https://github.com/apache/doris/pull/44635 |
84 | | [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state) const; |
85 | | [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state) const; |
86 | | [[nodiscard]] virtual ScannerScheduler* scan_scheduler(RuntimeState* state) const; |
87 | | |
88 | 0 | [[nodiscard]] std::string get_name() { return _parent->get_name(); } |
89 | | |
90 | 11 | uint64_t get_condition_cache_digest() const { return _condition_cache_digest; } |
91 | | |
92 | | Status update_late_arrival_runtime_filter(RuntimeState* state, int& arrived_rf_num); |
93 | | |
94 | | Status clone_conjunct_ctxs(VExprContextSPtrs& scanner_conjuncts); |
95 | | |
96 | | protected: |
97 | | friend class ScannerContext; |
98 | | friend class Scanner; |
99 | | |
100 | | virtual Status _init_profile() = 0; |
101 | | |
102 | | std::atomic<bool> _opened {false}; |
103 | | |
104 | | DependencySPtr _scan_dependency = nullptr; |
105 | | |
106 | | std::shared_ptr<RuntimeProfile> _scanner_profile; |
107 | | RuntimeProfile::Counter* _scanner_wait_worker_timer = nullptr; |
108 | | // Number of free blocks newly created while running the query |
109 | | RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; |
110 | | // Max number of scanner threads |
111 | | RuntimeProfile::Counter* _max_scan_concurrency = nullptr; |
112 | | RuntimeProfile::Counter* _min_scan_concurrency = nullptr; |
113 | | RuntimeProfile::HighWaterMarkCounter* _peak_running_scanner = nullptr; |
114 | | // Time spent getting blocks from the scanner |
115 | | RuntimeProfile::Counter* _scan_timer = nullptr; |
116 | | RuntimeProfile::Counter* _scan_cpu_timer = nullptr; |
117 | | // Time spent filtering output blocks from the scanner |
118 | | RuntimeProfile::Counter* _filter_timer = nullptr; |
119 | | // Rows read from the scanner (including those discarded by (pre)filters) |
120 | | RuntimeProfile::Counter* _rows_read_counter = nullptr; |
121 | | |
122 | | RuntimeProfile::Counter* _num_scanners = nullptr; |
123 | | |
124 | | RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; |
125 | | |
126 | | RuntimeProfile::Counter* _scan_rows = nullptr; |
127 | | RuntimeProfile::Counter* _scan_bytes = nullptr; |
128 | | |
129 | | std::mutex _conjuncts_lock; |
130 | | RuntimeFilterConsumerHelper _helper; |
131 | | // Magic number used as the seed to generate the hash value for the condition cache |
132 | | uint64_t _condition_cache_digest = 0; |
133 | | // Condition cache filter stats |
134 | | RuntimeProfile::Counter* _condition_cache_hit_counter = nullptr; |
135 | | RuntimeProfile::Counter* _condition_cache_filtered_rows_counter = nullptr; |
136 | | |
137 | | // Moved from ScanLocalState<Derived> to avoid re-instantiation for each Derived type. |
138 | | std::atomic<bool> _eos = false; |
139 | | int _max_pushdown_conditions_per_column = 1024; |
140 | | // Saves all function predicates that may be pushed down to the data source. |
141 | | std::vector<FunctionFilter> _push_down_functions; |
142 | | |
143 | | // Virtual methods with default implementations; overridden by subclasses when supported. |
144 | | // Declared here so that the normalize methods below (non-Derived-template) can call them. |
145 | 0 | virtual bool _push_down_topn(const RuntimePredicate& predicate) { return false; } |
146 | 0 | virtual PushDownType _should_push_down_bloom_filter() const { |
147 | 0 | return PushDownType::UNACCEPTABLE; |
148 | 0 | } |
149 | 0 | virtual PushDownType _should_push_down_topn_filter() const { |
150 | 0 | return PushDownType::UNACCEPTABLE; |
151 | 0 | } |
152 | 0 | virtual PushDownType _should_push_down_bitmap_filter() const { |
153 | 0 | return PushDownType::UNACCEPTABLE; |
154 | 0 | } |
155 | 0 | virtual PushDownType _should_push_down_is_null_predicate(VectorizedFnCall* fn_call) const { |
156 | 0 | return PushDownType::UNACCEPTABLE; |
157 | 0 | } |
158 | 0 | virtual PushDownType _should_push_down_in_predicate() const { |
159 | 0 | return PushDownType::UNACCEPTABLE; |
160 | 0 | } |
161 | | virtual PushDownType _should_push_down_binary_predicate( |
162 | | VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val, |
163 | 0 | const std::set<std::string> fn_name) const { |
164 | 0 | return PushDownType::UNACCEPTABLE; |
165 | 0 | } |
166 | | virtual Status _should_push_down_function_filter(VectorizedFnCall* fn_call, |
167 | | VExprContext* expr_ctx, |
168 | | StringRef* constant_str, |
169 | | doris::FunctionContext** fn_ctx, |
170 | 0 | PushDownType& pdt) { |
171 | 0 | pdt = PushDownType::UNACCEPTABLE; |
172 | 0 | return Status::OK(); |
173 | 0 | } |
174 | | |
175 | | // Non-templated normalize methods, moved here to avoid re-compilation per Derived type. |
176 | | Status _eval_const_conjuncts(VExprContext* expr_ctx, PushDownType* pdt); |
177 | | Status _normalize_bloom_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
178 | | SlotDescriptor* slot, |
179 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
180 | | PushDownType* pdt); |
181 | | Status _normalize_topn_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
182 | | SlotDescriptor* slot, |
183 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
184 | | PushDownType* pdt); |
185 | | Status _normalize_bitmap_filter(VExprContext* expr_ctx, const VExprSPtr& root, |
186 | | SlotDescriptor* slot, |
187 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
188 | | PushDownType* pdt); |
189 | | Status _normalize_function_filters(VExprContext* expr_ctx, SlotDescriptor* slot, |
190 | | PushDownType* pdt); |
191 | | |
192 | | // Inner PrimitiveType-template methods. Moved to base to avoid N(Derived)×M(PrimitiveType) |
193 | | // instantiation blowup: now instantiated M times total instead of N×M times. |
194 | | template <PrimitiveType T> |
195 | | Status _normalize_in_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
196 | | SlotDescriptor* slot, |
197 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
198 | | ColumnValueRange<T>& range, PushDownType* pdt); |
199 | | template <PrimitiveType T> |
200 | | Status _normalize_binary_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
201 | | SlotDescriptor* slot, |
202 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
203 | | ColumnValueRange<T>& range, PushDownType* pdt); |
204 | | template <PrimitiveType T> |
205 | | Status _normalize_is_null_predicate(VExprContext* expr_ctx, const VExprSPtr& root, |
206 | | SlotDescriptor* slot, |
207 | | std::vector<std::shared_ptr<ColumnPredicate>>& predicates, |
208 | | ColumnValueRange<T>& range, PushDownType* pdt); |
209 | | template <PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc> |
210 | | Status _change_value_range(bool is_equal_op, ColumnValueRange<PrimitiveType>& range, |
211 | | const Field& value, const ChangeFixedValueRangeFunc& func, |
212 | | const std::string& fn_name); |
213 | | }; |
214 | | |
215 | | template <typename LocalStateType> |
216 | | class ScanOperatorX; |
217 | | template <typename Derived> |
218 | | class ScanLocalState : public ScanLocalStateBase { |
219 | | ENABLE_FACTORY_CREATOR(ScanLocalState); |
220 | | ScanLocalState(RuntimeState* state, OperatorXBase* parent) |
221 | 118 | : ScanLocalStateBase(state, parent) {}_ZN5doris14ScanLocalStateINS_18MockScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 221 | 105 | : ScanLocalStateBase(state, parent) {} |
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 221 | 11 | : ScanLocalStateBase(state, parent) {} |
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Line | Count | Source | 221 | 2 | : ScanLocalStateBase(state, parent) {} |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEEC2EPNS_12RuntimeStateEPNS_13OperatorXBaseE |
222 | 118 | ~ScanLocalState() override = default; _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEED2Ev Line | Count | Source | 222 | 105 | ~ScanLocalState() override = default; |
_ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEED2Ev Line | Count | Source | 222 | 11 | ~ScanLocalState() override = default; |
_ZN5doris14ScanLocalStateINS_18FileScanLocalStateEED2Ev Line | Count | Source | 222 | 2 | ~ScanLocalState() override = default; |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEED2Ev |
223 | | |
224 | | virtual Status init(RuntimeState* state, LocalStateInfo& info) override; |
225 | | |
226 | | virtual Status open(RuntimeState* state) override; |
227 | | |
228 | | Status close(RuntimeState* state) override; |
229 | | std::string debug_string(int indentation_level) const final; |
230 | | |
231 | | [[nodiscard]] bool should_run_serial() const override; |
232 | | |
233 | 14 | RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15scanner_profileEv _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15scanner_profileEv Line | Count | Source | 233 | 14 | RuntimeProfile* scanner_profile() override { return _scanner_profile.get(); } |
Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE15scanner_profileEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15scanner_profileEv |
234 | | |
235 | | [[nodiscard]] const TupleDescriptor* input_tuple_desc() const override; |
236 | | [[nodiscard]] const TupleDescriptor* output_tuple_desc() const override; |
237 | | |
238 | | int64_t limit_per_scanner() override; |
239 | | |
240 | | void set_scan_ranges(RuntimeState* state, |
241 | 0 | const std::vector<TScanRangeParams>& scan_ranges) override {}Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE15set_scan_rangesEPNS_12RuntimeStateERKSt6vectorINS_16TScanRangeParamsESaIS6_EE |
242 | | |
243 | | TPushAggOp::type get_push_down_agg_type() override; |
244 | | |
245 | 0 | std::vector<Dependency*> execution_dependencies() override { |
246 | 0 | if (_filter_dependencies.empty()) { |
247 | 0 | return {}; |
248 | 0 | } |
249 | 0 | std::vector<Dependency*> res(_filter_dependencies.size()); |
250 | 0 | std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(), |
251 | 0 | [](DependencySPtr dep) { return dep.get(); });Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_16EsScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ Unexecuted instantiation: _ZZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEvENKUlSt10shared_ptrINS_10DependencyEEE_clES5_ |
252 | 0 | return res; |
253 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE22execution_dependenciesEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE22execution_dependenciesEv |
254 | | |
255 | 0 | std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MockScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18OlapScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18FileScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_21GroupCommitLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18JDBCScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_16EsScanLocalStateEE12dependenciesEv Unexecuted instantiation: _ZNK5doris14ScanLocalStateINS_18MetaScanLocalStateEE12dependenciesEv |
256 | | |
257 | 0 | std::vector<int> get_topn_filter_source_node_ids(RuntimeState* state, bool push_down) { |
258 | 0 | std::vector<int> result; |
259 | 0 | for (int id : _parent->cast<typename Derived::Parent>()._topn_filter_source_node_ids) { |
260 | 0 | if (!state->get_query_ctx()->has_runtime_predicate(id)) { |
261 | | // Skip ids without a registered runtime predicate (compatibility with older FE versions) |
262 | 0 | continue; |
263 | 0 | } |
264 | | |
265 | 0 | const auto& pred = state->get_query_ctx()->get_runtime_predicate(id); |
266 | 0 | if (!pred.enable()) { |
267 | 0 | continue; |
268 | 0 | } |
269 | 0 | if (_push_down_topn(pred) == push_down) { |
270 | 0 | result.push_back(id); |
271 | 0 | } |
272 | 0 | } |
273 | 0 | return result; |
274 | 0 | } Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE31get_topn_filter_source_node_idsEPNS_12RuntimeStateEb |
275 | | |
276 | | protected: |
277 | | template <typename LocalStateType> |
278 | | friend class ScanOperatorX; |
279 | | friend class ScannerContext; |
280 | | friend class Scanner; |
281 | | |
282 | | Status _init_profile() override; |
283 | 0 | virtual Status _process_conjuncts(RuntimeState* state) { return _normalize_conjuncts(state); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE18_process_conjunctsEPNS_12RuntimeStateE |
284 | 0 | virtual bool _should_push_down_common_expr() { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE29_should_push_down_common_exprEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE29_should_push_down_common_exprEv |
285 | | |
286 | 0 | virtual bool _storage_no_merge() { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE17_storage_no_mergeEv Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE17_storage_no_mergeEv |
287 | 0 | virtual bool _is_key_column(const std::string& col_name) { return false; }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_is_key_columnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
288 | | |
289 | | // Create a list of scanners. |
290 | | // The number of scanners is related to the implementation of the data source, |
291 | | // predicate conditions, and scheduling strategy. |
292 | | // So this method needs to be implemented separately by the subclass of ScanNode. |
293 | | // Finally, a set of scanners that have been prepared are returned. |
294 | 0 | virtual Status _init_scanners(std::list<ScannerSPtr>* scanners) { return Status::OK(); }Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MockScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18OlapScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18FileScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_21GroupCommitLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18JDBCScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_16EsScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE Unexecuted instantiation: _ZN5doris14ScanLocalStateINS_18MetaScanLocalStateEE14_init_scannersEPNSt7__cxx114listISt10shared_ptrINS_7ScannerEESaIS7_EEE |
295 | | |
296 | | Status _normalize_conjuncts(RuntimeState* state); |
297 | | // Normalize a conjunct and try to convert it to column predicate recursively. |
298 | | Status _normalize_predicate(VExprContext* context, const VExprSPtr& root, |
299 | | VExprSPtr& output_expr); |
300 | | bool _is_predicate_acting_on_slot(const VExprSPtrs& children, SlotDescriptor** slot_desc, |
301 | | ColumnValueRangeType** range); |
302 | | Status _prepare_scanners(); |
303 | | |
304 | | // Submit the scanner to the thread pool and start execution |
305 | | Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners); |
306 | | |
307 | | // For some conjuncts there is a chance to eliminate the cast operator. |
308 | | // E.g. a Variant's sub-column could eliminate the cast in the storage layer if |
309 | | // the cast's destination column type equals the storage column type. |
310 | | void get_cast_types_for_variants(); |
311 | | void _filter_and_collect_cast_type_for_variant( |
312 | | const VExpr* expr, |
313 | | std::unordered_map<std::string, std::vector<DataTypePtr>>& colname_to_cast_types); |
314 | | |
315 | | Status _get_topn_filters(RuntimeState* state); |
316 | | |
317 | | // Stores conjuncts that have been fully pushed down to the storage layer as predicate columns. |
318 | | // These expr contexts are kept alive to prevent their FunctionContext and constant strings |
319 | | // from being freed prematurely. |
320 | | VExprContextSPtrs _stale_expr_ctxs; |
321 | | VExprContextSPtrs _common_expr_ctxs_push_down; |
322 | | |
323 | | atomic_shared_ptr<ScannerContext> _scanner_ctx; |
324 | | |
325 | | // colname -> cast dst type |
326 | | std::map<std::string, DataTypePtr> _cast_types_for_variants; |
327 | | |
328 | | // slot id -> ColumnValueRange |
329 | | // Parsed from conjuncts |
330 | | phmap::flat_hash_map<int, ColumnValueRangeType> _slot_id_to_value_range; |
331 | | phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> _slot_id_to_predicates; |
332 | | std::vector<std::shared_ptr<MutilColumnBlockPredicate>> _or_predicates; |
333 | | |
334 | | std::vector<std::shared_ptr<Dependency>> _filter_dependencies; |
335 | | |
336 | | // ScanLocalState owns the scanners; the scanner context only holds weak pointers to them |
337 | | std::list<std::shared_ptr<ScannerDelegate>> _scanners; |
338 | | Arena _arena; |
339 | | }; |
340 | | |
341 | | template <typename LocalStateType> |
342 | | class ScanOperatorX : public OperatorX<LocalStateType> { |
343 | | public: |
344 | | Status init(const TPlanNode& tnode, RuntimeState* state) override; |
345 | | Status prepare(RuntimeState* state) override; |
346 | | Status get_block(RuntimeState* state, Block* block, bool* eos) override; |
347 | 0 | Status get_block_after_projects(RuntimeState* state, Block* block, bool* eos) override { |
348 | 0 | Status status = get_block(state, block, eos); |
349 | 0 | if (status.ok()) { |
350 | 0 | if (auto rows = block->rows()) { |
351 | 0 | auto* local_state = state->get_local_state(operator_id()); |
352 | 0 | COUNTER_UPDATE(local_state->_rows_returned_counter, rows); |
353 | 0 | COUNTER_UPDATE(local_state->_blocks_returned_counter, 1); |
354 | 0 | } |
355 | 0 | } |
356 | 0 | return status; |
357 | 0 | } Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE24get_block_after_projectsEPNS_12RuntimeStateEPNS_5BlockEPb |
358 | 0 | [[nodiscard]] bool is_source() const override { return true; }Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_16EsScanLocalStateEE9is_sourceEv Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE9is_sourceEv |
359 | | |
360 | | [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override; |
361 | | |
362 | 118 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { |
363 | 118 | return _runtime_filter_descs; |
364 | 118 | } _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 362 | 105 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 363 | 105 | return _runtime_filter_descs; | 364 | 105 | } |
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 362 | 11 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 363 | 11 | return _runtime_filter_descs; | 364 | 11 | } |
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE20runtime_filter_descsEv Line | Count | Source | 362 | 2 | const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override { | 363 | 2 | return _runtime_filter_descs; | 364 | 2 | } |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE20runtime_filter_descsEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE20runtime_filter_descsEv |
365 | | |
366 | 0 | [[nodiscard]] virtual int get_column_id(const std::string& col_name) const { return -1; }Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_16EsScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE13get_column_idERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE |
367 | | |
368 | 0 | TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE22get_push_down_agg_typeEv Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE22get_push_down_agg_typeEv |
369 | | |
370 | 0 | DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { |
371 | 0 | if (OperatorX<LocalStateType>::is_serial_operator()) { |
372 | | // `is_serial_operator()` returns true means we ignore the distribution. |
373 | 0 | return {ExchangeType::NOOP}; |
374 | 0 | } |
375 | 0 | return {ExchangeType::BUCKET_HASH_SHUFFLE}; |
376 | 0 | } Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MockScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18OlapScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18FileScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_21GroupCommitLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18JDBCScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_16EsScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE Unexecuted instantiation: _ZNK5doris13ScanOperatorXINS_18MetaScanLocalStateEE26required_data_distributionEPNS_12RuntimeStateE |
377 | | |
378 | 0 | void set_low_memory_mode(RuntimeState* state) override { |
379 | 0 | auto& local_state = get_local_state(state); |
380 | |
|
381 | 0 | if (auto ctx = local_state._scanner_ctx.load()) { |
382 | 0 | ctx->clear_free_blocks(); |
383 | 0 | } |
384 | 0 | } Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18FileScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEE19set_low_memory_modeEPNS_12RuntimeStateE |
385 | | |
386 | | using OperatorX<LocalStateType>::node_id; |
387 | | using OperatorX<LocalStateType>::operator_id; |
388 | | using OperatorX<LocalStateType>::get_local_state; |
389 | | |
390 | | #ifdef BE_TEST |
391 | 21 | ScanOperatorX() = default; |
392 | | #endif |
393 | | |
394 | | protected: |
395 | | using LocalState = LocalStateType; |
396 | | friend class OlapScanner; |
397 | | ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
398 | | const DescriptorTbl& descs, int parallel_tasks = 0); |
399 | 36 | virtual ~ScanOperatorX() = default; _ZN5doris13ScanOperatorXINS_18MockScanLocalStateEED2Ev Line | Count | Source | 399 | 21 | virtual ~ScanOperatorX() = default; |
_ZN5doris13ScanOperatorXINS_18OlapScanLocalStateEED2Ev Line | Count | Source | 399 | 13 | virtual ~ScanOperatorX() = default; |
_ZN5doris13ScanOperatorXINS_18FileScanLocalStateEED2Ev Line | Count | Source | 399 | 2 | virtual ~ScanOperatorX() = default; |
Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_21GroupCommitLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18JDBCScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_16EsScanLocalStateEED2Ev Unexecuted instantiation: _ZN5doris13ScanOperatorXINS_18MetaScanLocalStateEED2Ev |
400 | | template <typename Derived> |
401 | | friend class ScanLocalState; |
402 | | friend class OlapScanLocalState; |
403 | | |
404 | | // For load scan node, there should be both input and output tuple descriptor. |
405 | | // For query scan node, there is only output_tuple_desc. |
406 | | TupleId _input_tuple_id = -1; |
407 | | TupleId _output_tuple_id = -1; |
408 | | const TupleDescriptor* _input_tuple_desc = nullptr; |
409 | | const TupleDescriptor* _output_tuple_desc = nullptr; |
410 | | |
411 | | phmap::flat_hash_map<int, SlotDescriptor*> _slot_id_to_slot_desc; |
412 | | std::unordered_map<std::string, int> _colname_to_slot_id; |
413 | | |
414 | | // These two values are from query_options |
415 | | int _max_scan_key_num = 48; |
416 | | int _max_pushdown_conditions_per_column = 1024; |
417 | | |
418 | | // If the query like select * from table limit 10; then the query should run in |
419 | | // single scanner to avoid too many scanners which will cause lots of useless read. |
420 | | bool _should_run_serial = false; |
421 | | |
422 | | VExprContextSPtrs _common_expr_ctxs_push_down; |
423 | | |
424 | | // If sort info is set, push limit to each scanner; |
425 | | int64_t _limit_per_scanner = -1; |
426 | | |
427 | | std::vector<TRuntimeFilterDesc> _runtime_filter_descs; |
428 | | |
429 | | TPushAggOp::type _push_down_agg_type; |
430 | | |
431 | | // Record the value of the aggregate function 'count' from doris's be |
432 | | int64_t _push_down_count = -1; |
433 | | const int _parallel_tasks = 0; |
434 | | |
435 | | std::vector<int> _topn_filter_source_node_ids; |
436 | | }; |
437 | | |
438 | | #include "common/compile_check_end.h" |
439 | | } // namespace doris |