Coverage Report

Created: 2024-11-18 12:21

/root/doris/be/src/exec/exec_node.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/exec-node.cpp
19
// and modified by Doris
20
21
#include "exec/exec_node.h"
22
23
#include <gen_cpp/Metrics_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <thrift/protocol/TDebugProtocol.h>
26
27
#include <map>
28
#include <memory>
29
#include <sstream>
30
#include <utility>
31
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/config.h"
34
#include "common/logging.h"
35
#include "common/object_pool.h"
36
#include "common/status.h"
37
#include "exec/scan_node.h"
38
#include "runtime/descriptors.h"
39
#include "runtime/memory/mem_tracker.h"
40
#include "runtime/runtime_state.h"
41
#include "util/debug_util.h"
42
#include "util/runtime_profile.h"
43
#include "util/uid_util.h"
44
#include "vec/columns/column_nullable.h"
45
#include "vec/core/block.h"
46
#include "vec/exec/distinct_vaggregation_node.h"
47
#include "vec/exec/join/vhash_join_node.h"
48
#include "vec/exec/join/vnested_loop_join_node.h"
49
#include "vec/exec/scan/group_commit_scan_node.h"
50
#include "vec/exec/scan/new_es_scan_node.h"
51
#include "vec/exec/scan/new_file_scan_node.h"
52
#include "vec/exec/scan/new_jdbc_scan_node.h"
53
#include "vec/exec/scan/new_odbc_scan_node.h"
54
#include "vec/exec/scan/new_olap_scan_node.h"
55
#include "vec/exec/scan/vmeta_scan_node.h"
56
#include "vec/exec/scan/vscan_node.h"
57
#include "vec/exec/vaggregation_node.h"
58
#include "vec/exec/vanalytic_eval_node.h"
59
#include "vec/exec/vassert_num_rows_node.h"
60
#include "vec/exec/vdata_gen_scan_node.h"
61
#include "vec/exec/vempty_set_node.h"
62
#include "vec/exec/vexchange_node.h"
63
#include "vec/exec/vmysql_scan_node.h" // IWYU pragma: keep
64
#include "vec/exec/vpartition_sort_node.h"
65
#include "vec/exec/vrepeat_node.h"
66
#include "vec/exec/vschema_scan_node.h"
67
#include "vec/exec/vselect_node.h"
68
#include "vec/exec/vset_operation_node.h"
69
#include "vec/exec/vsort_node.h"
70
#include "vec/exec/vtable_function_node.h"
71
#include "vec/exec/vunion_node.h"
72
#include "vec/exprs/vexpr.h"
73
#include "vec/exprs/vexpr_context.h"
74
#include "vec/utils/util.hpp"
75
76
namespace doris {
77
78
// Name under which prepare() registers the derived rows-per-second counter
// on this node's runtime profile.
const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsProducedRate";
79
80
// Captures the parts of the thrift plan node that every exec node needs:
// id, type, tuple ids, row descriptor, resource profile and row limit.
//
// When the plan node carries an output tuple id, a dedicated output row
// descriptor is created for it; one row descriptor is also created per entry of
// intermediate_output_tuple_id_list (used for common subexpression
// elimination). Every such descriptor is built from a single tuple id that is
// marked nullable.
ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
        : _id(tnode.node_id),
          _type(tnode.node_type),
          _pool(pool),
          _tuple_ids(tnode.row_tuples),
          _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
          _resource_profile(tnode.resource_profile),
          _limit(tnode.limit) {
    if (tnode.__isset.output_tuple_id) {
        const std::vector final_tuple_ids {tnode.output_tuple_id};
        const std::vector final_nullable {true};
        _output_row_descriptor =
                std::make_unique<RowDescriptor>(descs, final_tuple_ids, final_nullable);
    }

    // common subexpression elimination
    const auto& intermediate_tuple_ids = tnode.intermediate_output_tuple_id_list;
    if (!intermediate_tuple_ids.empty()) {
        _intermediate_output_row_descriptor.reserve(intermediate_tuple_ids.size());
        for (auto tuple_id : intermediate_tuple_ids) {
            _intermediate_output_row_descriptor.emplace_back(descs, std::vector {tuple_id},
                                                             std::vector {true});
        }
    }

    _query_statistics = std::make_shared<QueryStatistics>();
}
103
104
2
// Defaulted destructor, defined out of line in this translation unit.
ExecNode::~ExecNode() = default;
105
106
2
// Initializes the node from its thrift description.
//
// Creates the runtime profile, validates the intermediate-projection metadata,
// and builds the expression trees for conjuncts, final projections and
// intermediate projections.
//
// @param tnode thrift plan node this exec node was created from.
// @param state runtime state of the fragment (not used by the base implementation).
// @return Status::OK() on success; InternalError when intermediate output tuples
//         are present without a final output tuple id, or when the number of
//         intermediate tuple ids differs from the number of intermediate
//         projection lists; otherwise the first error reported while creating
//         an expression tree.
Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
    init_runtime_profile(get_name());

    if (!tnode.intermediate_output_tuple_id_list.empty()) {
        if (!tnode.__isset.output_tuple_id) {
            return Status::InternalError("no final output tuple id");
        }
        if (tnode.intermediate_output_tuple_id_list.size() !=
            tnode.intermediate_projections_list.size()) {
            return Status::InternalError(
                    "intermediate_output_tuple_id_list size:{} not match "
                    "intermediate_projections_list size:{}",
                    tnode.intermediate_output_tuple_id_list.size(),
                    tnode.intermediate_projections_list.size());
        }
    }

    // A single `vconjunct` takes precedence over the `conjuncts` list.
    if (tnode.__isset.vconjunct) {
        vectorized::VExprContextSPtr context;
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, context));
        // The local is not used afterwards, so hand it over instead of copying it.
        _conjuncts.emplace_back(std::move(context));
    } else if (tnode.__isset.conjuncts) {
        for (const auto& conjunct : tnode.conjuncts) {
            vectorized::VExprContextSPtr context;
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, context));
            _conjuncts.emplace_back(std::move(context));
        }
    }

    // create the projections expr
    if (tnode.__isset.projections) {
        DCHECK(tnode.__isset.output_tuple_id);
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
    }
    if (!tnode.intermediate_projections_list.empty()) {
        DCHECK(tnode.__isset.projections) << "no final projections";
        _intermediate_projections.reserve(tnode.intermediate_projections_list.size());
        for (const auto& tnode_projections : tnode.intermediate_projections_list) {
            vectorized::VExprContextSPtrs projections;
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections));
            // Move the freshly built list rather than copying every context pointer.
            _intermediate_projections.push_back(std::move(projections));
        }
    }
    return Status::OK();
}
150
151
2
// Registers the node's profile counters, creates its memory tracker, prepares
// conjuncts and projection expressions, and finally prepares all children.
//
// Requires the runtime profile to exist already (it is created by
// init_runtime_profile()). Returns the first error reported by any expression
// or child; Status::OK() otherwise.
Status ExecNode::prepare(RuntimeState* state) {
    DCHECK(_runtime_profile.get() != nullptr);

    // --- profile counters -------------------------------------------------
    _exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
    _rows_returned_counter =
            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1);
    _output_bytes_counter =
            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BytesProduced", TUnit::BYTES, 1);
    _block_count_counter =
            ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1);
    _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");

    // Derived counter: rows produced relative to the profile's total time counter.
    auto rows_per_second = [this, total_time = runtime_profile()->total_time_counter()] {
        return RuntimeProfile::units_per_second(_rows_returned_counter, total_time);
    };
    _rows_returned_rate = runtime_profile()->add_derived_counter(
            ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, rows_per_second, "");

    _memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name());

    // --- expressions ------------------------------------------------------
    for (auto& conjunct_ctx : _conjuncts) {
        RETURN_IF_ERROR(conjunct_ctx->prepare(state, intermediate_row_desc()));
    }

    // Each intermediate projection list is prepared against its own row descriptor.
    const int num_intermediate = static_cast<int>(_intermediate_projections.size());
    for (int idx = 0; idx < num_intermediate; ++idx) {
        RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[idx], state,
                                                   intermediate_row_desc(idx)));
    }

    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

    if (has_output_row_descriptor()) {
        RETURN_IF_ERROR(
                vectorized::VExpr::check_expr_output_type(_projections, *_output_row_descriptor));
    }

    // --- children ---------------------------------------------------------
    for (auto* child_node : _children) {
        RETURN_IF_ERROR(child_node->prepare(state));
    }
    return Status::OK();
}
193
194
0
// Opens every expression owned by this node: the conjuncts first, then each
// intermediate projection list, then the final projections. Stops at and
// returns the first error.
Status ExecNode::alloc_resource(RuntimeState* state) {
    for (auto& conjunct_ctx : _conjuncts) {
        RETURN_IF_ERROR(conjunct_ctx->open(state));
    }

    for (auto& projection_list : _intermediate_projections) {
        RETURN_IF_ERROR(vectorized::VExpr::open(projection_list, state));
    }

    RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));

    return Status::OK();
}
204
205
0
// Base implementation of open(): simply forwards to alloc_resource() and
// returns its status.
Status ExecNode::open(RuntimeState* state) {
    Status alloc_status = alloc_resource(state);
    return alloc_status;
}
208
209
0
// Clears this node's returned-row count and then resets every child in order,
// returning the first child error encountered.
Status ExecNode::reset(RuntimeState* state) {
    _num_rows_returned = 0;

    for (auto* child_node : _children) {
        RETURN_IF_ERROR(child_node->reset(state));
    }

    return Status::OK();
}
216
217
2
// Publishes final statistics to the profile.
//
// The RowsProduced counter is written only on the first call (guarded by
// _is_resource_released); the peak-memory counter is refreshed on every call,
// provided it has been created.
void ExecNode::release_resource(doris::RuntimeState* state) {
    const bool first_release = !_is_resource_released;
    if (first_release && _rows_returned_counter != nullptr) {
        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
    }
    _is_resource_released = true;

    if (_peak_memory_usage_counter) {
        _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
    }
}
229
230
2
// Closes this node and all of its children.
//
// A second call is a no-op that logs and returns OK. Otherwise every child is
// closed (even if an earlier child failed), the node's resources are released,
// and the first child error, if any, is returned.
Status ExecNode::close(RuntimeState* state) {
    if (_is_closed) {
        LOG(INFO) << "query= " << print_id(state->query_id())
                  << " fragment_instance_id=" << print_id(state->fragment_instance_id())
                  << " already closed";
        return Status::OK();
    }
    _is_closed = true;

    // Remember only the first failure, but keep closing the remaining children.
    Status first_error;
    for (auto* child_node : _children) {
        Status child_status = child_node->close(state);
        if (!child_status.ok() && first_error.ok()) {
            first_error = child_status;
        }
    }

    release_resource(state);

    LOG(INFO) << "query= " << print_id(state->query_id())
              << ", fragment_instance_id=" << print_id(state->fragment_instance_id())
              << ", id=" << _id << " type=" << print_plan_node_type(_type) << " closed";
    return first_error;
}
252
253
0
void ExecNode::add_runtime_exec_option(const std::string& str) {
254
0
    std::lock_guard<std::mutex> l(_exec_options_lock);
255
256
0
    if (_runtime_exec_options.empty()) {
257
0
        _runtime_exec_options = str;
258
0
    } else {
259
0
        _runtime_exec_options.append(", ");
260
0
        _runtime_exec_options.append(str);
261
0
    }
262
263
0
    runtime_profile()->add_info_string("ExecOption", _runtime_exec_options);
264
0
}
265
266
// Rebuilds the exec-node tree described by a flattened thrift plan.
//
// An empty plan yields a null root and Status::OK(). Otherwise the tree is
// built by create_tree_helper() starting at node 0, and the call fails with
// InternalError unless the traversal ended on the last thrift node.
Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan,
                             const DescriptorTbl& descs, ExecNode** root) {
    const auto& thrift_nodes = plan.nodes;
    if (thrift_nodes.empty()) {
        *root = nullptr;
        return Status::OK();
    }

    int node_idx = 0;
    RETURN_IF_ERROR(create_tree_helper(state, pool, thrift_nodes, descs, nullptr, &node_idx, root));

    // node_idx is the index of the last node visited, so it must be size - 1.
    const bool all_nodes_used = (node_idx + 1 == thrift_nodes.size());
    if (!all_nodes_used) {
        // TODO: print thrift msg for diagnostic purposes.
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }

    return Status::OK();
}
284
285
// Recursively materializes the subtree whose root is thrift_plan_nodes[*node_idx].
//
// On return *node_idx is the index of the last thrift node consumed by this
// subtree. The new node is appended to parent's children, or stored in *root
// when parent is null. A node's init() runs only after all of its children
// have been created and initialized.
Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool,
                                    const std::vector<TPlanNode>& thrift_plan_nodes,
                                    const DescriptorTbl& descs, ExecNode* parent, int* node_idx,
                                    ExecNode** root) {
    // propagate error case
    if (*node_idx >= thrift_plan_nodes.size()) {
        // TODO: print thrift msg
        return Status::InternalError("Failed to reconstruct plan tree from thrift.");
    }

    const TPlanNode& plan_node = thrift_plan_nodes[*node_idx];
    const int child_count = plan_node.num_children;

    // Step 1: create the exec node for the current thrift node and register
    // its statistics with the query context, when there is one.
    ExecNode* exec_node = nullptr;
    RETURN_IF_ERROR(create_node(state, pool, plan_node, descs, &exec_node));
    if (exec_node != nullptr && state->get_query_ctx()) {
        state->get_query_ctx()->register_query_statistics(exec_node->get_query_statistics());
    }

    // Step 1.1: attach to the parent, or publish as the tree root.
    if (parent == nullptr) {
        *root = exec_node;
    } else {
        parent->_children.push_back(exec_node);
    }

    // Step 2: build each child subtree; every child starts at the next unused node.
    for (int remaining = child_count; remaining > 0; --remaining) {
        ++*node_idx;
        RETURN_IF_ERROR(create_tree_helper(state, pool, thrift_plan_nodes, descs, exec_node,
                                           node_idx, nullptr));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= thrift_plan_nodes.size()) {
            // TODO: print thrift msg
            return Status::InternalError("Failed to reconstruct plan tree from thrift.");
        }
    }

    // Step 3: initialize this node now that its subtree exists.
    RETURN_IF_ERROR(exec_node->init(plan_node, state));

    // build up tree of profiles; add children >0 first, so that when we print
    // the profile, child 0 is printed last (makes the output more readable)
    auto& kids = exec_node->_children;
    auto* profile = exec_node->runtime_profile();
    for (int pos = 1; pos < kids.size(); ++pos) {
        profile->add_child(kids[pos]->runtime_profile(), true, nullptr);
    }
    if (!kids.empty()) {
        profile->add_child(kids[0]->runtime_profile(), true, nullptr);
    }

    return Status::OK();
}
345
346
// NOLINTBEGIN(readability-function-size)
347
// Factory: instantiates the vectorized exec node matching tnode.node_type.
//
// The new node is handed to `pool` and returned through `*node`. Returns
// InternalError for node types that are disabled by build or config options
// (MySQL scan without DORIS_WITH_MYSQL, JDBC scan without enable_java_support)
// and for node types that have no case here.
Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
                             const DescriptorTbl& descs, ExecNode** node) {
    VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);

    // Every successful case breaks out to the shared `return Status::OK()` below.
    switch (tnode.node_type) {
    case TPlanNodeType::MYSQL_SCAN_NODE:
#ifdef DORIS_WITH_MYSQL
        *node = pool->add(new vectorized::VMysqlScanNode(pool, tnode, descs));
        break;
#else
        return Status::InternalError(
                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif

    case TPlanNodeType::ODBC_SCAN_NODE:
        *node = pool->add(new vectorized::NewOdbcScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::JDBC_SCAN_NODE:
        if (!config::enable_java_support) {
            return Status::InternalError(
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
                    "to true and restart be.");
        }
        *node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::ES_HTTP_SCAN_NODE:
        *node = pool->add(new vectorized::NewEsScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::SCHEMA_SCAN_NODE:
        *node = pool->add(new vectorized::VSchemaScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::META_SCAN_NODE:
        *node = pool->add(new vectorized::VMetaScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::OLAP_SCAN_NODE:
        *node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::AGGREGATION_NODE: {
        // An aggregation without aggregate functions uses the distinct
        // aggregation node when pipeline execution is enabled.
        const bool use_distinct_agg =
                tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_exec();
        if (use_distinct_agg) {
            *node = pool->add(new vectorized::DistinctAggregationNode(pool, tnode, descs));
        } else {
            *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs));
        }
        break;
    }

    case TPlanNodeType::HASH_JOIN_NODE:
        *node = pool->add(new vectorized::HashJoinNode(pool, tnode, descs));
        break;

    case TPlanNodeType::CROSS_JOIN_NODE:
        *node = pool->add(new vectorized::VNestedLoopJoinNode(pool, tnode, descs));
        break;

    case TPlanNodeType::EMPTY_SET_NODE:
        *node = pool->add(new vectorized::VEmptySetNode(pool, tnode, descs));
        break;

    case TPlanNodeType::EXCHANGE_NODE:
        *node = pool->add(new doris::vectorized::VExchangeNode(pool, tnode, descs));
        break;

    case TPlanNodeType::SELECT_NODE:
        *node = pool->add(new doris::vectorized::VSelectNode(pool, tnode, descs));
        break;

    case TPlanNodeType::SORT_NODE:
        *node = pool->add(new vectorized::VSortNode(pool, tnode, descs));
        break;

    case TPlanNodeType::ANALYTIC_EVAL_NODE:
        *node = pool->add(new vectorized::VAnalyticEvalNode(pool, tnode, descs));
        break;

    // NOTE(review): RETURN_ERROR_IF_NON_VEC is defined outside this file; it
    // presumably returns an error. The UNION_NODE case is kept directly below,
    // exactly as before, so control flow is unchanged either way.
    case TPlanNodeType::MERGE_NODE:
        RETURN_ERROR_IF_NON_VEC;

    case TPlanNodeType::UNION_NODE:
        *node = pool->add(new vectorized::VUnionNode(pool, tnode, descs));
        break;

    case TPlanNodeType::INTERSECT_NODE:
        *node = pool->add(new vectorized::VIntersectNode(pool, tnode, descs));
        break;

    case TPlanNodeType::EXCEPT_NODE:
        *node = pool->add(new vectorized::VExceptNode(pool, tnode, descs));
        break;

    case TPlanNodeType::FILE_SCAN_NODE:
        *node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::REPEAT_NODE:
        *node = pool->add(new vectorized::VRepeatNode(pool, tnode, descs));
        break;

    case TPlanNodeType::ASSERT_NUM_ROWS_NODE:
        *node = pool->add(new vectorized::VAssertNumRowsNode(pool, tnode, descs));
        break;

    case TPlanNodeType::TABLE_FUNCTION_NODE:
        *node = pool->add(new vectorized::VTableFunctionNode(pool, tnode, descs));
        break;

    case TPlanNodeType::DATA_GEN_SCAN_NODE:
        *node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, tnode, descs));
        break;

    case TPlanNodeType::PARTITION_SORT_NODE:
        *node = pool->add(new vectorized::VPartitionSortNode(pool, tnode, descs));
        break;

    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE:
#ifndef NDEBUG
        // Debug builds only: flag the query's memory tracker as a group commit load.
        DCHECK(state->get_query_ctx() != nullptr);
        state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
        *node = pool->add(new vectorized::GroupCommitScanNode(pool, tnode, descs));
        break;

    default: {
        // Report the thrift enum name when it is known, a generic label otherwise.
        const auto name_it = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
        const char* type_name = "unknown node type";
        if (name_it != _TPlanNodeType_VALUES_TO_NAMES.end()) {
            type_name = name_it->second;
        }

        std::stringstream error_msg;
        error_msg << type_name << " not implemented";
        return Status::InternalError(error_msg.str());
    }
    }

    return Status::OK();
}
488
// NOLINTEND(readability-function-size)
489
490
0
std::string ExecNode::debug_string() const {
491
0
    std::stringstream out;
492
0
    this->debug_string(0, &out);
493
0
    return out.str();
494
0
}
495
496
0
void ExecNode::debug_string(int indentation_level, std::stringstream* out) const {
497
0
    *out << " id=" << _id;
498
0
    *out << " type=" << print_plan_node_type(_type);
499
0
    *out << " tuple_ids=[";
500
0
    for (auto id : _tuple_ids) {
501
0
        *out << id << ", ";
502
0
    }
503
0
    *out << "]";
504
505
0
    for (auto* i : _children) {
506
0
        *out << "\n";
507
0
        i->debug_string(indentation_level + 1, out);
508
0
    }
509
0
}
510
511
0
// Appends to `nodes` every node of this subtree whose type equals `node_type`,
// visiting this node first and then each child subtree in order.
void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes) {
    if (node_type == _type) {
        nodes->push_back(this);
    }

    for (auto* child_node : _children) {
        child_node->collect_nodes(node_type, nodes);
    }
}
520
521
0
void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
522
0
    collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes);
523
0
    collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes);
524
0
    collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes);
525
0
    collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes);
526
0
    collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes);
527
0
    collect_nodes(TPlanNodeType::JDBC_SCAN_NODE, nodes);
528
0
    collect_nodes(TPlanNodeType::ODBC_SCAN_NODE, nodes);
529
0
}
530
531
2
void ExecNode::init_runtime_profile(const std::string& name) {
532
2
    std::stringstream ss;
533
2
    ss << name << " (id=" << _id << ")";
534
2
    _runtime_profile = std::make_unique<RuntimeProfile>(ss.str());
535
2
    _runtime_profile->set_metadata(_id);
536
2
}
537
538
0
// Clears the column data of `block`, passing the number of materialized slots
// of child `child_idx`'s row descriptor to Block::clear_column_data().
// `child_idx` must be a valid child index (checked in debug builds only).
void ExecNode::release_block_memory(vectorized::Block& block, uint16_t child_idx) {
    DCHECK(child_idx < _children.size());

    const auto materialized_slots = child(child_idx)->row_desc().num_materialized_slots();
    block.clear_column_data(materialized_slots);
}
542
543
0
// Applies the node's row limit to `block` and accounts for the rows returned.
//
// A limit of -1 means "no limit". When adding this block reaches or exceeds
// the limit, the block is truncated to the rows still allowed and *eos is set
// to true. The running row count and the RowsProduced counter are then updated
// with the (possibly truncated) block size.
void ExecNode::reached_limit(vectorized::Block* block, bool* eos) {
    const bool has_limit = (_limit != -1);
    if (has_limit && _num_rows_returned + block->rows() >= _limit) {
        block->set_num_rows(_limit - _num_rows_returned);
        *eos = true;
    }

    _num_rows_returned += block->rows();
    COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
552
553
0
// Base implementation: produces no data and always reports NotSupported.
Status ExecNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
    Status not_supported = Status::NotSupported("Not Implemented get block");
    return not_supported;
}
556
557
0
std::string ExecNode::get_name() {
558
0
    return "V" + print_plan_node_type(_type);
559
0
}
560
561
0
// Evaluates the node's projections over `origin_block` and writes the projected
// columns into `output_block`.
//
// Intermediate projection lists run first, in order; after each list the
// working block is reshuffled to contain exactly that list's result columns.
// The final projections are then evaluated and their results are placed into
// columns laid out according to _output_row_descriptor.
//
// @param origin_block input rows; it is copied into a local working block
//                     before any projection is executed.
// @param output_block receives the projected columns. Left untouched when
//                     origin_block has no rows.
// @return Status::OK() on success, otherwise the first error reported by a
//         projection expression.
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
    SCOPED_TIMER(_exec_timer);
    SCOPED_TIMER(_projection_timer);
    const size_t rows = origin_block->rows();
    if (rows == 0) {
        return Status::OK();
    }
    vectorized::Block input_block = *origin_block;

    std::vector<int> result_column_ids;
    for (auto& projections : _intermediate_projections) {
        result_column_ids.resize(projections.size());
        for (size_t i = 0; i < projections.size(); ++i) {
            RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
        }
        input_block.shuffle_columns(result_column_ids);
    }

    DCHECK_EQ(rows, input_block.rows());

    // Puts `from` into the destination column `to`. The data is copied when the
    // origin must be kept or `from` is not exclusively owned; otherwise the
    // destination simply takes over `from`. A nullable destination fed by a
    // non-nullable source additionally gets an all-zero null map.
    auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t num_rows) {
        const bool must_copy = _keep_origin || !from->is_exclusive();
        if (to->is_nullable() && !from->is_nullable()) {
            if (must_copy) {
                // `to` is known to be nullable here, so a checked-at-compile-time
                // hierarchy downcast is sufficient; no reinterpret_cast needed.
                auto& null_column = static_cast<vectorized::ColumnNullable&>(*to);
                null_column.get_nested_column().insert_range_from(*from, 0, num_rows);
                null_column.get_null_map_column().get_data().resize_fill(num_rows, 0);
            } else {
                to = make_nullable(from, false)->assume_mutable();
            }
        } else {
            if (must_copy) {
                to->insert_range_from(*from, 0, num_rows);
            } else {
                to = from->assume_mutable();
            }
        }
    };

    vectorized::MutableBlock mutable_block =
            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                       *_output_row_descriptor);

    auto& mutable_columns = mutable_block.mutable_columns();

    DCHECK_EQ(mutable_columns.size(), _projections.size());

    for (size_t i = 0; i < mutable_columns.size(); ++i) {
        auto result_column_id = -1;
        RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id));
        auto column_ptr = input_block.get_by_position(result_column_id)
                                  .column->convert_to_full_column_if_const();
        //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
        insert_column_datas(mutable_columns[i], column_ptr, rows);
    }
    DCHECK(mutable_block.rows() == rows);
    output_block->set_columns(std::move(mutable_columns));

    return Status::OK();
}
619
620
// Fetches the next block through `func` and applies the node's projections
// when an output row descriptor is configured.
//
// With an output row descriptor, `func` fills the internal origin block
// (cleared first when `clear_data` is set) and do_projections() writes the
// result into `block`. Without one, `func` fills `block` directly. Afterwards
// the peak-memory counter is refreshed and, for a non-empty block, the
// BytesProduced and BlocksProduced counters are advanced.
Status ExecNode::get_next_after_projects(
        RuntimeState* state, vectorized::Block* block, bool* eos,
        const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& func,
        bool clear_data) {
    if (!_output_row_descriptor) {
        // No projection configured: the producer writes straight into the output block.
        RETURN_IF_ERROR(func(state, block, eos));
    } else {
        if (clear_data) {
            clear_origin_block();
        }
        RETURN_IF_ERROR(func(state, &_origin_block, eos));
        RETURN_IF_ERROR(do_projections(&_origin_block, block));
    }

    _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());

    const bool produced_data = block && !block->empty();
    if (produced_data) {
        COUNTER_UPDATE(_output_bytes_counter, block->allocated_bytes());
        COUNTER_UPDATE(_block_count_counter, 1);
    }
    return Status::OK();
}
641
642
0
// Base implementation: sinking is not available here, so the call always
// returns NotSupported with the node's name in the message.
Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
    const std::string node_name = get_name();
    return Status::NotSupported("{} not implements sink", node_name);
}
645
646
} // namespace doris