Coverage Report

Created: 2026-03-20 03:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_table_sink_operator.h"
68
#include "exec/operator/jdbc_scan_operator.h"
69
#include "exec/operator/jdbc_table_sink_operator.h"
70
#include "exec/operator/local_merge_sort_source_operator.h"
71
#include "exec/operator/materialization_opertor.h"
72
#include "exec/operator/maxcompute_table_sink_operator.h"
73
#include "exec/operator/memory_scratch_sink_operator.h"
74
#include "exec/operator/meta_scan_operator.h"
75
#include "exec/operator/multi_cast_data_stream_sink.h"
76
#include "exec/operator/multi_cast_data_stream_source.h"
77
#include "exec/operator/nested_loop_join_build_operator.h"
78
#include "exec/operator/nested_loop_join_probe_operator.h"
79
#include "exec/operator/olap_scan_operator.h"
80
#include "exec/operator/olap_table_sink_operator.h"
81
#include "exec/operator/olap_table_sink_v2_operator.h"
82
#include "exec/operator/partition_sort_sink_operator.h"
83
#include "exec/operator/partition_sort_source_operator.h"
84
#include "exec/operator/partitioned_aggregation_sink_operator.h"
85
#include "exec/operator/partitioned_aggregation_source_operator.h"
86
#include "exec/operator/partitioned_hash_join_probe_operator.h"
87
#include "exec/operator/partitioned_hash_join_sink_operator.h"
88
#include "exec/operator/rec_cte_anchor_sink_operator.h"
89
#include "exec/operator/rec_cte_scan_operator.h"
90
#include "exec/operator/rec_cte_sink_operator.h"
91
#include "exec/operator/rec_cte_source_operator.h"
92
#include "exec/operator/repeat_operator.h"
93
#include "exec/operator/result_file_sink_operator.h"
94
#include "exec/operator/result_sink_operator.h"
95
#include "exec/operator/schema_scan_operator.h"
96
#include "exec/operator/select_operator.h"
97
#include "exec/operator/set_probe_sink_operator.h"
98
#include "exec/operator/set_sink_operator.h"
99
#include "exec/operator/set_source_operator.h"
100
#include "exec/operator/sort_sink_operator.h"
101
#include "exec/operator/sort_source_operator.h"
102
#include "exec/operator/spill_iceberg_table_sink_operator.h"
103
#include "exec/operator/spill_sort_sink_operator.h"
104
#include "exec/operator/spill_sort_source_operator.h"
105
#include "exec/operator/streaming_aggregation_operator.h"
106
#include "exec/operator/table_function_operator.h"
107
#include "exec/operator/tvf_table_sink_operator.h"
108
#include "exec/operator/union_sink_operator.h"
109
#include "exec/operator/union_source_operator.h"
110
#include "exec/pipeline/dependency.h"
111
#include "exec/pipeline/pipeline_task.h"
112
#include "exec/pipeline/task_scheduler.h"
113
#include "exec/runtime_filter/runtime_filter_mgr.h"
114
#include "exec/sort/topn_sorter.h"
115
#include "exec/spill/spill_file.h"
116
#include "io/fs/stream_load_pipe.h"
117
#include "load/stream_load/new_load_stream_mgr.h"
118
#include "runtime/exec_env.h"
119
#include "runtime/fragment_mgr.h"
120
#include "runtime/result_buffer_mgr.h"
121
#include "runtime/runtime_state.h"
122
#include "runtime/thread_context.h"
123
#include "util/countdown_latch.h"
124
#include "util/debug_util.h"
125
#include "util/uid_util.h"
126
127
namespace doris {
128
#include "common/compile_check_begin.h"
129
PipelineFragmentContext::PipelineFragmentContext(
130
        TUniqueId query_id, const TPipelineFragmentParams& request,
131
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
132
        const std::function<void(RuntimeState*, Status*)>& call_back,
133
        report_status_callback report_status_cb)
134
458k
        : _query_id(std::move(query_id)),
135
458k
          _fragment_id(request.fragment_id),
136
458k
          _exec_env(exec_env),
137
458k
          _query_ctx(std::move(query_ctx)),
138
458k
          _call_back(call_back),
139
458k
          _is_report_on_cancel(true),
140
458k
          _report_status_cb(std::move(report_status_cb)),
141
458k
          _params(request),
142
458k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
143
458k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
144
458k
                                                               : false) {
145
458k
    _fragment_watcher.start();
146
458k
}
147
148
458k
PipelineFragmentContext::~PipelineFragmentContext() {
149
458k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
150
458k
            .tag("query_id", print_id(_query_id))
151
458k
            .tag("fragment_id", _fragment_id);
152
458k
    _release_resource();
153
458k
    {
154
        // The memory released by the query end is recorded in the query mem tracker.
155
458k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
156
458k
        _runtime_state.reset();
157
458k
        _query_ctx.reset();
158
458k
    }
159
458k
}
160
161
518
bool PipelineFragmentContext::is_timeout(timespec now) const {
162
518
    if (_timeout <= 0) {
163
0
        return false;
164
0
    }
165
518
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
166
518
}
167
168
// Must not add lock in this method. Because it will call query ctx cancel. And
169
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
170
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
171
// There maybe dead lock.
172
6.24k
void PipelineFragmentContext::cancel(const Status reason) {
173
6.24k
    LOG_INFO("PipelineFragmentContext::cancel")
174
6.24k
            .tag("query_id", print_id(_query_id))
175
6.24k
            .tag("fragment_id", _fragment_id)
176
6.24k
            .tag("reason", reason.to_string());
177
6.24k
    {
178
6.24k
        std::lock_guard<std::mutex> l(_task_mutex);
179
6.24k
        if (_closed_tasks >= _total_tasks) {
180
            // All tasks in this PipelineXFragmentContext already closed.
181
104
            return;
182
104
        }
183
6.24k
    }
184
    // Timeout is a special error code, we need print current stack to debug timeout issue.
185
6.14k
    if (reason.is<ErrorCode::TIMEOUT>()) {
186
17
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
187
17
                                   debug_string());
188
17
        LOG_LONG_STRING(WARNING, dbg_str);
189
17
    }
190
191
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
192
6.14k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
193
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
194
0
                    debug_string());
195
0
    }
196
197
6.14k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
198
12
        print_profile("cancel pipeline, reason: " + reason.to_string());
199
12
    }
200
201
6.14k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
202
22
        _query_ctx->set_load_error_url(error_url);
203
22
    }
204
205
6.14k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
206
22
        _query_ctx->set_first_error_msg(first_error_msg);
207
22
    }
208
209
6.14k
    _query_ctx->cancel(reason, _fragment_id);
210
6.14k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
211
305
        _is_report_on_cancel = false;
212
5.83k
    } else {
213
29.5k
        for (auto& id : _fragment_instance_ids) {
214
29.5k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
215
29.5k
        }
216
5.83k
    }
217
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
218
    // For stream load the fragment's query_id == load id, it is set in FE.
219
6.14k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
220
6.14k
    if (stream_load_ctx != nullptr) {
221
30
        stream_load_ctx->pipe->cancel(reason.to_string());
222
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
223
        // We need to set the error URL at this point to ensure error information is properly
224
        // propagated to the client.
225
30
        stream_load_ctx->error_url = get_load_error_url();
226
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
227
30
    }
228
229
30.1k
    for (auto& tasks : _tasks) {
230
67.4k
        for (auto& task : tasks) {
231
67.4k
            task.first->terminate();
232
67.4k
        }
233
30.1k
    }
234
6.14k
}
235
236
715k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
237
715k
    PipelineId id = _next_pipeline_id++;
238
715k
    auto pipeline = std::make_shared<Pipeline>(
239
715k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
240
715k
            parent ? parent->num_tasks() : _num_instances);
241
715k
    if (idx >= 0) {
242
105k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
243
609k
    } else {
244
609k
        _pipelines.emplace_back(pipeline);
245
609k
    }
246
715k
    if (parent) {
247
246k
        parent->set_children(pipeline);
248
246k
    }
249
715k
    return pipeline;
250
715k
}
251
252
461k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
253
461k
    {
254
461k
        SCOPED_TIMER(_build_pipelines_timer);
255
        // 2. Build pipelines with operators in this fragment.
256
461k
        auto root_pipeline = add_pipeline();
257
461k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
258
461k
                                         &_root_op, root_pipeline));
259
260
        // 3. Create sink operator
261
461k
        if (!_params.fragment.__isset.output_sink) {
262
0
            return Status::InternalError("No output sink in this fragment!");
263
0
        }
264
461k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
265
461k
                                          _params.fragment.output_exprs, _params,
266
461k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
267
461k
                                          *_desc_tbl, root_pipeline->id()));
268
461k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
269
461k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
270
271
608k
        for (PipelinePtr& pipeline : _pipelines) {
272
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
273
608k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
274
608k
        }
275
461k
    }
276
    // 4. Build local exchanger
277
461k
    if (_runtime_state->enable_local_shuffle()) {
278
458k
        SCOPED_TIMER(_plan_local_exchanger_timer);
279
458k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
280
458k
                                             _params.bucket_seq_to_instance_idx,
281
458k
                                             _params.shuffle_idx_to_instance_idx));
282
458k
    }
283
284
    // 5. Initialize global states in pipelines.
285
716k
    for (PipelinePtr& pipeline : _pipelines) {
286
716k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
287
716k
        pipeline->children().clear();
288
716k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
289
716k
    }
290
291
459k
    {
292
459k
        SCOPED_TIMER(_build_tasks_timer);
293
        // 6. Build pipeline tasks and initialize local state.
294
459k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
295
459k
    }
296
297
459k
    return Status::OK();
298
459k
}
299
300
458k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
301
458k
    if (_prepared) {
302
0
        return Status::InternalError("Already prepared");
303
0
    }
304
458k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
305
458k
        _timeout = _params.query_options.execution_timeout;
306
458k
    }
307
308
458k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
309
458k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
310
458k
    SCOPED_TIMER(_prepare_timer);
311
458k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
312
458k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
313
458k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
314
458k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
315
458k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
316
458k
    {
317
458k
        SCOPED_TIMER(_init_context_timer);
318
458k
        cast_set(_num_instances, _params.local_params.size());
319
458k
        _total_instances =
320
458k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
321
322
458k
        auto* fragment_context = this;
323
324
458k
        if (_params.query_options.__isset.is_report_success) {
325
457k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
326
457k
        }
327
328
        // 1. Set up the global runtime state.
329
458k
        _runtime_state = RuntimeState::create_unique(
330
458k
                _params.query_id, _params.fragment_id, _params.query_options,
331
458k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
332
458k
        _runtime_state->set_task_execution_context(shared_from_this());
333
458k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
334
458k
        if (_params.__isset.backend_id) {
335
456k
            _runtime_state->set_backend_id(_params.backend_id);
336
456k
        }
337
458k
        if (_params.__isset.import_label) {
338
238
            _runtime_state->set_import_label(_params.import_label);
339
238
        }
340
458k
        if (_params.__isset.db_name) {
341
190
            _runtime_state->set_db_name(_params.db_name);
342
190
        }
343
458k
        if (_params.__isset.load_job_id) {
344
0
            _runtime_state->set_load_job_id(_params.load_job_id);
345
0
        }
346
347
458k
        if (_params.is_simplified_param) {
348
160k
            _desc_tbl = _query_ctx->desc_tbl;
349
297k
        } else {
350
297k
            DCHECK(_params.__isset.desc_tbl);
351
297k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
352
297k
                                                  &_desc_tbl));
353
297k
        }
354
458k
        _runtime_state->set_desc_tbl(_desc_tbl);
355
458k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
356
458k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
357
458k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
358
458k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
359
360
        // init fragment_instance_ids
361
458k
        const auto target_size = _params.local_params.size();
362
458k
        _fragment_instance_ids.resize(target_size);
363
1.71M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
364
1.26M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
365
1.26M
            _fragment_instance_ids[i] = fragment_instance_id;
366
1.26M
        }
367
458k
    }
368
369
458k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
370
371
457k
    _init_next_report_time();
372
373
457k
    _prepared = true;
374
457k
    return Status::OK();
375
458k
}
376
377
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
378
        int instance_idx,
379
1.26M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
380
1.26M
    const auto& local_params = _params.local_params[instance_idx];
381
1.26M
    auto fragment_instance_id = local_params.fragment_instance_id;
382
1.26M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
383
1.26M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
384
1.26M
    auto get_shared_state = [&](PipelinePtr pipeline)
385
1.26M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
386
2.25M
                                       std::vector<std::shared_ptr<Dependency>>>> {
387
2.25M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
388
2.25M
                                std::vector<std::shared_ptr<Dependency>>>>
389
2.25M
                shared_state_map;
390
2.88M
        for (auto& op : pipeline->operators()) {
391
2.88M
            auto source_id = op->operator_id();
392
2.88M
            if (auto iter = _op_id_to_shared_state.find(source_id);
393
2.88M
                iter != _op_id_to_shared_state.end()) {
394
819k
                shared_state_map.insert({source_id, iter->second});
395
819k
            }
396
2.88M
        }
397
2.26M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
398
2.26M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
399
2.26M
                iter != _op_id_to_shared_state.end()) {
400
367k
                shared_state_map.insert({sink_to_source_id, iter->second});
401
367k
            }
402
2.26M
        }
403
2.25M
        return shared_state_map;
404
2.25M
    };
405
406
3.97M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
407
2.70M
        auto& pipeline = _pipelines[pip_idx];
408
2.70M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
409
2.25M
            auto task_runtime_state = RuntimeState::create_unique(
410
2.25M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
411
2.25M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
412
2.25M
            {
413
                // Initialize runtime state for this task
414
2.25M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
415
416
2.25M
                task_runtime_state->set_task_execution_context(shared_from_this());
417
2.25M
                task_runtime_state->set_be_number(local_params.backend_num);
418
2.25M
                if (_need_notify_close) {
419
                    // rec cte require child rf to wait infinitely to make sure all rpc done
420
9.52k
                    task_runtime_state->set_force_make_rf_wait_infinite();
421
9.52k
                }
422
423
2.25M
                if (_params.__isset.backend_id) {
424
2.25M
                    task_runtime_state->set_backend_id(_params.backend_id);
425
2.25M
                }
426
2.25M
                if (_params.__isset.import_label) {
427
239
                    task_runtime_state->set_import_label(_params.import_label);
428
239
                }
429
2.25M
                if (_params.__isset.db_name) {
430
191
                    task_runtime_state->set_db_name(_params.db_name);
431
191
                }
432
2.25M
                if (_params.__isset.load_job_id) {
433
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
434
0
                }
435
2.25M
                if (_params.__isset.wal_id) {
436
114
                    task_runtime_state->set_wal_id(_params.wal_id);
437
114
                }
438
2.25M
                if (_params.__isset.content_length) {
439
31
                    task_runtime_state->set_content_length(_params.content_length);
440
31
                }
441
442
2.25M
                task_runtime_state->set_desc_tbl(_desc_tbl);
443
2.25M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
444
2.25M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
445
2.25M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
446
2.25M
                task_runtime_state->set_max_operator_id(max_operator_id());
447
2.25M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
448
2.25M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
449
2.25M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
450
451
2.25M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
452
2.25M
            }
453
2.25M
            auto cur_task_id = _total_tasks++;
454
2.25M
            task_runtime_state->set_task_id(cur_task_id);
455
2.25M
            task_runtime_state->set_task_num(pipeline->num_tasks());
456
2.25M
            auto task = std::make_shared<PipelineTask>(
457
2.25M
                    pipeline, cur_task_id, task_runtime_state.get(),
458
2.25M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
459
2.25M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
460
2.25M
                    instance_idx);
461
2.25M
            pipeline->incr_created_tasks(instance_idx, task.get());
462
2.25M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
463
2.25M
            _tasks[instance_idx].emplace_back(
464
2.25M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
465
2.25M
                            std::move(task), std::move(task_runtime_state)});
466
2.25M
        }
467
2.70M
    }
468
469
    /**
470
         * Build DAG for pipeline tasks.
471
         * For example, we have
472
         *
473
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
474
         *            \                      /
475
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
476
         *                 \                          /
477
         *               JoinProbeOperator2 (Pipeline1)
478
         *
479
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
480
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
481
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
482
         *
483
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
484
         * and JoinProbeOperator2.
485
         */
486
2.70M
    for (auto& _pipeline : _pipelines) {
487
2.70M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
488
2.25M
            auto* task = pipeline_id_to_task[_pipeline->id()];
489
2.25M
            DCHECK(task != nullptr);
490
491
            // If this task has upstream dependency, then inject it into this task.
492
2.25M
            if (_dag.contains(_pipeline->id())) {
493
1.43M
                auto& deps = _dag[_pipeline->id()];
494
2.24M
                for (auto& dep : deps) {
495
2.24M
                    if (pipeline_id_to_task.contains(dep)) {
496
1.33M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
497
1.33M
                        if (ss) {
498
615k
                            task->inject_shared_state(ss);
499
722k
                        } else {
500
722k
                            pipeline_id_to_task[dep]->inject_shared_state(
501
722k
                                    task->get_source_shared_state());
502
722k
                        }
503
1.33M
                    }
504
2.24M
                }
505
1.43M
            }
506
2.25M
        }
507
2.70M
    }
508
3.96M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
509
2.70M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
510
2.25M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
511
2.25M
            DCHECK(pipeline_id_to_profile[pip_idx]);
512
2.25M
            std::vector<TScanRangeParams> scan_ranges;
513
2.25M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
514
2.25M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
515
421k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
516
421k
            }
517
2.25M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
518
2.25M
                                                             _params.fragment.output_sink));
519
2.25M
        }
520
2.70M
    }
521
1.26M
    {
522
1.26M
        std::lock_guard<std::mutex> l(_state_map_lock);
523
1.26M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
524
1.26M
    }
525
1.26M
    return Status::OK();
526
1.26M
}
527
528
460k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
529
460k
    _total_tasks = 0;
530
460k
    _closed_tasks = 0;
531
460k
    const auto target_size = _params.local_params.size();
532
460k
    _tasks.resize(target_size);
533
460k
    _runtime_filter_mgr_map.resize(target_size);
534
1.17M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
535
714k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
536
714k
    }
537
460k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
538
539
460k
    if (target_size > 1 &&
540
460k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
541
126k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
542
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
543
29.2k
        std::vector<Status> prepare_status(target_size);
544
29.2k
        int submitted_tasks = 0;
545
29.2k
        Status submit_status;
546
29.2k
        CountDownLatch latch((int)target_size);
547
395k
        for (int i = 0; i < target_size; i++) {
548
365k
            submit_status = thread_pool->submit_func([&, i]() {
549
365k
                SCOPED_ATTACH_TASK(_query_ctx.get());
550
365k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
551
365k
                latch.count_down();
552
365k
            });
553
365k
            if (LIKELY(submit_status.ok())) {
554
365k
                submitted_tasks++;
555
18.4E
            } else {
556
18.4E
                break;
557
18.4E
            }
558
365k
        }
559
29.2k
        latch.arrive_and_wait(target_size - submitted_tasks);
560
29.2k
        if (UNLIKELY(!submit_status.ok())) {
561
0
            return submit_status;
562
0
        }
563
395k
        for (int i = 0; i < submitted_tasks; i++) {
564
365k
            if (!prepare_status[i].ok()) {
565
0
                return prepare_status[i];
566
0
            }
567
365k
        }
568
431k
    } else {
569
1.33M
        for (int i = 0; i < target_size; i++) {
570
899k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
571
899k
        }
572
431k
    }
573
460k
    _pipeline_parent_map.clear();
574
460k
    _op_id_to_shared_state.clear();
575
576
460k
    return Status::OK();
577
460k
}
578
579
455k
void PipelineFragmentContext::_init_next_report_time() {
580
455k
    auto interval_s = config::pipeline_status_report_interval;
581
455k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
582
43.8k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
583
43.8k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
584
        // We don't want to wait longer than it takes to run the entire fragment.
585
43.8k
        _previous_report_time =
586
43.8k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
587
43.8k
        _disable_period_report = false;
588
43.8k
    }
589
455k
}
590
591
5.19k
void PipelineFragmentContext::refresh_next_report_time() {
592
5.19k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
593
5.19k
    DCHECK(disable == true);
594
5.19k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
595
5.19k
    _disable_period_report.compare_exchange_strong(disable, false);
596
5.19k
}
597
598
8.14M
void PipelineFragmentContext::trigger_report_if_necessary() {
599
8.14M
    if (!_is_report_success) {
600
7.43M
        return;
601
7.43M
    }
602
716k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
603
716k
    if (disable) {
604
11.1k
        return;
605
11.1k
    }
606
705k
    int32_t interval_s = config::pipeline_status_report_interval;
607
705k
    if (interval_s <= 0) {
608
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
609
0
                        "trigger "
610
0
                        "report.";
611
0
    }
612
705k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
613
705k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
614
705k
    if (MonotonicNanos() > next_report_time) {
615
5.20k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
616
5.20k
                                                            std::memory_order_acq_rel)) {
617
7
            return;
618
7
        }
619
5.19k
        if (VLOG_FILE_IS_ON) {
620
0
            VLOG_FILE << "Reporting "
621
0
                      << "profile for query_id " << print_id(_query_id)
622
0
                      << ", fragment id: " << _fragment_id;
623
624
0
            std::stringstream ss;
625
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
626
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
627
0
            if (_runtime_state->load_channel_profile()) {
628
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
629
0
            }
630
631
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
632
0
                      << " profile:\n"
633
0
                      << ss.str();
634
0
        }
635
5.19k
        auto st = send_report(false);
636
5.19k
        if (!st.ok()) {
637
0
            disable = true;
638
0
            _disable_period_report.compare_exchange_strong(disable, false,
639
0
                                                           std::memory_order_acq_rel);
640
0
        }
641
5.19k
    }
642
705k
}
643
644
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
645
459k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
646
459k
    if (_params.fragment.plan.nodes.empty()) {
647
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
648
0
    }
649
650
459k
    int node_idx = 0;
651
652
459k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
653
459k
                                        &node_idx, root, cur_pipe, 0, false, false));
654
655
459k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
656
0
        return Status::InternalError(
657
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
658
0
    }
659
459k
    return Status::OK();
660
459k
}
661
662
Status PipelineFragmentContext::_create_tree_helper(
663
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
664
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
665
724k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
666
    // propagate error case
667
724k
    if (*node_idx >= tnodes.size()) {
668
0
        return Status::InternalError(
669
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
670
0
                *node_idx, tnodes.size());
671
0
    }
672
724k
    const TPlanNode& tnode = tnodes[*node_idx];
673
674
724k
    int num_children = tnodes[*node_idx].num_children;
675
724k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
676
724k
    bool current_require_bucket_distribution = require_bucket_distribution;
677
724k
    OperatorPtr op = nullptr;
678
724k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
679
724k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
680
724k
                                     followed_by_shuffled_operator,
681
724k
                                     current_require_bucket_distribution));
682
    // Initialization must be done here. For example, group by expressions in agg will be used to
683
    // decide if a local shuffle should be planed, so it must be initialized here.
684
724k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
685
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
686
724k
    if (parent != nullptr) {
687
        // add to parent's child(s)
688
265k
        RETURN_IF_ERROR(parent->set_child(op));
689
458k
    } else {
690
458k
        *root = op;
691
458k
    }
692
    /**
693
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
694
     *
695
     * For plan:
696
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
697
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
698
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
699
     *
700
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
701
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
702
     */
703
724k
    auto required_data_distribution =
704
724k
            cur_pipe->operators().empty()
705
724k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
706
724k
                    : op->required_data_distribution(_runtime_state.get());
707
724k
    current_followed_by_shuffled_operator =
708
724k
            ((followed_by_shuffled_operator ||
709
724k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
710
659k
                                             : op->is_shuffled_operator())) &&
711
724k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
712
724k
            (followed_by_shuffled_operator &&
713
602k
             required_data_distribution.distribution_type == ExchangeType::NOOP);
714
715
724k
    current_require_bucket_distribution =
716
724k
            ((require_bucket_distribution ||
717
724k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
718
664k
                                             : op->is_colocated_operator())) &&
719
724k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
720
724k
            (require_bucket_distribution &&
721
608k
             required_data_distribution.distribution_type == ExchangeType::NOOP);
722
723
724k
    if (num_children == 0) {
724
477k
        _use_serial_source = op->is_serial_operator();
725
477k
    }
726
    // rely on that tnodes is preorder of the plan
727
990k
    for (int i = 0; i < num_children; i++) {
728
265k
        ++*node_idx;
729
265k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
730
265k
                                            current_followed_by_shuffled_operator,
731
265k
                                            current_require_bucket_distribution));
732
733
        // we are expecting a child, but have used all nodes
734
        // this means we have been given a bad tree and must fail
735
265k
        if (*node_idx >= tnodes.size()) {
736
0
            return Status::InternalError(
737
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
738
0
                    "nodes: {}",
739
0
                    *node_idx, tnodes.size());
740
0
        }
741
265k
    }
742
743
724k
    return Status::OK();
744
724k
}
745
746
void PipelineFragmentContext::_inherit_pipeline_properties(
747
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
748
105k
        PipelinePtr pipe_with_sink) {
749
105k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
750
105k
    pipe_with_source->set_num_tasks(_num_instances);
751
105k
    pipe_with_source->set_data_distribution(data_distribution);
752
105k
}
753
754
Status PipelineFragmentContext::_add_local_exchange_impl(
755
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
756
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
757
        const std::map<int, int>& bucket_seq_to_instance_idx,
758
105k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
759
105k
    auto& operators = cur_pipe->operators();
760
105k
    const auto downstream_pipeline_id = cur_pipe->id();
761
105k
    auto local_exchange_id = next_operator_id();
762
    // 1. Create a new pipeline with local exchange sink.
763
105k
    DataSinkOperatorPtr sink;
764
105k
    auto sink_id = next_sink_operator_id();
765
766
    /**
767
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
768
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
769
     */
770
105k
    const bool followed_by_shuffled_operator =
771
105k
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
772
105k
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
773
105k
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
774
105k
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
775
105k
                                         followed_by_shuffled_operator && !_use_serial_source;
776
105k
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
777
105k
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
778
105k
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
779
105k
    if (bucket_seq_to_instance_idx.empty() &&
780
105k
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
781
0
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
782
0
    }
783
105k
    RETURN_IF_ERROR(new_pip->set_sink(sink));
784
105k
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
785
105k
                                          num_buckets, use_global_hash_shuffle,
786
105k
                                          shuffle_idx_to_instance_idx));
787
788
    // 2. Create and initialize LocalExchangeSharedState.
789
105k
    std::shared_ptr<LocalExchangeSharedState> shared_state =
790
105k
            LocalExchangeSharedState::create_shared(_num_instances);
791
105k
    switch (data_distribution.distribution_type) {
792
14.1k
    case ExchangeType::HASH_SHUFFLE:
793
14.1k
        shared_state->exchanger = ShuffleExchanger::create_unique(
794
14.1k
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
795
14.1k
                use_global_hash_shuffle ? _total_instances : _num_instances,
796
14.1k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
797
14.1k
                        ? cast_set<int>(
798
14.1k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
799
18.4E
                        : 0);
800
14.1k
        break;
801
389
    case ExchangeType::BUCKET_HASH_SHUFFLE:
802
389
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
803
389
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
804
389
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
805
389
                        ? cast_set<int>(
806
389
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
807
389
                        : 0);
808
389
        break;
809
87.6k
    case ExchangeType::PASSTHROUGH:
810
87.6k
        shared_state->exchanger = PassthroughExchanger::create_unique(
811
87.6k
                cur_pipe->num_tasks(), _num_instances,
812
87.6k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
813
87.6k
                        ? cast_set<int>(
814
87.6k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
815
87.6k
                        : 0);
816
87.6k
        break;
817
276
    case ExchangeType::BROADCAST:
818
276
        shared_state->exchanger = BroadcastExchanger::create_unique(
819
276
                cur_pipe->num_tasks(), _num_instances,
820
276
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
821
276
                        ? cast_set<int>(
822
276
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
823
276
                        : 0);
824
276
        break;
825
2.46k
    case ExchangeType::PASS_TO_ONE:
826
2.46k
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
827
            // If shared hash table is enabled for BJ, hash table will be built by only one task
828
1.51k
            shared_state->exchanger = PassToOneExchanger::create_unique(
829
1.51k
                    cur_pipe->num_tasks(), _num_instances,
830
1.51k
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
831
1.51k
                            ? cast_set<int>(_runtime_state->query_options()
832
1.51k
                                                    .local_exchange_free_blocks_limit)
833
1.51k
                            : 0);
834
1.51k
        } else {
835
948
            shared_state->exchanger = BroadcastExchanger::create_unique(
836
948
                    cur_pipe->num_tasks(), _num_instances,
837
948
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
838
948
                            ? cast_set<int>(_runtime_state->query_options()
839
948
                                                    .local_exchange_free_blocks_limit)
840
948
                            : 0);
841
948
        }
842
2.46k
        break;
843
893
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
844
893
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
845
893
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
846
893
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
847
893
                        ? cast_set<int>(
848
893
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
849
893
                        : 0);
850
893
        break;
851
0
    default:
852
0
        return Status::InternalError("Unsupported local exchange type : " +
853
0
                                     std::to_string((int)data_distribution.distribution_type));
854
105k
    }
855
105k
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
856
105k
                                             "LOCAL_EXCHANGE_OPERATOR");
857
105k
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
858
105k
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
859
860
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
861
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
862
863
    // 3.1 Initialize new pipeline's operator list.
864
105k
    std::copy(operators.begin(), operators.begin() + idx,
865
105k
              std::inserter(new_pip->operators(), new_pip->operators().end()));
866
867
    // 3.2 Erase unused operators in previous pipeline.
868
105k
    operators.erase(operators.begin(), operators.begin() + idx);
869
870
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
871
105k
    OperatorPtr source_op;
872
105k
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
873
105k
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
874
105k
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
875
105k
    if (!operators.empty()) {
876
37.2k
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
877
37.2k
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
878
37.2k
    }
879
105k
    operators.insert(operators.begin(), source_op);
880
881
    // 5. Set children for two pipelines separately.
882
105k
    std::vector<std::shared_ptr<Pipeline>> new_children;
883
105k
    std::vector<PipelineId> edges_with_source;
884
124k
    for (auto child : cur_pipe->children()) {
885
124k
        bool found = false;
886
138k
        for (auto op : new_pip->operators()) {
887
138k
            if (child->sink()->node_id() == op->node_id()) {
888
13.0k
                new_pip->set_children(child);
889
13.0k
                found = true;
890
13.0k
            };
891
138k
        }
892
124k
        if (!found) {
893
111k
            new_children.push_back(child);
894
111k
            edges_with_source.push_back(child->id());
895
111k
        }
896
124k
    }
897
105k
    new_children.push_back(new_pip);
898
105k
    edges_with_source.push_back(new_pip->id());
899
900
    // 6. Set DAG for new pipelines.
901
105k
    if (!new_pip->children().empty()) {
902
7.66k
        std::vector<PipelineId> edges_with_sink;
903
13.0k
        for (auto child : new_pip->children()) {
904
13.0k
            edges_with_sink.push_back(child->id());
905
13.0k
        }
906
7.66k
        _dag.insert({new_pip->id(), edges_with_sink});
907
7.66k
    }
908
105k
    cur_pipe->set_children(new_children);
909
105k
    _dag[downstream_pipeline_id] = edges_with_source;
910
105k
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
911
105k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
912
105k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
913
914
    // 7. Inherit properties from current pipeline.
915
105k
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
916
105k
    return Status::OK();
917
105k
}
918
919
Status PipelineFragmentContext::_add_local_exchange(
920
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
921
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
922
        const std::map<int, int>& bucket_seq_to_instance_idx,
923
205k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
924
205k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
925
71.9k
        return Status::OK();
926
71.9k
    }
927
928
133k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
929
42.1k
        return Status::OK();
930
42.1k
    }
931
91.4k
    *do_local_exchange = true;
932
933
91.4k
    auto& operators = cur_pipe->operators();
934
91.4k
    auto total_op_num = operators.size();
935
91.4k
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
936
91.4k
    RETURN_IF_ERROR(_add_local_exchange_impl(
937
91.4k
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
938
91.4k
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
939
940
18.4E
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
941
18.4E
            << "total_op_num: " << total_op_num
942
18.4E
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
943
18.4E
            << " new_pip->operators().size(): " << new_pip->operators().size();
944
945
    // There are some local shuffles with relatively heavy operations on the sink.
946
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
947
    // Therefore, local passthrough is used to increase the concurrency of the sink.
948
    // op -> local sink(1) -> local source (n)
949
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
950
91.6k
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
951
91.4k
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
952
14.1k
        RETURN_IF_ERROR(_add_local_exchange_impl(
953
14.1k
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
954
14.1k
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
955
14.1k
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
956
14.1k
                shuffle_idx_to_instance_idx));
957
14.1k
    }
958
91.4k
    return Status::OK();
959
91.4k
}
960
961
Status PipelineFragmentContext::_plan_local_exchange(
962
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
963
457k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
964
1.06M
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
965
606k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
966
        // Set property if child pipeline is not join operator's child.
967
606k
        if (!_pipelines[pip_idx]->children().empty()) {
968
141k
            for (auto& child : _pipelines[pip_idx]->children()) {
969
141k
                if (child->sink()->node_id() ==
970
141k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
971
125k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
972
125k
                }
973
141k
            }
974
135k
        }
975
976
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
977
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
978
        // still keep colocate plan after local shuffle
979
606k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
980
606k
                                             bucket_seq_to_instance_idx,
981
606k
                                             shuffle_idx_to_instance_idx));
982
606k
    }
983
457k
    return Status::OK();
984
457k
}
985
986
Status PipelineFragmentContext::_plan_local_exchange(
987
        int num_buckets, int pip_idx, PipelinePtr pip,
988
        const std::map<int, int>& bucket_seq_to_instance_idx,
989
605k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
990
605k
    int idx = 1;
991
605k
    bool do_local_exchange = false;
992
642k
    do {
993
642k
        auto& ops = pip->operators();
994
642k
        do_local_exchange = false;
995
        // Plan local exchange for each operator.
996
730k
        for (; idx < ops.size();) {
997
124k
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
998
113k
                RETURN_IF_ERROR(_add_local_exchange(
999
113k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
1000
113k
                        ops[idx]->required_data_distribution(_runtime_state.get()),
1001
113k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1002
113k
                        shuffle_idx_to_instance_idx));
1003
113k
            }
1004
124k
            if (do_local_exchange) {
1005
                // If local exchange is needed for current operator, we will split this pipeline to
1006
                // two pipelines by local exchange sink/source. And then we need to process remaining
1007
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1008
                // is current operator was already processed) and continue to plan local exchange.
1009
37.3k
                idx = 2;
1010
37.3k
                break;
1011
37.3k
            }
1012
87.4k
            idx++;
1013
87.4k
        }
1014
642k
    } while (do_local_exchange);
1015
605k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1016
91.5k
        RETURN_IF_ERROR(_add_local_exchange(
1017
91.5k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1018
91.5k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1019
91.5k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1020
91.5k
    }
1021
605k
    return Status::OK();
1022
605k
}
1023
1024
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1025
                                                  const std::vector<TExpr>& output_exprs,
1026
                                                  const TPipelineFragmentParams& params,
1027
                                                  const RowDescriptor& row_desc,
1028
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1029
460k
                                                  PipelineId cur_pipeline_id) {
1030
460k
    switch (thrift_sink.type) {
1031
161k
    case TDataSinkType::DATA_STREAM_SINK: {
1032
161k
        if (!thrift_sink.__isset.stream_sink) {
1033
0
            return Status::InternalError("Missing data stream sink.");
1034
0
        }
1035
161k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1036
161k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1037
161k
                params.destinations, _fragment_instance_ids);
1038
161k
        break;
1039
161k
    }
1040
262k
    case TDataSinkType::RESULT_SINK: {
1041
262k
        if (!thrift_sink.__isset.result_sink) {
1042
0
            return Status::InternalError("Missing data buffer sink.");
1043
0
        }
1044
1045
262k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1046
262k
                                                      output_exprs, thrift_sink.result_sink);
1047
262k
        break;
1048
262k
    }
1049
105
    case TDataSinkType::DICTIONARY_SINK: {
1050
105
        if (!thrift_sink.__isset.dictionary_sink) {
1051
0
            return Status::InternalError("Missing dict sink.");
1052
0
        }
1053
1054
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1055
105
                                                    thrift_sink.dictionary_sink);
1056
105
        break;
1057
105
    }
1058
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1059
30.9k
    case TDataSinkType::OLAP_TABLE_SINK: {
1060
30.9k
        if (state->query_options().enable_memtable_on_sink_node &&
1061
30.9k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1062
30.9k
            !config::is_cloud_mode()) {
1063
2.17k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1064
2.17k
                                                               row_desc, output_exprs);
1065
28.7k
        } else {
1066
28.7k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1067
28.7k
                                                             row_desc, output_exprs);
1068
28.7k
        }
1069
30.9k
        break;
1070
0
    }
1071
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1072
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1073
165
        DCHECK(state->get_query_ctx() != nullptr);
1074
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1075
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1076
165
                                                                output_exprs);
1077
165
        break;
1078
0
    }
1079
1.48k
    case TDataSinkType::HIVE_TABLE_SINK: {
1080
1.48k
        if (!thrift_sink.__isset.hive_table_sink) {
1081
0
            return Status::InternalError("Missing hive table sink.");
1082
0
        }
1083
1.48k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1084
1.48k
                                                         output_exprs);
1085
1.48k
        break;
1086
1.48k
    }
1087
1.57k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1088
1.57k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1089
0
            return Status::InternalError("Missing iceberg table sink.");
1090
0
        }
1091
1.57k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1092
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1093
0
                                                                     row_desc, output_exprs);
1094
1.57k
        } else {
1095
1.57k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1096
1.57k
                                                                row_desc, output_exprs);
1097
1.57k
        }
1098
1.57k
        break;
1099
1.57k
    }
1100
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1101
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1102
0
            return Status::InternalError("Missing max compute table sink.");
1103
0
        }
1104
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1105
0
                                                       output_exprs);
1106
0
        break;
1107
0
    }
1108
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1109
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1110
0
            return Status::InternalError("Missing data jdbc sink.");
1111
0
        }
1112
80
        if (config::enable_java_support) {
1113
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1114
80
                                                             output_exprs);
1115
80
        } else {
1116
0
            return Status::InternalError(
1117
0
                    "Jdbc table sink is not enabled, you can change be config "
1118
0
                    "enable_java_support to true and restart be.");
1119
0
        }
1120
80
        break;
1121
80
    }
1122
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1123
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1124
0
            return Status::InternalError("Missing data buffer sink.");
1125
0
        }
1126
1127
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1128
3
                                                             output_exprs);
1129
3
        break;
1130
3
    }
1131
473
    case TDataSinkType::RESULT_FILE_SINK: {
1132
473
        if (!thrift_sink.__isset.result_file_sink) {
1133
0
            return Status::InternalError("Missing result file sink.");
1134
0
        }
1135
1136
        // Result file sink is not the top sink
1137
473
        if (params.__isset.destinations && !params.destinations.empty()) {
1138
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1139
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1140
0
                    params.destinations, output_exprs, desc_tbl);
1141
473
        } else {
1142
473
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1143
473
                                                              output_exprs);
1144
473
        }
1145
473
        break;
1146
473
    }
1147
2.51k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1148
2.51k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1149
2.51k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1150
2.51k
        auto sink_id = next_sink_operator_id();
1151
2.51k
        const int multi_cast_node_id = sink_id;
1152
2.51k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1153
        // one sink has multiple sources.
1154
2.51k
        std::vector<int> sources;
1155
9.85k
        for (int i = 0; i < sender_size; ++i) {
1156
7.33k
            auto source_id = next_operator_id();
1157
7.33k
            sources.push_back(source_id);
1158
7.33k
        }
1159
1160
2.51k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1161
2.51k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1162
9.85k
        for (int i = 0; i < sender_size; ++i) {
1163
7.33k
            auto new_pipeline = add_pipeline();
1164
            // use to exchange sink
1165
7.33k
            RowDescriptor* exchange_row_desc = nullptr;
1166
7.33k
            {
1167
7.33k
                const auto& tmp_row_desc =
1168
7.33k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1169
7.33k
                                ? RowDescriptor(state->desc_tbl(),
1170
7.33k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1171
7.33k
                                                         .output_tuple_id})
1172
7.33k
                                : row_desc;
1173
7.33k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1174
7.33k
            }
1175
7.33k
            auto source_id = sources[i];
1176
7.33k
            OperatorPtr source_op;
1177
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1178
7.33k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1179
7.33k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1180
7.33k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1181
7.33k
                    /*operator_id=*/source_id);
1182
7.33k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1183
7.33k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1184
            // 2. create and set sink operator of data stream sender for new pipeline
1185
1186
7.33k
            DataSinkOperatorPtr sink_op;
1187
7.33k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1188
7.33k
                    state, *exchange_row_desc, next_sink_operator_id(),
1189
7.33k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1190
7.33k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1191
1192
7.33k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1193
7.33k
            {
1194
7.33k
                TDataSink* t = pool->add(new TDataSink());
1195
7.33k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1196
7.33k
                RETURN_IF_ERROR(sink_op->init(*t));
1197
7.33k
            }
1198
1199
            // 3. set dependency dag
1200
7.33k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1201
7.33k
        }
1202
2.51k
        if (sources.empty()) {
1203
0
            return Status::InternalError("size of sources must be greater than 0");
1204
0
        }
1205
2.51k
        break;
1206
2.51k
    }
1207
2.51k
    case TDataSinkType::BLACKHOLE_SINK: {
1208
13
        if (!thrift_sink.__isset.blackhole_sink) {
1209
0
            return Status::InternalError("Missing blackhole sink.");
1210
0
        }
1211
1212
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1213
13
        break;
1214
13
    }
1215
156
    case TDataSinkType::TVF_TABLE_SINK: {
1216
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1217
0
            return Status::InternalError("Missing TVF table sink.");
1218
0
        }
1219
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1220
156
                                                        output_exprs);
1221
156
        break;
1222
156
    }
1223
0
    default:
1224
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1225
460k
    }
1226
460k
    return Status::OK();
1227
460k
}
1228
1229
// NOLINTBEGIN(readability-function-size)
1230
// NOLINTBEGIN(readability-function-cognitive-complexity)
1231
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1232
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1233
                                                 PipelinePtr& cur_pipe, int parent_idx,
1234
                                                 int child_idx,
1235
                                                 const bool followed_by_shuffled_operator,
1236
727k
                                                 const bool require_bucket_distribution) {
1237
727k
    std::vector<DataSinkOperatorPtr> sink_ops;
1238
727k
    Defer defer = Defer([&]() {
1239
726k
        if (op) {
1240
726k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1241
726k
        }
1242
725k
        for (auto& s : sink_ops) {
1243
140k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1244
140k
        }
1245
725k
    });
1246
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1247
    // Therefore, here we need to use a stack-like structure.
1248
727k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1249
727k
    std::stringstream error_msg;
1250
727k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1251
1252
727k
    bool fe_with_old_version = false;
1253
727k
    switch (tnode.node_type) {
1254
221k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1255
221k
        op = std::make_shared<OlapScanOperatorX>(
1256
221k
                pool, tnode, next_operator_id(), descs, _num_instances,
1257
221k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1258
221k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1259
221k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1260
221k
        break;
1261
221k
    }
1262
77
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1263
77
        DCHECK(_query_ctx != nullptr);
1264
77
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1265
77
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1266
77
                                                    _num_instances);
1267
77
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1268
77
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1269
77
        break;
1270
77
    }
1271
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1272
0
        if (config::enable_java_support) {
1273
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1274
0
                                                     _num_instances);
1275
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1276
0
        } else {
1277
0
            return Status::InternalError(
1278
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1279
0
                    "to true and restart be.");
1280
0
        }
1281
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1282
0
        break;
1283
0
    }
1284
24.2k
    case TPlanNodeType::FILE_SCAN_NODE: {
1285
24.2k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1286
24.2k
                                                 _num_instances);
1287
24.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1288
24.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1289
24.2k
        break;
1290
24.2k
    }
1291
0
    case TPlanNodeType::ES_SCAN_NODE:
1292
676
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1293
676
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1294
676
                                               _num_instances);
1295
676
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1296
676
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1297
676
        break;
1298
676
    }
1299
166k
    case TPlanNodeType::EXCHANGE_NODE: {
1300
166k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1301
166k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1302
18.4E
                                  : 0;
1303
166k
        DCHECK_GT(num_senders, 0);
1304
166k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1305
166k
                                                       num_senders);
1306
166k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1307
166k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1308
166k
        break;
1309
166k
    }
1310
177k
    case TPlanNodeType::AGGREGATION_NODE: {
1311
177k
        if (tnode.agg_node.grouping_exprs.empty() &&
1312
177k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1313
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1314
0
                                         ": group by and output is empty");
1315
0
        }
1316
177k
        bool need_create_cache_op =
1317
177k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1318
177k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1319
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1320
24
            auto cache_source_id = next_operator_id();
1321
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1322
24
                                                        _params.fragment.query_cache_param);
1323
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1324
1325
24
            const auto downstream_pipeline_id = cur_pipe->id();
1326
24
            if (!_dag.contains(downstream_pipeline_id)) {
1327
24
                _dag.insert({downstream_pipeline_id, {}});
1328
24
            }
1329
24
            new_pipe = add_pipeline(cur_pipe);
1330
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1331
1332
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1333
24
                    next_sink_operator_id(), cache_source_id, op->operator_id()));
1334
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1335
24
            return Status::OK();
1336
24
        };
1337
177k
        const bool group_by_limit_opt =
1338
177k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1339
1340
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1341
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1342
177k
        const bool enable_spill = _runtime_state->enable_spill() &&
1343
177k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1344
177k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1345
177k
                                      tnode.agg_node.use_streaming_preaggregation &&
1346
177k
                                      !tnode.agg_node.grouping_exprs.empty();
1347
        // TODO: distinct streaming agg does not support spill.
1348
177k
        const bool can_use_distinct_streaming_agg =
1349
177k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1350
177k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1351
177k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1352
177k
                _params.query_options.enable_distinct_streaming_aggregation;
1353
1354
177k
        if (can_use_distinct_streaming_agg) {
1355
98.9k
            if (need_create_cache_op) {
1356
8
                PipelinePtr new_pipe;
1357
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1358
1359
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1360
8
                                                                     tnode, descs);
1361
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1362
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1363
8
                cur_pipe = new_pipe;
1364
98.9k
            } else {
1365
98.9k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1366
98.9k
                                                                     tnode, descs);
1367
98.9k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1368
98.9k
            }
1369
98.9k
        } else if (is_streaming_agg) {
1370
4.00k
            if (need_create_cache_op) {
1371
9
                PipelinePtr new_pipe;
1372
9
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1373
1374
9
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1375
9
                                                             descs);
1376
9
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1377
9
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1378
9
                cur_pipe = new_pipe;
1379
3.99k
            } else {
1380
3.99k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1381
3.99k
                                                             descs);
1382
3.99k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1383
3.99k
            }
1384
74.6k
        } else {
1385
            // create new pipeline to add query cache operator
1386
74.6k
            PipelinePtr new_pipe;
1387
74.6k
            if (need_create_cache_op) {
1388
7
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1389
7
            }
1390
1391
74.6k
            if (enable_spill) {
1392
569
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1393
569
                                                                     next_operator_id(), descs);
1394
74.0k
            } else {
1395
74.0k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1396
74.0k
            }
1397
74.6k
            if (need_create_cache_op) {
1398
7
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1399
7
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1400
7
                cur_pipe = new_pipe;
1401
74.6k
            } else {
1402
74.6k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1403
74.6k
            }
1404
1405
74.6k
            const auto downstream_pipeline_id = cur_pipe->id();
1406
74.6k
            if (!_dag.contains(downstream_pipeline_id)) {
1407
71.5k
                _dag.insert({downstream_pipeline_id, {}});
1408
71.5k
            }
1409
74.6k
            cur_pipe = add_pipeline(cur_pipe);
1410
74.6k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1411
1412
74.6k
            if (enable_spill) {
1413
569
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1414
569
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1415
74.0k
            } else {
1416
74.0k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1417
74.0k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1418
74.0k
            }
1419
74.6k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1420
74.6k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1421
74.6k
        }
1422
177k
        break;
1423
177k
    }
1424
177k
    case TPlanNodeType::HASH_JOIN_NODE: {
1425
10.0k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1426
10.0k
                                       tnode.hash_join_node.is_broadcast_join;
1427
10.0k
        const auto enable_spill = _runtime_state->enable_spill();
1428
10.0k
        if (enable_spill && !is_broadcast_join) {
1429
0
            auto tnode_ = tnode;
1430
0
            tnode_.runtime_filters.clear();
1431
0
            auto inner_probe_operator =
1432
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1433
1434
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1435
            // So here use `tnode_` which has no runtime filters.
1436
0
            auto probe_side_inner_sink_operator =
1437
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1438
1439
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1440
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1441
1442
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1443
0
                    pool, tnode_, next_operator_id(), descs);
1444
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1445
0
                                                inner_probe_operator);
1446
0
            op = std::move(probe_operator);
1447
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1448
1449
0
            const auto downstream_pipeline_id = cur_pipe->id();
1450
0
            if (!_dag.contains(downstream_pipeline_id)) {
1451
0
                _dag.insert({downstream_pipeline_id, {}});
1452
0
            }
1453
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1454
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1455
1456
0
            auto inner_sink_operator =
1457
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1458
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1459
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1460
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1461
1462
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1463
0
            sink_ops.push_back(std::move(sink_operator));
1464
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1465
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1466
1467
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1468
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1469
10.0k
        } else {
1470
10.0k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1471
10.0k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1472
1473
10.0k
            const auto downstream_pipeline_id = cur_pipe->id();
1474
10.0k
            if (!_dag.contains(downstream_pipeline_id)) {
1475
8.30k
                _dag.insert({downstream_pipeline_id, {}});
1476
8.30k
            }
1477
10.0k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1478
10.0k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1479
1480
10.0k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1481
10.0k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1482
10.0k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1483
10.0k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1484
1485
10.0k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1486
10.0k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1487
10.0k
        }
1488
10.0k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1489
3.02k
            std::shared_ptr<HashJoinSharedState> shared_state =
1490
3.02k
                    HashJoinSharedState::create_shared(_num_instances);
1491
23.4k
            for (int i = 0; i < _num_instances; i++) {
1492
20.4k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1493
20.4k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1494
20.4k
                sink_dep->set_shared_state(shared_state.get());
1495
20.4k
                shared_state->sink_deps.push_back(sink_dep);
1496
20.4k
            }
1497
3.02k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1498
3.02k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1499
3.02k
            _op_id_to_shared_state.insert(
1500
3.02k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1501
3.02k
        }
1502
10.0k
        break;
1503
10.0k
    }
1504
5.98k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1505
5.98k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1506
5.98k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1507
1508
5.98k
        const auto downstream_pipeline_id = cur_pipe->id();
1509
5.98k
        if (!_dag.contains(downstream_pipeline_id)) {
1510
5.73k
            _dag.insert({downstream_pipeline_id, {}});
1511
5.73k
        }
1512
5.98k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1513
5.98k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1514
1515
5.98k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1516
5.98k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1517
5.98k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1518
5.98k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1519
5.98k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1520
5.98k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1521
5.98k
        break;
1522
5.98k
    }
1523
53.3k
    case TPlanNodeType::UNION_NODE: {
1524
53.3k
        int child_count = tnode.num_children;
1525
53.3k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1526
53.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1527
1528
53.3k
        const auto downstream_pipeline_id = cur_pipe->id();
1529
53.3k
        if (!_dag.contains(downstream_pipeline_id)) {
1530
52.9k
            _dag.insert({downstream_pipeline_id, {}});
1531
52.9k
        }
1532
55.2k
        for (int i = 0; i < child_count; i++) {
1533
1.87k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1534
1.87k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1535
1.87k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1536
1.87k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1537
1.87k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1538
1.87k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1539
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1540
1.87k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1541
1.87k
        }
1542
53.3k
        break;
1543
53.3k
    }
1544
53.3k
    case TPlanNodeType::SORT_NODE: {
1545
46.3k
        const auto should_spill = _runtime_state->enable_spill() &&
1546
46.3k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1547
46.3k
        const bool use_local_merge =
1548
46.3k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1549
46.3k
        if (should_spill) {
1550
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1551
46.3k
        } else if (use_local_merge) {
1552
41.0k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1553
41.0k
                                                                 descs);
1554
41.0k
        } else {
1555
5.28k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1556
5.28k
        }
1557
46.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
1559
46.3k
        const auto downstream_pipeline_id = cur_pipe->id();
1560
46.3k
        if (!_dag.contains(downstream_pipeline_id)) {
1561
46.3k
            _dag.insert({downstream_pipeline_id, {}});
1562
46.3k
        }
1563
46.3k
        cur_pipe = add_pipeline(cur_pipe);
1564
46.3k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1565
1566
46.3k
        if (should_spill) {
1567
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1568
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1569
46.3k
        } else {
1570
46.3k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1571
46.3k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1572
46.3k
        }
1573
46.3k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1574
46.3k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1575
46.3k
        break;
1576
46.3k
    }
1577
46.3k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1578
81
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1579
81
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1580
1581
81
        const auto downstream_pipeline_id = cur_pipe->id();
1582
81
        if (!_dag.contains(downstream_pipeline_id)) {
1583
81
            _dag.insert({downstream_pipeline_id, {}});
1584
81
        }
1585
81
        cur_pipe = add_pipeline(cur_pipe);
1586
81
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1587
1588
81
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1589
81
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1590
81
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1591
81
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1592
81
        break;
1593
81
    }
1594
1.86k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1595
1.86k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1596
1.86k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1597
1598
1.86k
        const auto downstream_pipeline_id = cur_pipe->id();
1599
1.86k
        if (!_dag.contains(downstream_pipeline_id)) {
1600
1.85k
            _dag.insert({downstream_pipeline_id, {}});
1601
1.85k
        }
1602
1.86k
        cur_pipe = add_pipeline(cur_pipe);
1603
1.86k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1604
1605
1.86k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1606
1.86k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1607
1.86k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1608
1.86k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1609
1.86k
        break;
1610
1.86k
    }
1611
1.91k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1612
1.91k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1613
1.91k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1614
1.91k
        break;
1615
1.91k
    }
1616
1.91k
    case TPlanNodeType::INTERSECT_NODE: {
1617
158
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1618
158
                                                                      cur_pipe, sink_ops));
1619
158
        break;
1620
158
    }
1621
158
    case TPlanNodeType::EXCEPT_NODE: {
1622
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1623
129
                                                                       cur_pipe, sink_ops));
1624
129
        break;
1625
129
    }
1626
332
    case TPlanNodeType::REPEAT_NODE: {
1627
332
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1628
332
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1629
332
        break;
1630
332
    }
1631
875
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1632
875
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1633
875
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1634
875
        break;
1635
875
    }
1636
875
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1637
230
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1638
230
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1639
230
        break;
1640
230
    }
1641
1.69k
    case TPlanNodeType::EMPTY_SET_NODE: {
1642
1.69k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1643
1.69k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1644
1.69k
        break;
1645
1.69k
    }
1646
1.69k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1647
459
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1648
459
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1649
459
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1650
459
        break;
1651
459
    }
1652
2.33k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1653
2.33k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1654
2.33k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1655
2.33k
        break;
1656
2.33k
    }
1657
6.98k
    case TPlanNodeType::META_SCAN_NODE: {
1658
6.98k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1659
6.98k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1660
6.98k
        break;
1661
6.98k
    }
1662
6.98k
    case TPlanNodeType::SELECT_NODE: {
1663
2.48k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1664
2.48k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1665
2.48k
        break;
1666
2.48k
    }
1667
2.48k
    case TPlanNodeType::REC_CTE_NODE: {
1668
133
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1669
133
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1670
1671
133
        const auto downstream_pipeline_id = cur_pipe->id();
1672
133
        if (!_dag.contains(downstream_pipeline_id)) {
1673
132
            _dag.insert({downstream_pipeline_id, {}});
1674
132
        }
1675
1676
133
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1677
133
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1678
1679
133
        DataSinkOperatorPtr anchor_sink;
1680
133
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1681
133
                                                                  op->operator_id(), tnode, descs);
1682
133
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1683
133
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1684
133
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1685
1686
133
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1687
133
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1688
1689
133
        DataSinkOperatorPtr rec_sink;
1690
133
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1691
133
                                                         tnode, descs);
1692
133
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1693
133
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1694
133
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1695
1696
133
        break;
1697
133
    }
1698
1.75k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1699
1.75k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1700
1.75k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1701
1.75k
        break;
1702
1.75k
    }
1703
1.75k
    default:
1704
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1705
0
                                     print_plan_node_type(tnode.node_type));
1706
727k
    }
1707
725k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1708
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1709
0
        op->set_serial_operator();
1710
0
    }
1711
1712
725k
    return Status::OK();
1713
727k
}
1714
// NOLINTEND(readability-function-cognitive-complexity)
1715
// NOLINTEND(readability-function-size)
1716
1717
template <bool is_intersect>
1718
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1719
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1720
287
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1721
287
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1722
287
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1723
1724
287
    const auto downstream_pipeline_id = cur_pipe->id();
1725
287
    if (!_dag.contains(downstream_pipeline_id)) {
1726
271
        _dag.insert({downstream_pipeline_id, {}});
1727
271
    }
1728
1729
954
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1730
667
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1731
667
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1732
1733
667
        if (child_id == 0) {
1734
287
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1735
287
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1736
380
        } else {
1737
380
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1738
380
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1739
380
        }
1740
667
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1741
667
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1742
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1743
667
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1744
667
    }
1745
1746
287
    return Status::OK();
1747
287
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1720
158
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1721
158
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1722
158
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1723
1724
158
    const auto downstream_pipeline_id = cur_pipe->id();
1725
158
    if (!_dag.contains(downstream_pipeline_id)) {
1726
150
        _dag.insert({downstream_pipeline_id, {}});
1727
150
    }
1728
1729
552
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1730
394
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1731
394
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1732
1733
394
        if (child_id == 0) {
1734
158
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1735
158
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1736
236
        } else {
1737
236
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1738
236
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1739
236
        }
1740
394
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1741
394
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1742
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1743
394
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1744
394
    }
1745
1746
158
    return Status::OK();
1747
158
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1720
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1721
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1722
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1723
1724
129
    const auto downstream_pipeline_id = cur_pipe->id();
1725
129
    if (!_dag.contains(downstream_pipeline_id)) {
1726
121
        _dag.insert({downstream_pipeline_id, {}});
1727
121
    }
1728
1729
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1730
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1731
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1732
1733
273
        if (child_id == 0) {
1734
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1735
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1736
144
        } else {
1737
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1738
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1739
144
        }
1740
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1741
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1742
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1743
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1744
273
    }
1745
1746
129
    return Status::OK();
1747
129
}
1748
1749
457k
Status PipelineFragmentContext::submit() {
1750
457k
    if (_submitted) {
1751
0
        return Status::InternalError("submitted");
1752
0
    }
1753
457k
    _submitted = true;
1754
1755
457k
    int submit_tasks = 0;
1756
457k
    Status st;
1757
457k
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
1758
1.26M
    for (auto& task : _tasks) {
1759
2.26M
        for (auto& t : task) {
1760
2.26M
            st = scheduler->submit(t.first);
1761
2.26M
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
1762
2.26M
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
1763
2.26M
            if (!st) {
1764
0
                cancel(Status::InternalError("submit context to executor fail"));
1765
0
                std::lock_guard<std::mutex> l(_task_mutex);
1766
0
                _total_tasks = submit_tasks;
1767
0
                break;
1768
0
            }
1769
2.26M
            submit_tasks++;
1770
2.26M
        }
1771
1.26M
    }
1772
457k
    if (!st.ok()) {
1773
0
        std::lock_guard<std::mutex> l(_task_mutex);
1774
0
        if (_closed_tasks >= _total_tasks) {
1775
0
            _close_fragment_instance();
1776
0
        }
1777
0
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
1778
0
                                     BackendOptions::get_localhost());
1779
457k
    } else {
1780
457k
        return st;
1781
457k
    }
1782
457k
}
1783
1784
12
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1785
12
    if (_runtime_state->enable_profile()) {
1786
0
        std::stringstream ss;
1787
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1788
0
            runtime_profile_ptr->pretty_print(&ss);
1789
0
        }
1790
1791
0
        if (_runtime_state->load_channel_profile()) {
1792
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1793
0
        }
1794
1795
0
        auto profile_str =
1796
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1797
0
                            this->_fragment_id, extra_info, ss.str());
1798
0
        LOG_LONG_STRING(INFO, profile_str);
1799
0
    }
1800
12
}
1801
// If all pipeline tasks binded to the fragment instance are finished, then we could
1802
// close the fragment instance.
1803
460k
void PipelineFragmentContext::_close_fragment_instance() {
1804
460k
    if (_is_fragment_instance_closed) {
1805
0
        return;
1806
0
    }
1807
460k
    Defer defer_op {[&]() {
1808
460k
        _is_fragment_instance_closed = true;
1809
460k
        _notify_cv.notify_all();
1810
460k
    }};
1811
460k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1812
460k
    if (!_need_notify_close) {
1813
457k
        auto st = send_report(true);
1814
457k
        if (!st) {
1815
412k
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1816
412k
                                        print_id(_query_id), _fragment_id, st.to_string());
1817
412k
        }
1818
457k
    }
1819
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1820
    // Since stream load does not have someting like coordinator on FE, so
1821
    // backend can not report profile to FE, ant its profile can not be shown
1822
    // in the same way with other query. So we print the profile content to info log.
1823
1824
460k
    if (_runtime_state->enable_profile() &&
1825
460k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1826
2.74k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1827
2.74k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1828
0
        std::stringstream ss;
1829
        // Compute the _local_time_percent before pretty_print the runtime_profile
1830
        // Before add this operation, the print out like that:
1831
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1832
        // After add the operation, the print out like that:
1833
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1834
        // We can easily know the exec node execute time without child time consumed.
1835
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1836
0
            runtime_profile_ptr->pretty_print(&ss);
1837
0
        }
1838
1839
0
        if (_runtime_state->load_channel_profile()) {
1840
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1841
0
        }
1842
1843
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1844
0
    }
1845
1846
460k
    if (_query_ctx->enable_profile()) {
1847
2.74k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1848
2.74k
                                         collect_realtime_load_channel_profile());
1849
2.74k
    }
1850
1851
460k
    if (!_need_notify_close) {
1852
        // all submitted tasks done
1853
457k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
1854
457k
    }
1855
460k
}
1856
1857
2.25M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
1858
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
1859
2.25M
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
1860
2.25M
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
1861
715k
        if (_dag.contains(pipeline_id)) {
1862
360k
            for (auto dep : _dag[pipeline_id]) {
1863
360k
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
1864
360k
            }
1865
295k
        }
1866
715k
    }
1867
2.25M
    std::lock_guard<std::mutex> l(_task_mutex);
1868
2.25M
    ++_closed_tasks;
1869
2.25M
    if (_closed_tasks >= _total_tasks) {
1870
460k
        _close_fragment_instance();
1871
460k
    }
1872
2.25M
}
1873
1874
56.7k
std::string PipelineFragmentContext::get_load_error_url() {
1875
56.7k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1876
0
        return to_load_error_http_path(str);
1877
0
    }
1878
179k
    for (auto& tasks : _tasks) {
1879
300k
        for (auto& task : tasks) {
1880
300k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1881
162
                return to_load_error_http_path(str);
1882
162
            }
1883
300k
        }
1884
179k
    }
1885
56.6k
    return "";
1886
56.7k
}
1887
1888
56.7k
std::string PipelineFragmentContext::get_first_error_msg() {
1889
56.7k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1890
0
        return str;
1891
0
    }
1892
179k
    for (auto& tasks : _tasks) {
1893
300k
        for (auto& task : tasks) {
1894
300k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1895
162
                return str;
1896
162
            }
1897
300k
        }
1898
179k
    }
1899
56.6k
    return "";
1900
56.7k
}
1901
1902
462k
Status PipelineFragmentContext::send_report(bool done) {
1903
462k
    Status exec_status = _query_ctx->exec_status();
1904
    // If plan is done successfully, but _is_report_success is false,
1905
    // no need to send report.
1906
    // Load will set _is_report_success to true because load wants to know
1907
    // the process.
1908
462k
    if (!_is_report_success && done && exec_status.ok()) {
1909
412k
        return Status::NeedSendAgain("");
1910
412k
    }
1911
1912
    // If both _is_report_success and _is_report_on_cancel are false,
1913
    // which means no matter query is success or failed, no report is needed.
1914
    // This may happen when the query limit reached and
1915
    // a internal cancellation being processed
1916
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
1917
    // be set to false, to avoid sending fault report to FE.
1918
50.8k
    if (!_is_report_success && !_is_report_on_cancel) {
1919
262
        return Status::NeedSendAgain("");
1920
262
    }
1921
1922
50.5k
    std::vector<RuntimeState*> runtime_states;
1923
1924
149k
    for (auto& tasks : _tasks) {
1925
233k
        for (auto& task : tasks) {
1926
233k
            runtime_states.push_back(task.second.get());
1927
233k
        }
1928
149k
    }
1929
1930
50.5k
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
1931
50.6k
                                        ? get_load_error_url()
1932
18.4E
                                        : _query_ctx->get_load_error_url();
1933
50.5k
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
1934
50.6k
                                          ? get_first_error_msg()
1935
18.4E
                                          : _query_ctx->get_first_error_msg();
1936
1937
50.5k
    ReportStatusRequest req {.status = exec_status,
1938
50.5k
                             .runtime_states = runtime_states,
1939
50.5k
                             .done = done || !exec_status.ok(),
1940
50.5k
                             .coord_addr = _query_ctx->coord_addr,
1941
50.5k
                             .query_id = _query_id,
1942
50.5k
                             .fragment_id = _fragment_id,
1943
50.5k
                             .fragment_instance_id = TUniqueId(),
1944
50.5k
                             .backend_num = -1,
1945
50.5k
                             .runtime_state = _runtime_state.get(),
1946
50.5k
                             .load_error_url = load_eror_url,
1947
50.5k
                             .first_error_msg = first_error_msg,
1948
50.5k
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
1949
1950
50.5k
    return _report_status_cb(
1951
50.5k
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
1952
50.8k
}
1953
1954
58
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
1955
58
    size_t res = 0;
1956
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
1957
    // here to traverse the vector.
1958
58
    for (const auto& task_instances : _tasks) {
1959
148
        for (const auto& task : task_instances) {
1960
148
            if (task.first->is_running()) {
1961
4
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
1962
2
                                      << " is running, task: " << (void*)task.first.get()
1963
2
                                      << ", is_running: " << task.first->is_running();
1964
4
                *has_running_task = true;
1965
4
                return 0;
1966
4
            }
1967
1968
144
            size_t revocable_size = task.first->get_revocable_size();
1969
144
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
1970
12
                res += revocable_size;
1971
12
            }
1972
144
        }
1973
58
    }
1974
54
    return res;
1975
58
}
1976
1977
108
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
1978
108
    std::vector<PipelineTask*> revocable_tasks;
1979
108
    for (const auto& task_instances : _tasks) {
1980
280
        for (const auto& task : task_instances) {
1981
280
            size_t revocable_size_ = task.first->get_revocable_size();
1982
1983
280
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
1984
24
                revocable_tasks.emplace_back(task.first.get());
1985
24
            }
1986
280
        }
1987
108
    }
1988
108
    return revocable_tasks;
1989
108
}
1990
1991
535
std::string PipelineFragmentContext::debug_string() {
1992
535
    std::lock_guard<std::mutex> l(_task_mutex);
1993
535
    fmt::memory_buffer debug_string_buffer;
1994
535
    fmt::format_to(debug_string_buffer,
1995
535
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
1996
535
                   "need_notify_close={}, has_task_execution_ctx_ref_count={}\n",
1997
535
                   _closed_tasks, _total_tasks, _need_notify_close,
1998
535
                   _has_task_execution_ctx_ref_count);
1999
2.29k
    for (size_t j = 0; j < _tasks.size(); j++) {
2000
1.75k
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2001
4.29k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2002
2.53k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2003
2.53k
                           _tasks[j][i].first->debug_string());
2004
2.53k
        }
2005
1.75k
    }
2006
2007
535
    return fmt::to_string(debug_string_buffer);
2008
535
}
2009
2010
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2011
2.74k
PipelineFragmentContext::collect_realtime_profile() const {
2012
2.74k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2013
2014
    // we do not have mutex to protect pipeline_id_to_profile
2015
    // so we need to make sure this funciton is invoked after fragment context
2016
    // has already been prepared.
2017
2.74k
    if (!_prepared) {
2018
0
        std::string msg =
2019
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2020
0
        DCHECK(false) << msg;
2021
0
        LOG_ERROR(msg);
2022
0
        return res;
2023
0
    }
2024
2025
    // Make sure first profile is fragment level profile
2026
2.74k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2027
2.74k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2028
2.74k
    res.push_back(fragment_profile);
2029
2030
    // pipeline_id_to_profile is initialized in prepare stage
2031
4.97k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2032
4.97k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2033
4.97k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2034
4.97k
        res.push_back(profile_ptr);
2035
4.97k
    }
2036
2037
2.74k
    return res;
2038
2.74k
}
2039
2040
std::shared_ptr<TRuntimeProfileTree>
2041
2.74k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2042
    // we do not have mutex to protect pipeline_id_to_profile
2043
    // so we need to make sure this funciton is invoked after fragment context
2044
    // has already been prepared.
2045
2.74k
    if (!_prepared) {
2046
0
        std::string msg =
2047
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2048
0
        DCHECK(false) << msg;
2049
0
        LOG_ERROR(msg);
2050
0
        return nullptr;
2051
0
    }
2052
2053
8.50k
    for (const auto& tasks : _tasks) {
2054
16.8k
        for (const auto& task : tasks) {
2055
16.8k
            if (task.second->load_channel_profile() == nullptr) {
2056
0
                continue;
2057
0
            }
2058
2059
16.8k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2060
2061
16.8k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2062
16.8k
                                                           _runtime_state->profile_level());
2063
16.8k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2064
16.8k
        }
2065
8.50k
    }
2066
2067
2.74k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2068
2.74k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2069
2.74k
                                                      _runtime_state->profile_level());
2070
2.74k
    return load_channel_profile;
2071
2.74k
}
2072
2073
3.04k
Status PipelineFragmentContext::wait_close(bool close) {
2074
3.04k
    if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) {
2075
0
        return Status::InternalError("stream load do not support reset");
2076
0
    }
2077
3.04k
    if (!_need_notify_close) {
2078
0
        return Status::InternalError("_need_notify_close is false, do not support reset");
2079
0
    }
2080
2081
3.04k
    {
2082
3.04k
        std::unique_lock<std::mutex> lock(_task_mutex);
2083
3.10k
        while (!(_is_fragment_instance_closed.load() && !_has_task_execution_ctx_ref_count)) {
2084
54
            if (_query_ctx->is_cancelled()) {
2085
0
                return Status::Cancelled("Query has been cancelled");
2086
0
            }
2087
54
            _notify_cv.wait_for(lock, std::chrono::seconds(1));
2088
54
        }
2089
3.04k
    }
2090
2091
3.04k
    if (close) {
2092
166
        auto st = send_report(true);
2093
166
        if (!st) {
2094
129
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2095
129
                                        print_id(_query_id), _fragment_id, st.to_string());
2096
129
        }
2097
166
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2098
166
    }
2099
3.04k
    return Status::OK();
2100
3.04k
}
2101
2102
2.88k
Status PipelineFragmentContext::set_to_rerun() {
2103
2.88k
    {
2104
2.88k
        std::lock_guard<std::mutex> l(_task_mutex);
2105
2.88k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2106
4.74k
        for (auto& tasks : _tasks) {
2107
8.58k
            for (const auto& task : tasks) {
2108
8.58k
                task.first->runtime_state()->reset_to_rerun();
2109
8.58k
            }
2110
4.74k
        }
2111
2.88k
    }
2112
2.88k
    _release_resource();
2113
2.88k
    _runtime_state->reset_to_rerun();
2114
2.88k
    return Status::OK();
2115
2.88k
}
2116
2117
2.88k
Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) {
2118
2.88k
    _submitted = false;
2119
2.88k
    _is_fragment_instance_closed = false;
2120
2.88k
    return _build_and_prepare_full_pipeline(thread_pool);
2121
2.88k
}
2122
2123
461k
void PipelineFragmentContext::_release_resource() {
2124
461k
    std::lock_guard<std::mutex> l(_task_mutex);
2125
    // The memory released by the query end is recorded in the query mem tracker.
2126
461k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2127
461k
    auto st = _query_ctx->exec_status();
2128
1.26M
    for (auto& _task : _tasks) {
2129
1.26M
        if (!_task.empty()) {
2130
1.26M
            _call_back(_task.front().first->runtime_state(), &st);
2131
1.26M
        }
2132
1.26M
    }
2133
461k
    _tasks.clear();
2134
461k
    _dag.clear();
2135
461k
    _pip_id_to_pipeline.clear();
2136
461k
    _pipelines.clear();
2137
461k
    _sink.reset();
2138
461k
    _root_op.reset();
2139
461k
    _runtime_filter_mgr_map.clear();
2140
461k
    _op_id_to_shared_state.clear();
2141
461k
}
2142
2143
#include "common/compile_check_end.h"
2144
} // namespace doris