Coverage Report

Created: 2024-11-21 14:31

/root/doris/be/src/exec/exec_node.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/exec-node.h
19
// and modified by Doris
20
21
#pragma once
22
23
#include <gen_cpp/PlanNodes_types.h>
24
25
#include <atomic>
26
#include <cstddef>
27
#include <cstdint>
28
#include <functional>
29
#include <memory>
30
#include <mutex>
31
#include <sstream>
32
#include <string>
33
#include <vector>
34
35
#include "common/global_types.h"
36
#include "common/status.h"
37
#include "runtime/descriptors.h"
38
#include "util/runtime_profile.h"
39
#include "vec/core/block.h"
40
#include "vec/exprs/vexpr_fwd.h"
41
42
namespace doris {
43
class ObjectPool;
44
class RuntimeState;
45
class MemTracker;
46
class QueryStatistics;
47
48
namespace pipeline {
49
class OperatorBase;
50
} // namespace pipeline
51
52
using std::string;
53
using std::stringstream;
54
using std::vector;
55
56
// Superclass of all executor nodes.
57
// All subclasses need to make sure to check RuntimeState::is_cancelled()
58
// periodically in order to ensure timely termination after the cancellation
59
// flag gets set.
60
class ExecNode {
61
public:
62
    // Init conjuncts.
63
    ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
64
65
    virtual ~ExecNode();
66
67
    /// Initializes this object from the thrift tnode desc. The subclass should
68
    /// do any initialization that can fail in Init() rather than the ctor.
69
    /// If overridden in subclass, must first call superclass's Init().
70
    [[nodiscard]] virtual Status init(const TPlanNode& tnode, RuntimeState* state);
71
72
    // Sets up internal structures, etc., without doing any actual work.
73
    // Must be called prior to open(). Will only be called once in this
74
    // node's lifetime.
75
    // All code generation (adding functions to the LlvmCodeGen object) must happen
76
    // in prepare().  Retrieving the jit compiled function pointer must happen in
77
    // open().
78
    // If overridden in subclass, must first call superclass's prepare().
79
    [[nodiscard]] virtual Status prepare(RuntimeState* state);
80
81
    /*
82
     * For open and alloc_resource:
83
     *  Base class ExecNode's `open` only calls `alloc_resource`, which opens some public projections.
84
     *  If was overrided, `open` must call corresponding `alloc_resource` since it's a (early) part of opening.
85
     *  Or just call `ExecNode::open` is alternative way.
86
     *  Then `alloc_resource` call father's after it's own business to make the progress completed, including the projections.
87
     *  In Pipeline engine: 
88
     *      PipeContext::prepare -> node::prepare
89
     *      Task::open -> StreamingOp::open -> node::alloc_resource, for sink+source splits, only open in SinkOperator.
90
     *  So in pipeline, the things directly done by open(like call child's) wouldn't be done in `open`.
91
    */
92
    // Performs any preparatory work prior to calling get_next().
93
    // Can be called repeatedly (after calls to close()).
94
    // Caller must not be holding any io buffers. This will cause deadlock.
95
    [[nodiscard]] virtual Status open(RuntimeState* state);
96
97
    // Alloc and open resource for the node
98
    // Only pipeline operator use exec node need to impl the virtual function
99
    // so only vectorized exec node need to impl
100
    [[nodiscard]] virtual Status alloc_resource(RuntimeState* state);
101
102
    // Retrieves rows and returns them via row_batch. Sets eos to true
103
    // if subsequent calls will not retrieve any more rows.
104
    // Data referenced by any tuples returned in row_batch must not be overwritten
105
    // by the callee until close() is called. The memory holding that data
106
    // can be returned via row_batch's tuple_data_pool (in which case it may be deleted
107
    // by the caller) or held on to by the callee. The row_batch, including its
108
    // tuple_data_pool, will be destroyed by the caller at some point prior to the final
109
    // close() call.
110
    // In other words, if the memory holding the tuple data will be referenced
111
    // by the callee in subsequent get_next() calls, it must *not* be attached to the
112
    // row_batch's tuple_data_pool.
113
    // Caller must not be holding any io buffers. This will cause deadlock.
114
    // TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
115
    [[nodiscard]] virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);
116
    // new interface to compatible new optimizers in FE
117
    [[nodiscard]] virtual Status get_next_after_projects(
118
            RuntimeState* state, vectorized::Block* block, bool* eos,
119
            const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& fn,
120
            bool clear_data = true);
121
122
    // Used by pipeline streaming operators.
123
0
    vectorized::Block* get_clear_input_block() {
124
0
        clear_origin_block();
125
0
        return &_origin_block;
126
0
    }
127
2
    bool has_output_row_descriptor() const { return _output_row_descriptor != nullptr; }
128
    // If use projection, we should clear `_origin_block`.
129
0
    void clear_origin_block() {
130
0
        _origin_block.clear_column_data(intermediate_row_desc().num_materialized_slots());
131
0
    }
132
133
    // Emit data, both need impl with method: sink
134
    // Eg: Aggregation, Sort, Scan
135
    [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* output_block,
136
0
                                      bool* eos) {
137
0
        return get_next(state, output_block, eos);
138
0
    }
139
140
    [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block,
141
0
                                      bool eos) {
142
0
        return Status::OK();
143
0
    }
144
145
0
    bool can_read() const { return _can_read; }
146
147
0
    [[nodiscard]] virtual bool can_terminate_early() { return false; }
148
149
    // Sink Data to ExecNode to do some stock work, both need impl with method: get_result
150
    // `eos` means source is exhausted, exec node should do some finalize work
151
    // Eg: Aggregation, Sort
152
    [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* input_block,
153
                                      bool eos);
154
155
    // Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
156
    // Clears all internal state, returning this node to the state it was in after calling
157
    // Prepare() and before calling Open(). This function must not clear memory
158
    // still owned by this node that is backing rows returned in GetNext().
159
    // Prepare() and Open() must have already been called before calling Reset().
160
    // GetNext() may have optionally been called (not necessarily until eos).
161
    // Close() must not have been called.
162
    // Reset() is not idempotent. Calling it multiple times in a row without a preceding
163
    // call to Open() is invalid.
164
    // If overridden in a subclass, must call superclass's Reset() at the end. The default
165
    // implementation calls Reset() on children.
166
    // Note that this function may be called many times (proportional to the input data),
167
    // so should be fast.
168
    [[nodiscard]] virtual Status reset(RuntimeState* state);
169
170
    // close() will get called for every exec node, regardless of what else is called and
171
    // the status of these calls (i.e. prepare() may never have been called, or
172
    // prepare()/open()/get_next() returned with an error).
173
    // close() releases all resources that were allocated in open()/get_next(), even if the
174
    // latter ended with an error. close() can be called if the node has been prepared or
175
    // the node is closed.
176
    // After calling close(), the caller calls open() again prior to subsequent calls to
177
    // get_next(). The default implementation updates runtime profile counters and calls
178
    // close() on the children. To ensure that close() is called on the entire plan tree,
179
    // each implementation should start out by calling the default implementation.
180
    virtual Status close(RuntimeState* state);
181
182
0
    void increase_ref() { ++_ref; }
183
0
    int decrease_ref() { return --_ref; }
184
185
    // Release and close resource for the node
186
    // Only pipeline operator use exec node need to impl the virtual function
187
    // so only vectorized exec node need to impl
188
    virtual void release_resource(RuntimeState* state);
189
190
    // Creates exec node tree from list of nodes contained in plan via depth-first
191
    // traversal. All nodes are placed in pool.
192
    // Returns error if 'plan' is corrupted, otherwise success.
193
    [[nodiscard]] static Status create_tree(RuntimeState* state, ObjectPool* pool,
194
                                            const TPlan& plan, const DescriptorTbl& descs,
195
                                            ExecNode** root);
196
197
    // Collect all nodes of given 'node_type' that are part of this subtree, and return in
198
    // 'nodes'.
199
    void collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes);
200
201
    // Collect all scan node types.
202
    void collect_scan_nodes(std::vector<ExecNode*>* nodes);
203
204
0
    virtual void prepare_for_next() {}
205
206
    // Returns a string representation in DFS order of the plan rooted at this.
207
    std::string debug_string() const;
208
209
    // recursive helper method for generating a string for Debug_string().
210
    // implementations should call debug_string(int, std::stringstream) on their children.
211
    // Input parameters:
212
    //   indentation_level: Current level in plan tree.
213
    // Output parameters:
214
    //   out: Stream to accumulate debug string.
215
    virtual void debug_string(int indentation_level, std::stringstream* out) const;
216
217
2
    int id() const { return _id; }
218
0
    TPlanNodeType::type type() const { return _type; }
219
0
    virtual const RowDescriptor& row_desc() const {
220
0
        return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
221
0
    }
222
2
    virtual const RowDescriptor& intermediate_row_desc() const { return _row_descriptor; }
223
224
    //  input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2]    ... ->     final projections         ->         output expr
225
    //  prepare        _row_descriptor          intermediate_row_desc[0]             intermediate_row_desc[1]            intermediate_row_desc.end()          _output_row_descriptor
226
227
0
    [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) {
228
0
        if (idx == 0) {
229
0
            return intermediate_row_desc();
230
0
        }
231
0
        DCHECK((idx - 1) < _intermediate_output_row_descriptor.size());
232
0
        return _intermediate_output_row_descriptor[idx - 1];
233
0
    }
234
235
2
    [[nodiscard]] const RowDescriptor& projections_row_desc() const {
236
2
        if (_intermediate_output_row_descriptor.empty()) {
237
2
            return intermediate_row_desc();
238
2
        } else {
239
0
            return _intermediate_output_row_descriptor.back();
240
0
        }
241
2
    }
242
243
0
    int64_t rows_returned() const { return _num_rows_returned; }
244
0
    int64_t limit() const { return _limit; }
245
0
    bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; }
246
    /// Only use in vectorized exec engine to check whether reach limit and cut num row for block
247
    // and add block rows for profile
248
    void reached_limit(vectorized::Block* block, bool* eos);
249
0
    const std::vector<TupleId>& get_tuple_ids() const { return _tuple_ids; }
250
251
0
    RuntimeProfile* faker_runtime_profile() const { return _faker_runtime_profile.get(); }
252
12
    RuntimeProfile* runtime_profile() const { return _runtime_profile.get(); }
253
0
    RuntimeProfile::Counter* memory_used_counter() const { return _memory_used_counter; }
254
255
0
    MemTracker* mem_tracker() const { return _mem_tracker.get(); }
256
257
    virtual std::string get_name();
258
259
    // Names of counters shared by all exec nodes
260
    static const std::string ROW_THROUGHPUT_COUNTER;
261
262
0
    ExecNode* child(int i) { return _children[i]; }
263
264
0
    size_t children_count() const { return _children.size(); }
265
266
0
    std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }
267
268
protected:
269
    friend class DataSink;
270
271
    /// Release all memory of block which got from child. The block
272
    // 1. clear mem of valid column get from child, make sure child can reuse the mem
273
    // 2. delete and release the column which create by function all and other reason
274
    void release_block_memory(vectorized::Block& block, uint16_t child_idx = 0);
275
276
    /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc
277
    Status do_projections(vectorized::Block* origin_block, vectorized::Block* output_block);
278
279
    int _id; // unique w/in single plan tree
280
    TPlanNodeType::type _type;
281
    ObjectPool* _pool = nullptr;
282
    std::vector<TupleId> _tuple_ids;
283
284
    vectorized::VExprContextSPtrs _conjuncts;
285
286
    std::vector<ExecNode*> _children;
287
    RowDescriptor _row_descriptor;
288
    vectorized::Block _origin_block;
289
290
    std::unique_ptr<RowDescriptor> _output_row_descriptor;
291
    vectorized::VExprContextSPtrs _projections;
292
293
    std::vector<RowDescriptor> _intermediate_output_row_descriptor;
294
    // Used in common subexpression elimination to compute intermediate results.
295
    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
296
297
    /// Resource information sent from the frontend.
298
    const TBackendResourceProfile _resource_profile;
299
300
    int64_t _limit; // -1: no limit
301
    int64_t _num_rows_returned = 0;
302
303
    std::unique_ptr<RuntimeProfile> _runtime_profile;
304
305
    // Record this node memory size. it is expected that artificial guarantees are accurate,
306
    // which will providea reference for operator memory.
307
    std::unique_ptr<MemTracker> _mem_tracker;
308
309
    RuntimeProfile::Counter* _exec_timer = nullptr;
310
    RuntimeProfile::Counter* _rows_returned_counter = nullptr;
311
    RuntimeProfile::Counter* _output_bytes_counter = nullptr;
312
    RuntimeProfile::Counter* _block_count_counter = nullptr;
313
    RuntimeProfile::Counter* _rows_returned_rate = nullptr;
314
    RuntimeProfile::Counter* _memory_used_counter = nullptr;
315
    RuntimeProfile::Counter* _projection_timer = nullptr;
316
    // Account for peak memory used by this node
317
    RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
318
319
    //NOTICE: now add a faker profile, because sometimes the profile record is useless
320
    //so we want remove some counters and timers, eg: in join node, if it's broadcast_join
321
    //and shared hash table, some counter/timer about build hash table is useless,
322
    //so we could add those counter/timer in faker profile, and those will not display in web profile.
323
    std::unique_ptr<RuntimeProfile> _faker_runtime_profile =
324
            std::make_unique<RuntimeProfile>("faker profile");
325
326
    // Execution options that are determined at runtime.  This is added to the
327
    // runtime profile at close().  Examples for options logged here would be
328
    // "Codegen Enabled"
329
    std::mutex _exec_options_lock;
330
    std::string _runtime_exec_options;
331
332
    // Set to true if this is a vectorized exec node.
333
    bool _is_vec = false;
334
335
2
    bool is_closed() const { return _is_closed; }
336
337
    // Create a single exec node derived from thrift node; place exec node in 'pool'.
338
    static Status create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
339
                              const DescriptorTbl& descs, ExecNode** node);
340
341
0
    virtual bool is_scan_node() const { return false; }
342
343
    void init_runtime_profile(const std::string& name);
344
345
    // Appends option to '_runtime_exec_options'
346
    void add_runtime_exec_option(const std::string& option);
347
348
    std::atomic<bool> _can_read = false;
349
350
    std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
351
352
    //_keep_origin is used to avoid copying during projection,
353
    // currently set to false only in the nestloop join.
354
    bool _keep_origin = true;
355
356
private:
357
    static Status create_tree_helper(RuntimeState* state, ObjectPool* pool,
358
                                     const std::vector<TPlanNode>& tnodes,
359
                                     const DescriptorTbl& descs, ExecNode* parent, int* node_idx,
360
                                     ExecNode** root);
361
362
    friend class pipeline::OperatorBase;
363
    bool _is_closed = false;
364
    bool _is_resource_released = false;
365
    std::atomic_int _ref = 0; // used by pipeline operator to release resource.
366
};
367
368
} // namespace doris