/root/doris/be/src/exec/exec_node.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/exec-node.h |
19 | | // and modified by Doris |
20 | | |
21 | | #pragma once |
22 | | |
23 | | #include <gen_cpp/PlanNodes_types.h> |
24 | | |
25 | | #include <atomic> |
26 | | #include <cstddef> |
27 | | #include <cstdint> |
28 | | #include <functional> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <string> |
33 | | #include <vector> |
34 | | |
35 | | #include "common/global_types.h" |
36 | | #include "common/status.h" |
37 | | #include "runtime/descriptors.h" |
38 | | #include "util/runtime_profile.h" |
39 | | #include "vec/core/block.h" |
40 | | #include "vec/exprs/vexpr_fwd.h" |
41 | | |
42 | | namespace doris { |
43 | | class ObjectPool; |
44 | | class RuntimeState; |
45 | | class MemTracker; |
46 | | class QueryStatistics; |
47 | | |
48 | | namespace pipeline { |
49 | | class OperatorBase; |
50 | | } // namespace pipeline |
51 | | |
52 | | using std::string; |
53 | | using std::stringstream; |
54 | | using std::vector; |
55 | | |
56 | | // Superclass of all executor nodes. |
57 | | // All subclasses need to make sure to check RuntimeState::is_cancelled() |
58 | | // periodically in order to ensure timely termination after the cancellation |
59 | | // flag gets set. |
60 | | class ExecNode { |
61 | | public: |
62 | | // Init conjuncts. |
63 | | ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); |
64 | | |
65 | | virtual ~ExecNode(); |
66 | | |
67 | | /// Initializes this object from the thrift tnode desc. The subclass should |
68 | | /// do any initialization that can fail in Init() rather than the ctor. |
69 | | /// If overridden in subclass, must first call superclass's Init(). |
70 | | [[nodiscard]] virtual Status init(const TPlanNode& tnode, RuntimeState* state); |
71 | | |
72 | | // Sets up internal structures, etc., without doing any actual work. |
73 | | // Must be called prior to open(). Will only be called once in this |
74 | | // node's lifetime. |
75 | | // All code generation (adding functions to the LlvmCodeGen object) must happen |
76 | | // in prepare(). Retrieving the jit compiled function pointer must happen in |
77 | | // open(). |
78 | | // If overridden in subclass, must first call superclass's prepare(). |
79 | | [[nodiscard]] virtual Status prepare(RuntimeState* state); |
80 | | |
81 | | /* |
82 | | * For open and alloc_resource: |
83 | | * Base class ExecNode's `open` only calls `alloc_resource`, which opens some public projections. |
84 | | * If overridden, `open` must call the corresponding `alloc_resource`, since it is an (early) part of opening. |
85 | | * Alternatively, simply call `ExecNode::open`. |
86 | | * `alloc_resource` then calls its parent's version after its own work to complete the process, including the projections. |
87 | | * In Pipeline engine: |
88 | | * PipeContext::prepare -> node::prepare |
89 | | * Task::open -> StreamingOp::open -> node::alloc_resource, for sink+source splits, only open in SinkOperator. |
90 | | * So in pipeline, the things directly done by open(like call child's) wouldn't be done in `open`. |
91 | | */ |
92 | | // Performs any preparatory work prior to calling get_next(). |
93 | | // Can be called repeatedly (after calls to close()). |
94 | | // Caller must not be holding any io buffers. This will cause deadlock. |
95 | | [[nodiscard]] virtual Status open(RuntimeState* state); |
96 | | |
97 | | // Alloc and open resource for the node |
98 | | // Only exec nodes used by pipeline operators need to implement this virtual function, |
99 | | // so only vectorized exec nodes need to implement it. |
100 | | [[nodiscard]] virtual Status alloc_resource(RuntimeState* state); |
101 | | |
102 | | // Retrieves rows and returns them via row_batch. Sets eos to true |
103 | | // if subsequent calls will not retrieve any more rows. |
104 | | // Data referenced by any tuples returned in row_batch must not be overwritten |
105 | | // by the callee until close() is called. The memory holding that data |
106 | | // can be returned via row_batch's tuple_data_pool (in which case it may be deleted |
107 | | // by the caller) or held on to by the callee. The row_batch, including its |
108 | | // tuple_data_pool, will be destroyed by the caller at some point prior to the final |
109 | | // close() call. |
110 | | // In other words, if the memory holding the tuple data will be referenced |
111 | | // by the callee in subsequent get_next() calls, it must *not* be attached to the |
112 | | // row_batch's tuple_data_pool. |
113 | | // Caller must not be holding any io buffers. This will cause deadlock. |
114 | | // TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet. |
115 | | [[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos); |
116 | | // New interface, compatible with the new optimizers in FE. |
117 | | [[nodiscard]] virtual Status get_next_after_projects( |
118 | | RuntimeState* state, vectorized::Block* block, bool* eos, |
119 | | const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn, |
120 | | bool clear_data = true); |
121 | | |
122 | | // Used by pipeline streaming operators. |
123 | 0 | vectorized::Block* get_clear_input_block() { |
124 | 0 | clear_origin_block(); |
125 | 0 | return &_origin_block; |
126 | 0 | } |
127 | 2 | bool has_output_row_descriptor() const { return _output_row_descriptor != nullptr; } |
128 | | // If use projection, we should clear `_origin_block`. |
129 | 0 | void clear_origin_block() { |
130 | 0 | _origin_block.clear_column_data(intermediate_row_desc().num_materialized_slots()); |
131 | 0 | } |
132 | | |
133 | | // Emit data. Must be implemented together with method: sink |
134 | | // Eg: Aggregation, Sort, Scan |
135 | | [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block, |
136 | 0 | bool* eos) { |
137 | 0 | return get_next(state, output_block, eos); |
138 | 0 | } |
139 | | |
140 | | [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block, |
141 | 0 | bool eos) { |
142 | 0 | return Status::OK(); |
143 | 0 | } |
144 | | |
145 | 0 | bool can_read() const { return _can_read; } |
146 | | |
147 | 0 | [[nodiscard]] virtual bool can_terminate_early() { return false; } |
148 | | |
149 | | // Sink data into the ExecNode to do some stock work. Must be implemented together with method: get_result |
150 | | // `eos` means the source is exhausted and the exec node should do its finalization work |
151 | | // Eg: Aggregation, Sort |
152 | | [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block, |
153 | | bool eos); |
154 | | |
155 | | // Resets the stream of row batches to be retrieved by subsequent GetNext() calls. |
156 | | // Clears all internal state, returning this node to the state it was in after calling |
157 | | // Prepare() and before calling Open(). This function must not clear memory |
158 | | // still owned by this node that is backing rows returned in GetNext(). |
159 | | // Prepare() and Open() must have already been called before calling Reset(). |
160 | | // GetNext() may have optionally been called (not necessarily until eos). |
161 | | // Close() must not have been called. |
162 | | // Reset() is not idempotent. Calling it multiple times in a row without a preceding |
163 | | // call to Open() is invalid. |
164 | | // If overridden in a subclass, must call superclass's Reset() at the end. The default |
165 | | // implementation calls Reset() on children. |
166 | | // Note that this function may be called many times (proportional to the input data), |
167 | | // so should be fast. |
168 | | [[nodiscard]] virtual Status reset(RuntimeState* state); |
169 | | |
170 | | // close() will get called for every exec node, regardless of what else is called and |
171 | | // the status of these calls (i.e. prepare() may never have been called, or |
172 | | // prepare()/open()/get_next() returned with an error). |
173 | | // close() releases all resources that were allocated in open()/get_next(), even if the |
174 | | // latter ended with an error. close() can be called if the node has been prepared or |
175 | | // the node is closed. |
176 | | // After calling close(), the caller calls open() again prior to subsequent calls to |
177 | | // get_next(). The default implementation updates runtime profile counters and calls |
178 | | // close() on the children. To ensure that close() is called on the entire plan tree, |
179 | | // each implementation should start out by calling the default implementation. |
180 | | virtual Status close(RuntimeState* state); |
181 | | |
182 | 0 | void increase_ref() { ++_ref; } |
183 | 0 | int decrease_ref() { return --_ref; } |
184 | | |
185 | | // Release and close resource for the node |
186 | | // Only exec nodes used by pipeline operators need to implement this virtual function, |
187 | | // so only vectorized exec nodes need to implement it. |
188 | | virtual void release_resource(RuntimeState* state); |
189 | | |
190 | | // Creates exec node tree from list of nodes contained in plan via depth-first |
191 | | // traversal. All nodes are placed in pool. |
192 | | // Returns error if 'plan' is corrupted, otherwise success. |
193 | | [[nodiscard]] static Status create_tree(RuntimeState* state, ObjectPool* pool, |
194 | | const TPlan& plan, const DescriptorTbl& descs, |
195 | | ExecNode** root); |
196 | | |
197 | | // Collect all nodes of given 'node_type' that are part of this subtree, and return in |
198 | | // 'nodes'. |
199 | | void collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes); |
200 | | |
201 | | // Collect all scan node types. |
202 | | void collect_scan_nodes(std::vector<ExecNode*>* nodes); |
203 | | |
204 | 0 | virtual void prepare_for_next() {} |
205 | | |
206 | | // Returns a string representation in DFS order of the plan rooted at this. |
207 | | std::string debug_string() const; |
208 | | |
209 | | // recursive helper method for generating a string for Debug_string(). |
210 | | // implementations should call debug_string(int, std::stringstream) on their children. |
211 | | // Input parameters: |
212 | | // indentation_level: Current level in plan tree. |
213 | | // Output parameters: |
214 | | // out: Stream to accumulate debug string. |
215 | | virtual void debug_string(int indentation_level, std::stringstream* out) const; |
216 | | |
217 | 2 | int id() const { return _id; } |
218 | 0 | TPlanNodeType::type type() const { return _type; } |
219 | 0 | virtual const RowDescriptor& row_desc() const { |
220 | 0 | return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor; |
221 | 0 | } |
222 | 2 | virtual const RowDescriptor& intermediate_row_desc() const { return _row_descriptor; } |
223 | | |
224 | | // input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr |
225 | | // prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor |
226 | | |
227 | 0 | [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) { |
228 | 0 | if (idx == 0) { |
229 | 0 | return intermediate_row_desc(); |
230 | 0 | } |
231 | 0 | DCHECK((idx - 1) < _intermediate_output_row_descriptor.size()); |
232 | 0 | return _intermediate_output_row_descriptor[idx - 1]; |
233 | 0 | } |
234 | | |
235 | 2 | [[nodiscard]] const RowDescriptor& projections_row_desc() const { |
236 | 2 | if (_intermediate_output_row_descriptor.empty()) { |
237 | 2 | return intermediate_row_desc(); |
238 | 2 | } else { |
239 | 0 | return _intermediate_output_row_descriptor.back(); |
240 | 0 | } |
241 | 2 | } |
242 | | |
243 | 0 | int64_t rows_returned() const { return _num_rows_returned; } |
244 | 0 | int64_t limit() const { return _limit; } |
245 | 0 | bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; } |
246 | | /// Only used in the vectorized exec engine: checks whether the limit is reached, cuts the number of rows in the block, |
247 | | // and adds the block's rows to the profile. |
248 | | void reached_limit(vectorized::Block* block, bool* eos); |
249 | 0 | const std::vector<TupleId>& get_tuple_ids() const { return _tuple_ids; } |
250 | | |
251 | 0 | RuntimeProfile* faker_runtime_profile() const { return _faker_runtime_profile.get(); } |
252 | 12 | RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); } |
253 | 0 | RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; } |
254 | | |
255 | 0 | MemTracker* mem_tracker() const { return _mem_tracker.get(); } |
256 | | |
257 | | virtual std::string get_name(); |
258 | | |
259 | | // Names of counters shared by all exec nodes |
260 | | static const std::string ROW_THROUGHPUT_COUNTER; |
261 | | |
262 | 0 | ExecNode* child(int i) { return _children[i]; } |
263 | | |
264 | 0 | size_t children_count() const { return _children.size(); } |
265 | | |
266 | 0 | std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; } |
267 | | |
268 | | protected: |
269 | | friend class DataSink; |
270 | | |
271 | | /// Release all memory of the block obtained from the child: |
272 | | // 1. clear the memory of the valid columns obtained from the child, making sure the child can reuse the memory |
273 | | // 2. delete and release the columns created by function calls or for other reasons |
274 | | void release_block_memory(vectorized::Block& block, uint16_t child_idx = 0); |
275 | | |
276 | | /// Only used in the vectorized exec engine: performs the projections that transform _row_desc -> _output_row_desc |
277 | | Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block); |
278 | | |
279 | | int _id; // unique w/in single plan tree |
280 | | TPlanNodeType::type _type; |
281 | | ObjectPool* _pool = nullptr; |
282 | | std::vector<TupleId> _tuple_ids; |
283 | | |
284 | | vectorized::VExprContextSPtrs _conjuncts; |
285 | | |
286 | | std::vector<ExecNode*> _children; |
287 | | RowDescriptor _row_descriptor; |
288 | | vectorized::Block _origin_block; |
289 | | |
290 | | std::unique_ptr<RowDescriptor> _output_row_descriptor; |
291 | | vectorized::VExprContextSPtrs _projections; |
292 | | |
293 | | std::vector<RowDescriptor> _intermediate_output_row_descriptor; |
294 | | // Used in common subexpression elimination to compute intermediate results. |
295 | | std::vector<vectorized::VExprContextSPtrs> _intermediate_projections; |
296 | | |
297 | | /// Resource information sent from the frontend. |
298 | | const TBackendResourceProfile _resource_profile; |
299 | | |
300 | | int64_t _limit; // -1: no limit |
301 | | int64_t _num_rows_returned = 0; |
302 | | |
303 | | std::unique_ptr<RuntimeProfile> _runtime_profile; |
304 | | |
305 | | // Records this node's memory size. It is expected that artificial guarantees are accurate, |
306 | | // which will provide a reference for operator memory. |
307 | | std::unique_ptr<MemTracker> _mem_tracker; |
308 | | |
309 | | RuntimeProfile::Counter* _exec_timer = nullptr; |
310 | | RuntimeProfile::Counter* _rows_returned_counter = nullptr; |
311 | | RuntimeProfile::Counter* _output_bytes_counter = nullptr; |
312 | | RuntimeProfile::Counter* _block_count_counter = nullptr; |
313 | | RuntimeProfile::Counter* _rows_returned_rate = nullptr; |
314 | | RuntimeProfile::Counter* _memory_used_counter = nullptr; |
315 | | RuntimeProfile::Counter* _projection_timer = nullptr; |
316 | | // Account for peak memory used by this node |
317 | | RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; |
318 | | |
319 | | // NOTICE: a faker profile is added because some profile records are sometimes useless |
320 | | // and we want to remove those counters and timers. E.g. in a join node, if it is a broadcast join |
321 | | // with a shared hash table, some counters/timers about building the hash table are useless, |
322 | | // so we can add those counters/timers to the faker profile, and they will not be displayed in the web profile. |
323 | | std::unique_ptr<RuntimeProfile> _faker_runtime_profile = |
324 | | std::make_unique<RuntimeProfile>("faker profile"); |
325 | | |
326 | | // Execution options that are determined at runtime. This is added to the |
327 | | // runtime profile at close(). Examples for options logged here would be |
328 | | // "Codegen Enabled" |
329 | | std::mutex _exec_options_lock; |
330 | | std::string _runtime_exec_options; |
331 | | |
332 | | // Set to true if this is a vectorized exec node. |
333 | | bool _is_vec = false; |
334 | | |
335 | 2 | bool is_closed() const { return _is_closed; } |
336 | | |
337 | | // Create a single exec node derived from thrift node; place exec node in 'pool'. |
338 | | static Status create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode, |
339 | | const DescriptorTbl& descs, ExecNode** node); |
340 | | |
341 | 0 | virtual bool is_scan_node() const { return false; } |
342 | | |
343 | | void init_runtime_profile(const std::string& name); |
344 | | |
345 | | // Appends option to '_runtime_exec_options' |
346 | | void add_runtime_exec_option(const std::string& option); |
347 | | |
348 | | std::atomic<bool> _can_read = false; |
349 | | |
350 | | std::shared_ptr<QueryStatistics> _query_statistics = nullptr; |
351 | | |
352 | | //_keep_origin is used to avoid copying during projection, |
353 | | // currently set to false only in the nestloop join. |
354 | | bool _keep_origin = true; |
355 | | |
356 | | private: |
357 | | static Status create_tree_helper(RuntimeState* state, ObjectPool* pool, |
358 | | const std::vector<TPlanNode>& tnodes, |
359 | | const DescriptorTbl& descs, ExecNode* parent, int* node_idx, |
360 | | ExecNode** root); |
361 | | |
362 | | friend class pipeline::OperatorBase; |
363 | | bool _is_closed = false; |
364 | | bool _is_resource_released = false; |
365 | | std::atomic_int _ref = 0; // used by pipeline operator to release resource. |
366 | | }; |
367 | | |
368 | | } // namespace doris |