/root/doris/be/src/exec/exec_node.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // This file is copied from |
18 | | // https://github.com/apache/impala/blob/branch-2.9.0/be/src/exec/exec-node.cpp |
19 | | // and modified by Doris |
20 | | |
21 | | #include "exec/exec_node.h" |
22 | | |
23 | | #include <gen_cpp/Metrics_types.h> |
24 | | #include <gen_cpp/PlanNodes_types.h> |
25 | | #include <thrift/protocol/TDebugProtocol.h> |
26 | | |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <sstream> |
30 | | #include <utility> |
31 | | |
32 | | #include "common/compiler_util.h" // IWYU pragma: keep |
33 | | #include "common/config.h" |
34 | | #include "common/logging.h" |
35 | | #include "common/object_pool.h" |
36 | | #include "common/status.h" |
37 | | #include "exec/scan_node.h" |
38 | | #include "runtime/descriptors.h" |
39 | | #include "runtime/memory/mem_tracker.h" |
40 | | #include "runtime/runtime_state.h" |
41 | | #include "util/debug_util.h" |
42 | | #include "util/runtime_profile.h" |
43 | | #include "util/uid_util.h" |
44 | | #include "vec/columns/column_nullable.h" |
45 | | #include "vec/core/block.h" |
46 | | #include "vec/exec/distinct_vaggregation_node.h" |
47 | | #include "vec/exec/join/vhash_join_node.h" |
48 | | #include "vec/exec/join/vnested_loop_join_node.h" |
49 | | #include "vec/exec/scan/group_commit_scan_node.h" |
50 | | #include "vec/exec/scan/new_es_scan_node.h" |
51 | | #include "vec/exec/scan/new_file_scan_node.h" |
52 | | #include "vec/exec/scan/new_jdbc_scan_node.h" |
53 | | #include "vec/exec/scan/new_odbc_scan_node.h" |
54 | | #include "vec/exec/scan/new_olap_scan_node.h" |
55 | | #include "vec/exec/scan/vmeta_scan_node.h" |
56 | | #include "vec/exec/scan/vscan_node.h" |
57 | | #include "vec/exec/vaggregation_node.h" |
58 | | #include "vec/exec/vanalytic_eval_node.h" |
59 | | #include "vec/exec/vassert_num_rows_node.h" |
60 | | #include "vec/exec/vdata_gen_scan_node.h" |
61 | | #include "vec/exec/vempty_set_node.h" |
62 | | #include "vec/exec/vexchange_node.h" |
63 | | #include "vec/exec/vmysql_scan_node.h" // IWYU pragma: keep |
64 | | #include "vec/exec/vpartition_sort_node.h" |
65 | | #include "vec/exec/vrepeat_node.h" |
66 | | #include "vec/exec/vschema_scan_node.h" |
67 | | #include "vec/exec/vselect_node.h" |
68 | | #include "vec/exec/vset_operation_node.h" |
69 | | #include "vec/exec/vsort_node.h" |
70 | | #include "vec/exec/vtable_function_node.h" |
71 | | #include "vec/exec/vunion_node.h" |
72 | | #include "vec/exprs/vexpr.h" |
73 | | #include "vec/exprs/vexpr_context.h" |
74 | | #include "vec/utils/util.hpp" |
75 | | |
76 | | namespace doris { |
77 | | |
78 | | const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsProducedRate"; |
79 | | |
80 | | ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) |
81 | | : _id(tnode.node_id), |
82 | | _type(tnode.node_type), |
83 | | _pool(pool), |
84 | | _tuple_ids(tnode.row_tuples), |
85 | | _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), |
86 | | _resource_profile(tnode.resource_profile), |
87 | 2 | _limit(tnode.limit) { |
88 | 2 | if (tnode.__isset.output_tuple_id) { |
89 | 0 | _output_row_descriptor = std::make_unique<RowDescriptor>( |
90 | 0 | descs, std::vector {tnode.output_tuple_id}, std::vector {true}); |
91 | 0 | } |
92 | 2 | if (!tnode.intermediate_output_tuple_id_list.empty()) { |
93 | | // common subexpression elimination |
94 | 0 | _intermediate_output_row_descriptor.reserve(tnode.intermediate_output_tuple_id_list.size()); |
95 | 0 | for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) { |
96 | 0 | _intermediate_output_row_descriptor.push_back( |
97 | 0 | RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true})); |
98 | 0 | } |
99 | 0 | } |
100 | | |
101 | 2 | _query_statistics = std::make_shared<QueryStatistics>(); |
102 | 2 | } |
103 | | |
104 | 2 | ExecNode::~ExecNode() = default; |
105 | | |
106 | 2 | Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) { |
107 | 2 | init_runtime_profile(get_name()); |
108 | 2 | if (!tnode.intermediate_output_tuple_id_list.empty()) { |
109 | 0 | if (!tnode.__isset.output_tuple_id) { |
110 | 0 | return Status::InternalError("no final output tuple id"); |
111 | 0 | } |
112 | 0 | if (tnode.intermediate_output_tuple_id_list.size() != |
113 | 0 | tnode.intermediate_projections_list.size()) { |
114 | 0 | return Status::InternalError( |
115 | 0 | "intermediate_output_tuple_id_list size:{} not match " |
116 | 0 | "intermediate_projections_list size:{}", |
117 | 0 | tnode.intermediate_output_tuple_id_list.size(), |
118 | 0 | tnode.intermediate_projections_list.size()); |
119 | 0 | } |
120 | 0 | } |
121 | | |
122 | 2 | if (tnode.__isset.vconjunct) { |
123 | 0 | vectorized::VExprContextSPtr context; |
124 | 0 | RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, context)); |
125 | 0 | _conjuncts.emplace_back(context); |
126 | 2 | } else if (tnode.__isset.conjuncts) { |
127 | 0 | for (const auto& conjunct : tnode.conjuncts) { |
128 | 0 | vectorized::VExprContextSPtr context; |
129 | 0 | RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, context)); |
130 | 0 | _conjuncts.emplace_back(context); |
131 | 0 | } |
132 | 0 | } |
133 | | |
134 | | // create the projections expr |
135 | 2 | if (tnode.__isset.projections) { |
136 | 0 | DCHECK(tnode.__isset.output_tuple_id); |
137 | 0 | RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections)); |
138 | 0 | } |
139 | 2 | if (!tnode.intermediate_projections_list.empty()) { |
140 | 0 | DCHECK(tnode.__isset.projections) << "no final projections"; |
141 | 0 | _intermediate_projections.reserve(tnode.intermediate_projections_list.size()); |
142 | 0 | for (const auto& tnode_projections : tnode.intermediate_projections_list) { |
143 | 0 | vectorized::VExprContextSPtrs projections; |
144 | 0 | RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections)); |
145 | 0 | _intermediate_projections.push_back(projections); |
146 | 0 | } |
147 | 0 | } |
148 | 2 | return Status::OK(); |
149 | 2 | } |
150 | | |
151 | 2 | Status ExecNode::prepare(RuntimeState* state) { |
152 | 2 | DCHECK(_runtime_profile.get() != nullptr); |
153 | 2 | _exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1); |
154 | 2 | _rows_returned_counter = |
155 | 2 | ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1); |
156 | 2 | _output_bytes_counter = |
157 | 2 | ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BytesProduced", TUnit::BYTES, 1); |
158 | 2 | _block_count_counter = |
159 | 2 | ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1); |
160 | 2 | _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime"); |
161 | 2 | _rows_returned_rate = runtime_profile()->add_derived_counter( |
162 | 2 | ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, |
163 | 2 | [this, capture0 = runtime_profile()->total_time_counter()] { |
164 | 0 | return RuntimeProfile::units_per_second(_rows_returned_counter, capture0); |
165 | 0 | }, |
166 | 2 | ""); |
167 | 2 | _memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage"); |
168 | 2 | _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( |
169 | 2 | "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage"); |
170 | 2 | _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name()); |
171 | | |
172 | 2 | for (auto& conjunct : _conjuncts) { |
173 | 0 | RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); |
174 | 0 | } |
175 | | |
176 | 2 | for (int i = 0; i < _intermediate_projections.size(); i++) { |
177 | 0 | RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state, |
178 | 0 | intermediate_row_desc(i))); |
179 | 0 | } |
180 | | |
181 | 2 | RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc())); |
182 | | |
183 | 2 | if (has_output_row_descriptor()) { |
184 | 0 | RETURN_IF_ERROR( |
185 | 0 | vectorized::VExpr::check_expr_output_type(_projections, *_output_row_descriptor)); |
186 | 0 | } |
187 | | |
188 | 2 | for (auto& i : _children) { |
189 | 0 | RETURN_IF_ERROR(i->prepare(state)); |
190 | 0 | } |
191 | 2 | return Status::OK(); |
192 | 2 | } |
193 | | |
194 | 0 | Status ExecNode::alloc_resource(RuntimeState* state) { |
195 | 0 | for (auto& conjunct : _conjuncts) { |
196 | 0 | RETURN_IF_ERROR(conjunct->open(state)); |
197 | 0 | } |
198 | 0 | for (auto& projections : _intermediate_projections) { |
199 | 0 | RETURN_IF_ERROR(vectorized::VExpr::open(projections, state)); |
200 | 0 | } |
201 | 0 | RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state)); |
202 | 0 | return Status::OK(); |
203 | 0 | } |
204 | | |
205 | 0 | Status ExecNode::open(RuntimeState* state) { |
206 | 0 | return alloc_resource(state); |
207 | 0 | } |
208 | | |
209 | 0 | Status ExecNode::reset(RuntimeState* state) { |
210 | 0 | _num_rows_returned = 0; |
211 | 0 | for (auto& i : _children) { |
212 | 0 | RETURN_IF_ERROR(i->reset(state)); |
213 | 0 | } |
214 | 0 | return Status::OK(); |
215 | 0 | } |
216 | | |
217 | 2 | void ExecNode::release_resource(doris::RuntimeState* state) { |
218 | 2 | if (!_is_resource_released) { |
219 | 2 | if (_rows_returned_counter != nullptr) { |
220 | 2 | COUNTER_SET(_rows_returned_counter, _num_rows_returned); |
221 | 2 | } |
222 | | |
223 | 2 | _is_resource_released = true; |
224 | 2 | } |
225 | 2 | if (_peak_memory_usage_counter) { |
226 | 2 | _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); |
227 | 2 | } |
228 | 2 | } |
229 | | |
230 | 2 | Status ExecNode::close(RuntimeState* state) { |
231 | 2 | if (_is_closed) { |
232 | 0 | LOG(INFO) << "query= " << print_id(state->query_id()) |
233 | 0 | << " fragment_instance_id=" << print_id(state->fragment_instance_id()) |
234 | 0 | << " already closed"; |
235 | 0 | return Status::OK(); |
236 | 0 | } |
237 | 2 | _is_closed = true; |
238 | | |
239 | 2 | Status result; |
240 | 2 | for (auto& i : _children) { |
241 | 0 | auto st = i->close(state); |
242 | 0 | if (result.ok() && !st.ok()) { |
243 | 0 | result = st; |
244 | 0 | } |
245 | 0 | } |
246 | 2 | release_resource(state); |
247 | 2 | LOG(INFO) << "query= " << print_id(state->query_id()) |
248 | 2 | << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) |
249 | 2 | << ", id=" << _id << " type=" << print_plan_node_type(_type) << " closed"; |
250 | 2 | return result; |
251 | 2 | } |
252 | | |
253 | 0 | void ExecNode::add_runtime_exec_option(const std::string& str) { |
254 | 0 | std::lock_guard<std::mutex> l(_exec_options_lock); |
255 | |
|
256 | 0 | if (_runtime_exec_options.empty()) { |
257 | 0 | _runtime_exec_options = str; |
258 | 0 | } else { |
259 | 0 | _runtime_exec_options.append(", "); |
260 | 0 | _runtime_exec_options.append(str); |
261 | 0 | } |
262 | |
|
263 | 0 | runtime_profile()->add_info_string("ExecOption", _runtime_exec_options); |
264 | 0 | } |
265 | | |
266 | | Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan, |
267 | 0 | const DescriptorTbl& descs, ExecNode** root) { |
268 | 0 | if (plan.nodes.empty()) { |
269 | 0 | *root = nullptr; |
270 | 0 | return Status::OK(); |
271 | 0 | } |
272 | | |
273 | 0 | int node_idx = 0; |
274 | 0 | RETURN_IF_ERROR(create_tree_helper(state, pool, plan.nodes, descs, nullptr, &node_idx, root)); |
275 | | |
276 | 0 | if (node_idx + 1 != plan.nodes.size()) { |
277 | | // TODO: print thrift msg for diagnostic purposes. |
278 | 0 | return Status::InternalError( |
279 | 0 | "Plan tree only partially reconstructed. Not all thrift nodes were used."); |
280 | 0 | } |
281 | | |
282 | 0 | return Status::OK(); |
283 | 0 | } |
284 | | |
285 | | Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool, |
286 | | const std::vector<TPlanNode>& thrift_plan_nodes, |
287 | | const DescriptorTbl& descs, ExecNode* parent, int* node_idx, |
288 | 0 | ExecNode** root) { |
289 | | // propagate error case |
290 | 0 | if (*node_idx >= thrift_plan_nodes.size()) { |
291 | | // TODO: print thrift msg |
292 | 0 | return Status::InternalError("Failed to reconstruct plan tree from thrift."); |
293 | 0 | } |
294 | | |
295 | 0 | const TPlanNode& cur_plan_node = thrift_plan_nodes[*node_idx]; |
296 | 0 | int num_children = cur_plan_node.num_children; |
297 | | |
298 | | // Step 1 Create current ExecNode according to current thrift plan node. |
299 | 0 | ExecNode* cur_exec_node = nullptr; |
300 | 0 | RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node)); |
301 | 0 | if (cur_exec_node != nullptr && state->get_query_ctx()) { |
302 | 0 | state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics()); |
303 | 0 | } |
304 | | |
305 | | // Step 1.1 |
306 | | // Record current node if we have parent or record myself as root node. |
307 | 0 | if (parent != nullptr) { |
308 | 0 | parent->_children.push_back(cur_exec_node); |
309 | 0 | } else { |
310 | 0 | *root = cur_exec_node; |
311 | 0 | } |
312 | | |
313 | | // Step 2 |
314 | | // Create child ExecNode tree of current node in a recursive manner. |
315 | 0 | for (int i = 0; i < num_children; i++) { |
316 | 0 | ++*node_idx; |
317 | 0 | RETURN_IF_ERROR(create_tree_helper(state, pool, thrift_plan_nodes, descs, cur_exec_node, |
318 | 0 | node_idx, nullptr)); |
319 | | |
320 | | // we are expecting a child, but have used all nodes |
321 | | // this means we have been given a bad tree and must fail |
322 | 0 | if (*node_idx >= thrift_plan_nodes.size()) { |
323 | | // TODO: print thrift msg |
324 | 0 | return Status::InternalError("Failed to reconstruct plan tree from thrift."); |
325 | 0 | } |
326 | 0 | } |
327 | | |
328 | | // Step 3 Init myself after sub ExecNode tree is created and initialized |
329 | 0 | RETURN_IF_ERROR(cur_exec_node->init(cur_plan_node, state)); |
330 | | |
331 | | // build up tree of profiles; add children >0 first, so that when we print |
332 | | // the profile, child 0 is printed last (makes the output more readable) |
333 | 0 | for (int i = 1; i < cur_exec_node->_children.size(); ++i) { |
334 | 0 | cur_exec_node->runtime_profile()->add_child(cur_exec_node->_children[i]->runtime_profile(), |
335 | 0 | true, nullptr); |
336 | 0 | } |
337 | |
|
338 | 0 | if (!cur_exec_node->_children.empty()) { |
339 | 0 | cur_exec_node->runtime_profile()->add_child(cur_exec_node->_children[0]->runtime_profile(), |
340 | 0 | true, nullptr); |
341 | 0 | } |
342 | |
|
343 | 0 | return Status::OK(); |
344 | 0 | } |
345 | | |
346 | | // NOLINTBEGIN(readability-function-size) |
347 | | Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode, |
348 | 0 | const DescriptorTbl& descs, ExecNode** node) { |
349 | 0 | VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode); |
350 | |
|
351 | 0 | switch (tnode.node_type) { |
352 | 0 | case TPlanNodeType::MYSQL_SCAN_NODE: |
353 | 0 | #ifdef DORIS_WITH_MYSQL |
354 | 0 | *node = pool->add(new vectorized::VMysqlScanNode(pool, tnode, descs)); |
355 | 0 | return Status::OK(); |
356 | | #else |
357 | | return Status::InternalError( |
358 | | "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON"); |
359 | | #endif |
360 | 0 | case TPlanNodeType::ODBC_SCAN_NODE: |
361 | 0 | *node = pool->add(new vectorized::NewOdbcScanNode(pool, tnode, descs)); |
362 | 0 | return Status::OK(); |
363 | | |
364 | 0 | case TPlanNodeType::JDBC_SCAN_NODE: |
365 | 0 | if (config::enable_java_support) { |
366 | 0 | *node = pool->add(new vectorized::NewJdbcScanNode(pool, tnode, descs)); |
367 | 0 | return Status::OK(); |
368 | 0 | } else { |
369 | 0 | return Status::InternalError( |
370 | 0 | "Jdbc scan node is disabled, you can change be config enable_java_support " |
371 | 0 | "to true and restart be."); |
372 | 0 | } |
373 | | |
374 | 0 | case TPlanNodeType::ES_HTTP_SCAN_NODE: |
375 | 0 | *node = pool->add(new vectorized::NewEsScanNode(pool, tnode, descs)); |
376 | 0 | return Status::OK(); |
377 | | |
378 | 0 | case TPlanNodeType::SCHEMA_SCAN_NODE: |
379 | 0 | *node = pool->add(new vectorized::VSchemaScanNode(pool, tnode, descs)); |
380 | 0 | return Status::OK(); |
381 | | |
382 | 0 | case TPlanNodeType::META_SCAN_NODE: |
383 | 0 | *node = pool->add(new vectorized::VMetaScanNode(pool, tnode, descs)); |
384 | 0 | return Status::OK(); |
385 | | |
386 | 0 | case TPlanNodeType::OLAP_SCAN_NODE: |
387 | 0 | *node = pool->add(new vectorized::NewOlapScanNode(pool, tnode, descs)); |
388 | 0 | return Status::OK(); |
389 | | |
390 | 0 | case TPlanNodeType::AGGREGATION_NODE: |
391 | 0 | if (tnode.agg_node.aggregate_functions.empty() && state->enable_pipeline_exec()) { |
392 | 0 | *node = pool->add(new vectorized::DistinctAggregationNode(pool, tnode, descs)); |
393 | 0 | } else { |
394 | 0 | *node = pool->add(new vectorized::AggregationNode(pool, tnode, descs)); |
395 | 0 | } |
396 | 0 | return Status::OK(); |
397 | | |
398 | 0 | case TPlanNodeType::HASH_JOIN_NODE: |
399 | 0 | *node = pool->add(new vectorized::HashJoinNode(pool, tnode, descs)); |
400 | 0 | return Status::OK(); |
401 | | |
402 | 0 | case TPlanNodeType::CROSS_JOIN_NODE: |
403 | 0 | *node = pool->add(new vectorized::VNestedLoopJoinNode(pool, tnode, descs)); |
404 | 0 | return Status::OK(); |
405 | | |
406 | 0 | case TPlanNodeType::EMPTY_SET_NODE: |
407 | 0 | *node = pool->add(new vectorized::VEmptySetNode(pool, tnode, descs)); |
408 | 0 | return Status::OK(); |
409 | | |
410 | 0 | case TPlanNodeType::EXCHANGE_NODE: |
411 | 0 | *node = pool->add(new doris::vectorized::VExchangeNode(pool, tnode, descs)); |
412 | 0 | return Status::OK(); |
413 | | |
414 | 0 | case TPlanNodeType::SELECT_NODE: |
415 | 0 | *node = pool->add(new doris::vectorized::VSelectNode(pool, tnode, descs)); |
416 | 0 | return Status::OK(); |
417 | | |
418 | 0 | case TPlanNodeType::SORT_NODE: |
419 | 0 | *node = pool->add(new vectorized::VSortNode(pool, tnode, descs)); |
420 | 0 | return Status::OK(); |
421 | | |
422 | 0 | case TPlanNodeType::ANALYTIC_EVAL_NODE: |
423 | 0 | *node = pool->add(new vectorized::VAnalyticEvalNode(pool, tnode, descs)); |
424 | 0 | return Status::OK(); |
425 | | |
426 | 0 | case TPlanNodeType::MERGE_NODE: |
427 | 0 | RETURN_ERROR_IF_NON_VEC; |
428 | |
|
429 | 0 | case TPlanNodeType::UNION_NODE: |
430 | 0 | *node = pool->add(new vectorized::VUnionNode(pool, tnode, descs)); |
431 | 0 | return Status::OK(); |
432 | | |
433 | 0 | case TPlanNodeType::INTERSECT_NODE: |
434 | 0 | *node = pool->add(new vectorized::VIntersectNode(pool, tnode, descs)); |
435 | 0 | return Status::OK(); |
436 | | |
437 | 0 | case TPlanNodeType::EXCEPT_NODE: |
438 | 0 | *node = pool->add(new vectorized::VExceptNode(pool, tnode, descs)); |
439 | 0 | return Status::OK(); |
440 | | |
441 | 0 | case TPlanNodeType::FILE_SCAN_NODE: |
442 | 0 | *node = pool->add(new vectorized::NewFileScanNode(pool, tnode, descs)); |
443 | 0 | return Status::OK(); |
444 | | |
445 | 0 | case TPlanNodeType::REPEAT_NODE: |
446 | 0 | *node = pool->add(new vectorized::VRepeatNode(pool, tnode, descs)); |
447 | 0 | return Status::OK(); |
448 | | |
449 | 0 | case TPlanNodeType::ASSERT_NUM_ROWS_NODE: |
450 | 0 | *node = pool->add(new vectorized::VAssertNumRowsNode(pool, tnode, descs)); |
451 | 0 | return Status::OK(); |
452 | | |
453 | 0 | case TPlanNodeType::TABLE_FUNCTION_NODE: |
454 | 0 | *node = pool->add(new vectorized::VTableFunctionNode(pool, tnode, descs)); |
455 | 0 | return Status::OK(); |
456 | | |
457 | 0 | case TPlanNodeType::DATA_GEN_SCAN_NODE: |
458 | 0 | *node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, tnode, descs)); |
459 | 0 | return Status::OK(); |
460 | | |
461 | 0 | case TPlanNodeType::PARTITION_SORT_NODE: |
462 | 0 | *node = pool->add(new vectorized::VPartitionSortNode(pool, tnode, descs)); |
463 | 0 | return Status::OK(); |
464 | | |
465 | 0 | case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: |
466 | 0 | #ifndef NDEBUG |
467 | 0 | DCHECK(state->get_query_ctx() != nullptr); |
468 | 0 | state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true; |
469 | 0 | #endif |
470 | 0 | *node = pool->add(new vectorized::GroupCommitScanNode(pool, tnode, descs)); |
471 | 0 | return Status::OK(); |
472 | | |
473 | 0 | default: |
474 | 0 | auto i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); |
475 | 0 | const char* str = "unknown node type"; |
476 | |
|
477 | 0 | if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) { |
478 | 0 | str = i->second; |
479 | 0 | } |
480 | |
|
481 | 0 | std::stringstream error_msg; |
482 | 0 | error_msg << str << " not implemented"; |
483 | 0 | return Status::InternalError(error_msg.str()); |
484 | 0 | } |
485 | | |
486 | 0 | return Status::OK(); |
487 | 0 | } |
488 | | // NOLINTEND(readability-function-size) |
489 | | |
490 | 0 | std::string ExecNode::debug_string() const { |
491 | 0 | std::stringstream out; |
492 | 0 | this->debug_string(0, &out); |
493 | 0 | return out.str(); |
494 | 0 | } |
495 | | |
496 | 0 | void ExecNode::debug_string(int indentation_level, std::stringstream* out) const { |
497 | 0 | *out << " id=" << _id; |
498 | 0 | *out << " type=" << print_plan_node_type(_type); |
499 | 0 | *out << " tuple_ids=["; |
500 | 0 | for (auto id : _tuple_ids) { |
501 | 0 | *out << id << ", "; |
502 | 0 | } |
503 | 0 | *out << "]"; |
504 | |
|
505 | 0 | for (auto* i : _children) { |
506 | 0 | *out << "\n"; |
507 | 0 | i->debug_string(indentation_level + 1, out); |
508 | 0 | } |
509 | 0 | } |
510 | | |
511 | 0 | void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode*>* nodes) { |
512 | 0 | if (_type == node_type) { |
513 | 0 | nodes->push_back(this); |
514 | 0 | } |
515 | |
|
516 | 0 | for (auto& i : _children) { |
517 | 0 | i->collect_nodes(node_type, nodes); |
518 | 0 | } |
519 | 0 | } |
520 | | |
521 | 0 | void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) { |
522 | 0 | collect_nodes(TPlanNodeType::OLAP_SCAN_NODE, nodes); |
523 | 0 | collect_nodes(TPlanNodeType::ES_HTTP_SCAN_NODE, nodes); |
524 | 0 | collect_nodes(TPlanNodeType::DATA_GEN_SCAN_NODE, nodes); |
525 | 0 | collect_nodes(TPlanNodeType::FILE_SCAN_NODE, nodes); |
526 | 0 | collect_nodes(TPlanNodeType::META_SCAN_NODE, nodes); |
527 | 0 | collect_nodes(TPlanNodeType::JDBC_SCAN_NODE, nodes); |
528 | 0 | collect_nodes(TPlanNodeType::ODBC_SCAN_NODE, nodes); |
529 | 0 | } |
530 | | |
531 | 2 | void ExecNode::init_runtime_profile(const std::string& name) { |
532 | 2 | std::stringstream ss; |
533 | 2 | ss << name << " (id=" << _id << ")"; |
534 | 2 | _runtime_profile = std::make_unique<RuntimeProfile>(ss.str()); |
535 | 2 | _runtime_profile->set_metadata(_id); |
536 | 2 | } |
537 | | |
538 | 0 | void ExecNode::release_block_memory(vectorized::Block& block, uint16_t child_idx) { |
539 | 0 | DCHECK(child_idx < _children.size()); |
540 | 0 | block.clear_column_data(child(child_idx)->row_desc().num_materialized_slots()); |
541 | 0 | } |
542 | | |
543 | 0 | void ExecNode::reached_limit(vectorized::Block* block, bool* eos) { |
544 | 0 | if (_limit != -1 and _num_rows_returned + block->rows() >= _limit) { |
545 | 0 | block->set_num_rows(_limit - _num_rows_returned); |
546 | 0 | *eos = true; |
547 | 0 | } |
548 | |
|
549 | 0 | _num_rows_returned += block->rows(); |
550 | 0 | COUNTER_SET(_rows_returned_counter, _num_rows_returned); |
551 | 0 | } |
552 | | |
553 | 0 | Status ExecNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { |
554 | 0 | return Status::NotSupported("Not Implemented get block"); |
555 | 0 | } |
556 | | |
557 | 0 | std::string ExecNode::get_name() { |
558 | 0 | return "V" + print_plan_node_type(_type); |
559 | 0 | } |
560 | | |
561 | 0 | Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) { |
562 | 0 | SCOPED_TIMER(_exec_timer); |
563 | 0 | SCOPED_TIMER(_projection_timer); |
564 | 0 | const size_t rows = origin_block->rows(); |
565 | 0 | if (rows == 0) { |
566 | 0 | return Status::OK(); |
567 | 0 | } |
568 | 0 | vectorized::Block input_block = *origin_block; |
569 | |
|
570 | 0 | std::vector<int> result_column_ids; |
571 | 0 | for (auto& projections : _intermediate_projections) { |
572 | 0 | result_column_ids.resize(projections.size()); |
573 | 0 | for (int i = 0; i < projections.size(); i++) { |
574 | 0 | RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i])); |
575 | 0 | } |
576 | 0 | input_block.shuffle_columns(result_column_ids); |
577 | 0 | } |
578 | | |
579 | 0 | DCHECK_EQ(rows, input_block.rows()); |
580 | 0 | auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { |
581 | 0 | if (to->is_nullable() && !from->is_nullable()) { |
582 | 0 | if (_keep_origin || !from->is_exclusive()) { |
583 | 0 | auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to); |
584 | 0 | null_column.get_nested_column().insert_range_from(*from, 0, rows); |
585 | 0 | null_column.get_null_map_column().get_data().resize_fill(rows, 0); |
586 | 0 | } else { |
587 | 0 | to = make_nullable(from, false)->assume_mutable(); |
588 | 0 | } |
589 | 0 | } else { |
590 | 0 | if (_keep_origin || !from->is_exclusive()) { |
591 | 0 | to->insert_range_from(*from, 0, rows); |
592 | 0 | } else { |
593 | 0 | to = from->assume_mutable(); |
594 | 0 | } |
595 | 0 | } |
596 | 0 | }; |
597 | |
|
598 | 0 | using namespace vectorized; |
599 | 0 | MutableBlock mutable_block = |
600 | 0 | VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor); |
601 | |
|
602 | 0 | auto& mutable_columns = mutable_block.mutable_columns(); |
603 | |
|
604 | 0 | DCHECK_EQ(mutable_columns.size(), _projections.size()); |
605 | |
|
606 | 0 | for (int i = 0; i < mutable_columns.size(); ++i) { |
607 | 0 | auto result_column_id = -1; |
608 | 0 | RETURN_IF_ERROR(_projections[i]->execute(&input_block, &result_column_id)); |
609 | 0 | auto column_ptr = input_block.get_by_position(result_column_id) |
610 | 0 | .column->convert_to_full_column_if_const(); |
611 | | //TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it |
612 | 0 | insert_column_datas(mutable_columns[i], column_ptr, rows); |
613 | 0 | } |
614 | 0 | DCHECK(mutable_block.rows() == rows); |
615 | 0 | output_block->set_columns(std::move(mutable_columns)); |
616 | |
|
617 | 0 | return Status::OK(); |
618 | 0 | } |
619 | | |
620 | | Status ExecNode::get_next_after_projects( |
621 | | RuntimeState* state, vectorized::Block* block, bool* eos, |
622 | | const std::function<Status(RuntimeState*, vectorized::Block*, bool*)>& func, |
623 | 0 | bool clear_data) { |
624 | 0 | if (_output_row_descriptor) { |
625 | 0 | if (clear_data) { |
626 | 0 | clear_origin_block(); |
627 | 0 | } |
628 | 0 | RETURN_IF_ERROR(func(state, &_origin_block, eos)); |
629 | 0 | RETURN_IF_ERROR(do_projections(&_origin_block, block)); |
630 | 0 | } else { |
631 | 0 | RETURN_IF_ERROR(func(state, block, eos)); |
632 | 0 | } |
633 | 0 | _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); |
634 | |
|
635 | 0 | if (block && !block->empty()) { |
636 | 0 | COUNTER_UPDATE(_output_bytes_counter, block->allocated_bytes()); |
637 | 0 | COUNTER_UPDATE(_block_count_counter, 1); |
638 | 0 | } |
639 | 0 | return Status::OK(); |
640 | 0 | } |
641 | | |
642 | 0 | Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) { |
643 | 0 | return Status::NotSupported("{} not implements sink", get_name()); |
644 | 0 | } |
645 | | |
646 | | } // namespace doris |