Coverage Report

Created: 2026-04-09 06:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_buffer_mgr.h"
123
#include "runtime/runtime_state.h"
124
#include "runtime/thread_context.h"
125
#include "util/countdown_latch.h"
126
#include "util/debug_util.h"
127
#include "util/uid_util.h"
128
129
namespace doris {
130
#include "common/compile_check_begin.h"
131
PipelineFragmentContext::PipelineFragmentContext(
132
        TUniqueId query_id, const TPipelineFragmentParams& request,
133
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
134
        const std::function<void(RuntimeState*, Status*)>& call_back,
135
        report_status_callback report_status_cb)
136
109k
        : _query_id(std::move(query_id)),
137
109k
          _fragment_id(request.fragment_id),
138
109k
          _exec_env(exec_env),
139
109k
          _query_ctx(std::move(query_ctx)),
140
109k
          _call_back(call_back),
141
109k
          _is_report_on_cancel(true),
142
109k
          _report_status_cb(std::move(report_status_cb)),
143
109k
          _params(request),
144
109k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
145
109k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
146
109k
                                                               : false) {
147
109k
    _fragment_watcher.start();
148
109k
}
149
150
109k
PipelineFragmentContext::~PipelineFragmentContext() {
151
109k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
152
109k
            .tag("query_id", print_id(_query_id))
153
109k
            .tag("fragment_id", _fragment_id);
154
109k
    _release_resource();
155
109k
    {
156
        // The memory released by the query end is recorded in the query mem tracker.
157
109k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
158
109k
        _runtime_state.reset();
159
109k
        _query_ctx.reset();
160
109k
    }
161
109k
}
162
163
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
164
0
    if (_timeout <= 0) {
165
0
        return false;
166
0
    }
167
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
168
0
}
169
170
// notify_close() transitions the PFC from "waiting for external close notification" to
171
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
172
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
173
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
174
782
bool PipelineFragmentContext::notify_close() {
175
782
    bool all_closed = false;
176
782
    bool need_remove = false;
177
782
    {
178
782
        std::lock_guard<std::mutex> l(_task_mutex);
179
782
        if (_closed_tasks >= _total_tasks) {
180
14
            if (_need_notify_close) {
181
                // Fragment was cancelled and waiting for notify to close.
182
                // Record that we need to remove from fragment mgr, but do it
183
                // after releasing _task_mutex to avoid ABBA deadlock with
184
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
185
                // first, then _task_mutex via debug_string()).
186
0
                need_remove = true;
187
0
            }
188
14
            all_closed = true;
189
14
        }
190
        // make fragment release by self after cancel
191
782
        _need_notify_close = false;
192
782
    }
193
782
    if (need_remove) {
194
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
195
0
    }
196
782
    return all_closed;
197
782
}
198
199
// Must not add lock in this method. Because it will call query ctx cancel. And
200
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
201
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
202
// There maybe dead lock.
203
782
void PipelineFragmentContext::cancel(const Status reason) {
204
782
    LOG_INFO("PipelineFragmentContext::cancel")
205
782
            .tag("query_id", print_id(_query_id))
206
782
            .tag("fragment_id", _fragment_id)
207
782
            .tag("reason", reason.to_string());
208
782
    if (notify_close()) {
209
14
        return;
210
14
    }
211
    // Timeout is a special error code, we need print current stack to debug timeout issue.
212
768
    if (reason.is<ErrorCode::TIMEOUT>()) {
213
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
214
0
                                   debug_string());
215
0
        LOG_LONG_STRING(WARNING, dbg_str);
216
0
    }
217
218
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
219
768
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
220
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
221
0
                    debug_string());
222
0
    }
223
224
768
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
225
12
        print_profile("cancel pipeline, reason: " + reason.to_string());
226
12
    }
227
228
768
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
229
0
        _query_ctx->set_load_error_url(error_url);
230
0
    }
231
232
768
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
233
0
        _query_ctx->set_first_error_msg(first_error_msg);
234
0
    }
235
236
768
    _query_ctx->cancel(reason, _fragment_id);
237
768
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
238
210
        _is_report_on_cancel = false;
239
558
    } else {
240
3.21k
        for (auto& id : _fragment_instance_ids) {
241
3.21k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
242
3.21k
        }
243
558
    }
244
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
245
    // For stream load the fragment's query_id == load id, it is set in FE.
246
768
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
247
768
    if (stream_load_ctx != nullptr) {
248
0
        stream_load_ctx->pipe->cancel(reason.to_string());
249
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
250
        // We need to set the error URL at this point to ensure error information is properly
251
        // propagated to the client.
252
0
        stream_load_ctx->error_url = get_load_error_url();
253
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
254
0
    }
255
256
3.52k
    for (auto& tasks : _tasks) {
257
8.56k
        for (auto& task : tasks) {
258
8.56k
            task.first->unblock_all_dependencies();
259
8.56k
        }
260
3.52k
    }
261
768
}
262
263
166k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
264
166k
    PipelineId id = _next_pipeline_id++;
265
166k
    auto pipeline = std::make_shared<Pipeline>(
266
166k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
267
166k
            parent ? parent->num_tasks() : _num_instances);
268
166k
    if (idx >= 0) {
269
15.6k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
270
150k
    } else {
271
150k
        _pipelines.emplace_back(pipeline);
272
150k
    }
273
166k
    if (parent) {
274
52.6k
        parent->set_children(pipeline);
275
52.6k
    }
276
166k
    return pipeline;
277
166k
}
278
279
109k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
280
109k
    {
281
109k
        SCOPED_TIMER(_build_pipelines_timer);
282
        // 2. Build pipelines with operators in this fragment.
283
109k
        auto root_pipeline = add_pipeline();
284
109k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
285
109k
                                         &_root_op, root_pipeline));
286
287
        // 3. Create sink operator
288
109k
        if (!_params.fragment.__isset.output_sink) {
289
0
            return Status::InternalError("No output sink in this fragment!");
290
0
        }
291
109k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
292
109k
                                          _params.fragment.output_exprs, _params,
293
109k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
294
109k
                                          *_desc_tbl, root_pipeline->id()));
295
109k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
296
109k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
297
298
150k
        for (PipelinePtr& pipeline : _pipelines) {
299
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
300
150k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
301
150k
        }
302
109k
    }
303
    // 4. Build local exchanger
304
109k
    if (_runtime_state->enable_local_shuffle()) {
305
109k
        SCOPED_TIMER(_plan_local_exchanger_timer);
306
109k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
307
109k
                                             _params.bucket_seq_to_instance_idx,
308
109k
                                             _params.shuffle_idx_to_instance_idx));
309
109k
    }
310
311
    // 5. Initialize global states in pipelines.
312
166k
    for (PipelinePtr& pipeline : _pipelines) {
313
166k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
314
166k
        pipeline->children().clear();
315
166k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
316
166k
    }
317
318
109k
    {
319
109k
        SCOPED_TIMER(_build_tasks_timer);
320
        // 6. Build pipeline tasks and initialize local state.
321
109k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
322
109k
    }
323
324
109k
    return Status::OK();
325
109k
}
326
327
109k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
328
109k
    if (_prepared) {
329
0
        return Status::InternalError("Already prepared");
330
0
    }
331
109k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
332
109k
        _timeout = _params.query_options.execution_timeout;
333
109k
    }
334
335
109k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
336
109k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
337
109k
    SCOPED_TIMER(_prepare_timer);
338
109k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
339
109k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
340
109k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
341
109k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
342
109k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
343
109k
    {
344
109k
        SCOPED_TIMER(_init_context_timer);
345
109k
        cast_set(_num_instances, _params.local_params.size());
346
109k
        _total_instances =
347
109k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
348
349
109k
        auto* fragment_context = this;
350
351
109k
        if (_params.query_options.__isset.is_report_success) {
352
109k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
353
109k
        }
354
355
        // 1. Set up the global runtime state.
356
109k
        _runtime_state = RuntimeState::create_unique(
357
109k
                _params.query_id, _params.fragment_id, _params.query_options,
358
109k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
359
109k
        _runtime_state->set_task_execution_context(shared_from_this());
360
109k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
361
109k
        if (_params.__isset.backend_id) {
362
109k
            _runtime_state->set_backend_id(_params.backend_id);
363
109k
        }
364
109k
        if (_params.__isset.import_label) {
365
0
            _runtime_state->set_import_label(_params.import_label);
366
0
        }
367
109k
        if (_params.__isset.db_name) {
368
0
            _runtime_state->set_db_name(_params.db_name);
369
0
        }
370
109k
        if (_params.__isset.load_job_id) {
371
0
            _runtime_state->set_load_job_id(_params.load_job_id);
372
0
        }
373
374
109k
        if (_params.is_simplified_param) {
375
33.4k
            _desc_tbl = _query_ctx->desc_tbl;
376
75.9k
        } else {
377
75.9k
            DCHECK(_params.__isset.desc_tbl);
378
75.9k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
379
75.9k
                                                  &_desc_tbl));
380
75.9k
        }
381
109k
        _runtime_state->set_desc_tbl(_desc_tbl);
382
109k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
383
109k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
384
109k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
385
109k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
386
387
        // init fragment_instance_ids
388
109k
        const auto target_size = _params.local_params.size();
389
109k
        _fragment_instance_ids.resize(target_size);
390
332k
        for (size_t i = 0; i < _params.local_params.size(); i++) {
391
222k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
392
222k
            _fragment_instance_ids[i] = fragment_instance_id;
393
222k
        }
394
109k
    }
395
396
109k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
397
398
109k
    _init_next_report_time();
399
400
109k
    _prepared = true;
401
109k
    return Status::OK();
402
109k
}
403
404
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
405
        int instance_idx,
406
223k
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
407
223k
    const auto& local_params = _params.local_params[instance_idx];
408
223k
    auto fragment_instance_id = local_params.fragment_instance_id;
409
223k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
410
223k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
411
223k
    auto get_shared_state = [&](PipelinePtr pipeline)
412
223k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
413
390k
                                       std::vector<std::shared_ptr<Dependency>>>> {
414
390k
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
390k
                                std::vector<std::shared_ptr<Dependency>>>>
416
390k
                shared_state_map;
417
409k
        for (auto& op : pipeline->operators()) {
418
409k
            auto source_id = op->operator_id();
419
409k
            if (auto iter = _op_id_to_shared_state.find(source_id);
420
409k
                iter != _op_id_to_shared_state.end()) {
421
131k
                shared_state_map.insert({source_id, iter->second});
422
131k
            }
423
409k
        }
424
393k
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
425
393k
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
426
393k
                iter != _op_id_to_shared_state.end()) {
427
37.5k
                shared_state_map.insert({sink_to_source_id, iter->second});
428
37.5k
            }
429
393k
        }
430
390k
        return shared_state_map;
431
390k
    };
432
433
707k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
434
484k
        auto& pipeline = _pipelines[pip_idx];
435
484k
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
436
390k
            auto task_runtime_state = RuntimeState::create_unique(
437
390k
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
438
390k
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
439
390k
            {
440
                // Initialize runtime state for this task
441
390k
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
442
443
390k
                task_runtime_state->set_task_execution_context(shared_from_this());
444
390k
                task_runtime_state->set_be_number(local_params.backend_num);
445
446
390k
                if (_params.__isset.backend_id) {
447
390k
                    task_runtime_state->set_backend_id(_params.backend_id);
448
390k
                }
449
390k
                if (_params.__isset.import_label) {
450
0
                    task_runtime_state->set_import_label(_params.import_label);
451
0
                }
452
390k
                if (_params.__isset.db_name) {
453
0
                    task_runtime_state->set_db_name(_params.db_name);
454
0
                }
455
390k
                if (_params.__isset.load_job_id) {
456
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
457
0
                }
458
390k
                if (_params.__isset.wal_id) {
459
0
                    task_runtime_state->set_wal_id(_params.wal_id);
460
0
                }
461
390k
                if (_params.__isset.content_length) {
462
0
                    task_runtime_state->set_content_length(_params.content_length);
463
0
                }
464
465
390k
                task_runtime_state->set_desc_tbl(_desc_tbl);
466
390k
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
467
390k
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
468
390k
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
469
390k
                task_runtime_state->set_max_operator_id(max_operator_id());
470
390k
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
471
390k
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
472
390k
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
473
474
390k
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
475
390k
            }
476
390k
            auto cur_task_id = _total_tasks++;
477
390k
            task_runtime_state->set_task_id(cur_task_id);
478
390k
            task_runtime_state->set_task_num(pipeline->num_tasks());
479
390k
            auto task = std::make_shared<PipelineTask>(
480
390k
                    pipeline, cur_task_id, task_runtime_state.get(),
481
390k
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
482
390k
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
483
390k
                    instance_idx);
484
390k
            pipeline->incr_created_tasks(instance_idx, task.get());
485
390k
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
486
390k
            _tasks[instance_idx].emplace_back(
487
390k
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
488
390k
                            std::move(task), std::move(task_runtime_state)});
489
390k
        }
490
484k
    }
491
492
    /**
493
         * Build DAG for pipeline tasks.
494
         * For example, we have
495
         *
496
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
497
         *            \                      /
498
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
499
         *                 \                          /
500
         *               JoinProbeOperator2 (Pipeline1)
501
         *
502
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
503
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
504
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
505
         *
506
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
507
         * and JoinProbeOperator2.
508
         */
509
484k
    for (auto& _pipeline : _pipelines) {
510
484k
        if (pipeline_id_to_task.contains(_pipeline->id())) {
511
390k
            auto* task = pipeline_id_to_task[_pipeline->id()];
512
390k
            DCHECK(task != nullptr);
513
514
            // If this task has upstream dependency, then inject it into this task.
515
390k
            if (_dag.contains(_pipeline->id())) {
516
257k
                auto& deps = _dag[_pipeline->id()];
517
387k
                for (auto& dep : deps) {
518
387k
                    if (pipeline_id_to_task.contains(dep)) {
519
198k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
520
198k
                        if (ss) {
521
129k
                            task->inject_shared_state(ss);
522
129k
                        } else {
523
69.0k
                            pipeline_id_to_task[dep]->inject_shared_state(
524
69.0k
                                    task->get_source_shared_state());
525
69.0k
                        }
526
198k
                    }
527
387k
                }
528
257k
            }
529
390k
        }
530
484k
    }
531
707k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
532
484k
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
533
390k
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
534
390k
            DCHECK(pipeline_id_to_profile[pip_idx]);
535
390k
            std::vector<TScanRangeParams> scan_ranges;
536
390k
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
537
390k
            if (local_params.per_node_scan_ranges.contains(node_id)) {
538
84.8k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
539
84.8k
            }
540
390k
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
541
390k
                                                             _params.fragment.output_sink));
542
390k
        }
543
484k
    }
544
223k
    {
545
223k
        std::lock_guard<std::mutex> l(_state_map_lock);
546
223k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
547
223k
    }
548
223k
    return Status::OK();
549
223k
}
550
551
109k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
552
109k
    _total_tasks = 0;
553
109k
    _closed_tasks = 0;
554
109k
    const auto target_size = _params.local_params.size();
555
109k
    _tasks.resize(target_size);
556
109k
    _runtime_filter_mgr_map.resize(target_size);
557
275k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
558
166k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
559
166k
    }
560
109k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
561
562
109k
    if (target_size > 1 &&
563
109k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
564
16.2k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
565
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
566
4
        std::vector<Status> prepare_status(target_size);
567
4
        int submitted_tasks = 0;
568
4
        Status submit_status;
569
4
        CountDownLatch latch((int)target_size);
570
24
        for (int i = 0; i < target_size; i++) {
571
20
            submit_status = thread_pool->submit_func([&, i]() {
572
20
                SCOPED_ATTACH_TASK(_query_ctx.get());
573
20
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
574
20
                latch.count_down();
575
20
            });
576
20
            if (LIKELY(submit_status.ok())) {
577
20
                submitted_tasks++;
578
20
            } else {
579
0
                break;
580
0
            }
581
20
        }
582
4
        latch.arrive_and_wait(target_size - submitted_tasks);
583
4
        if (UNLIKELY(!submit_status.ok())) {
584
0
            return submit_status;
585
0
        }
586
24
        for (int i = 0; i < submitted_tasks; i++) {
587
20
            if (!prepare_status[i].ok()) {
588
0
                return prepare_status[i];
589
0
            }
590
20
        }
591
109k
    } else {
592
332k
        for (int i = 0; i < target_size; i++) {
593
223k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
594
223k
        }
595
109k
    }
596
109k
    _pipeline_parent_map.clear();
597
109k
    _op_id_to_shared_state.clear();
598
599
109k
    return Status::OK();
600
109k
}
601
602
109k
void PipelineFragmentContext::_init_next_report_time() {
603
109k
    auto interval_s = config::pipeline_status_report_interval;
604
109k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
605
10.1k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
606
10.1k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
607
        // We don't want to wait longer than it takes to run the entire fragment.
608
10.1k
        _previous_report_time =
609
10.1k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
610
10.1k
        _disable_period_report = false;
611
10.1k
    }
612
109k
}
613
614
1.21k
// Re-arm the periodic report timer after a report has completed.
// While a report is in flight `_disable_period_report` is true (it is CAS'ed
// to true by trigger_report_if_necessary()); this resets the reference
// timestamp to "now" and flips reporting back on.
void PipelineFragmentContext::refresh_next_report_time() {
    auto disable = _disable_period_report.load(std::memory_order_acquire);
    // Callers are expected to invoke this only while periodic reporting is disabled.
    DCHECK(disable == true);
    // Restart the interval from the current time so the next report fires one
    // full period from now.
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
    // Re-enable reporting, using the freshly loaded value as the expected one.
    _disable_period_report.compare_exchange_strong(disable, false);
}
620
621
1.52M
void PipelineFragmentContext::trigger_report_if_necessary() {
622
1.52M
    if (!_is_report_success) {
623
1.40M
        return;
624
1.40M
    }
625
115k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
626
115k
    if (disable) {
627
2.97k
        return;
628
2.97k
    }
629
112k
    int32_t interval_s = config::pipeline_status_report_interval;
630
112k
    if (interval_s <= 0) {
631
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
632
0
                        "trigger "
633
0
                        "report.";
634
0
    }
635
112k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
636
112k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
637
112k
    if (MonotonicNanos() > next_report_time) {
638
1.21k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
639
1.21k
                                                            std::memory_order_acq_rel)) {
640
4
            return;
641
4
        }
642
1.20k
        if (VLOG_FILE_IS_ON) {
643
0
            VLOG_FILE << "Reporting "
644
0
                      << "profile for query_id " << print_id(_query_id)
645
0
                      << ", fragment id: " << _fragment_id;
646
647
0
            std::stringstream ss;
648
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
649
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
650
0
            if (_runtime_state->load_channel_profile()) {
651
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
652
0
            }
653
654
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
655
0
                      << " profile:\n"
656
0
                      << ss.str();
657
0
        }
658
1.20k
        auto st = send_report(false);
659
1.20k
        if (!st.ok()) {
660
0
            disable = true;
661
0
            _disable_period_report.compare_exchange_strong(disable, false,
662
0
                                                           std::memory_order_acq_rel);
663
0
        }
664
1.20k
    }
665
112k
}
666
667
// Reconstruct the operator tree for this fragment from the thrift plan and
// hang it off `cur_pipe`.
// @param pool      object pool owning plan-related allocations
// @param descs     descriptor table for the fragment
// @param root      out: receives the root operator of the rebuilt tree
// @param cur_pipe  pipeline the tree is built into
// @return error if the plan is empty or only partially consumed.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int node_idx = 0;
    // `node_idx` is advanced by the helper as it consumes nodes in preorder.
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root, cur_pipe,
                                        0, false, false));

    // After a full reconstruction the helper must have consumed every node.
    if (node_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
684
685
// Recursively rebuild one subtree of the plan from the preorder thrift node
// list, creating an operator per node and wiring parent/child links.
// @param tnodes    all plan nodes in preorder
// @param parent    parent operator (nullptr for the root call)
// @param node_idx  in/out cursor into `tnodes`; advanced as nodes are consumed
// @param root      out: set to the created operator when parent == nullptr
// @param cur_pipe  pipeline the operators are appended to
// @param followed_by_shuffled_operator / require_bucket_distribution:
//                  distribution hints propagated top-down (see comment below).
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When the pipeline has no operators yet, the required distribution comes
    // from its sink; otherwise from the operator just created.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Propagate "followed by a shuffled operator" downward: either it was already
    // set and the required distribution keeps it alive (hash exchange or NOOP),
    // or this node itself is a shuffled operator behind a hash exchange.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for "requires bucket (co-located) distribution".
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf node: record whether the fragment's source must run serially.
    // NOTE(review): this overwrites `_use_serial_source` for every leaf, so the
    // last-visited leaf wins — presumably fragments have one relevant source.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
770
771
void PipelineFragmentContext::_inherit_pipeline_properties(
772
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
773
15.6k
        PipelinePtr pipe_with_sink) {
774
15.6k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
775
15.6k
    pipe_with_source->set_num_tasks(_num_instances);
776
15.6k
    pipe_with_source->set_data_distribution(data_distribution);
777
15.6k
}
778
779
// Split `cur_pipe` at operator index `idx` by inserting a local exchange:
// operators [0, idx) move into `new_pip` which is capped with a
// LocalExchangeSink, and a LocalExchangeSource becomes the new head of
// `cur_pipe`. A LocalExchangeSharedState (with the exchanger matching
// `data_distribution`) connects the two halves, and the pipeline DAG/children
// and task-count properties are rewired accordingly.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    // Every exchanger below takes the same free-blocks limit, so compute it once
    // here instead of repeating the ternary in each case.
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Reset the old head's child link before pointing it at the new source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately: children whose sink feeds an
    // operator that moved into `new_pip` follow it; the rest stay on `cur_pipe`.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
943
944
// Insert a local exchange before operator `idx` of `cur_pipe` if one is
// needed for `data_distribution`. On success `*do_local_exchange` is set and
// the pipeline is split via _add_local_exchange_impl(); a second passthrough
// split may be added when the sink half would otherwise run single-task.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // With a single instance (or a single upstream task) a local exchange buys nothing.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
        return Status::OK();
    }

    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    auto& operators = cur_pipe->operators();
    auto total_op_num = operators.size();
    // The new pipeline is placed right after `cur_pipe` in the pipeline list.
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // Splitting adds exactly one operator (the local exchange source); verify
    // no operator was lost or duplicated across the two pipelines.
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << total_op_num
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
985
986
// Plan local exchanges for every pipeline of the fragment, walking the
// pipeline list from last to first (downstream to upstream) so that a child's
// data distribution is settled before its parent inspects it.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& cur_pipe = _pipelines[pip_idx];
        cur_pipe->init_data_distribution(_runtime_state.get());
        // Inherit the distribution of any child pipeline whose sink feeds this
        // pipeline's first operator (i.e. the child is not a join build side).
        for (auto& child : cur_pipe->children()) {
            if (child->sink()->node_id() == cur_pipe->operators().front()->node_id()) {
                cur_pipe->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, cur_pipe,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1010
1011
// Plan local exchanges inside a single pipeline: scan its operators (starting
// at index 1, since index 0 is the source) and split the pipeline wherever an
// operator requires a local exchange; finally do the same for the sink. Each
// split restarts the scan because the operator list was just rewritten.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    do {
        // Re-fetch the operator list each round: _add_local_exchange mutates it.
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also demand a particular distribution (e.g. hash for an agg sink).
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1048
1049
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1050
                                                  const std::vector<TExpr>& output_exprs,
1051
                                                  const TPipelineFragmentParams& params,
1052
                                                  const RowDescriptor& row_desc,
1053
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1054
109k
                                                  PipelineId cur_pipeline_id) {
1055
109k
    switch (thrift_sink.type) {
1056
31.9k
    case TDataSinkType::DATA_STREAM_SINK: {
1057
31.9k
        if (!thrift_sink.__isset.stream_sink) {
1058
0
            return Status::InternalError("Missing data stream sink.");
1059
0
        }
1060
31.9k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1061
31.9k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1062
31.9k
                params.destinations, _fragment_instance_ids);
1063
31.9k
        break;
1064
31.9k
    }
1065
69.7k
    case TDataSinkType::RESULT_SINK: {
1066
69.7k
        if (!thrift_sink.__isset.result_sink) {
1067
0
            return Status::InternalError("Missing data buffer sink.");
1068
0
        }
1069
1070
69.7k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1071
69.7k
                                                      output_exprs, thrift_sink.result_sink);
1072
69.7k
        break;
1073
69.7k
    }
1074
54
    case TDataSinkType::DICTIONARY_SINK: {
1075
54
        if (!thrift_sink.__isset.dictionary_sink) {
1076
0
            return Status::InternalError("Missing dict sink.");
1077
0
        }
1078
1079
54
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1080
54
                                                    thrift_sink.dictionary_sink);
1081
54
        break;
1082
54
    }
1083
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1084
2.40k
    case TDataSinkType::OLAP_TABLE_SINK: {
1085
2.40k
        if (state->query_options().enable_memtable_on_sink_node &&
1086
2.40k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1087
2.40k
            !config::is_cloud_mode()) {
1088
2.05k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1089
2.05k
                                                               row_desc, output_exprs);
1090
2.05k
        } else {
1091
358
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1092
358
                                                             row_desc, output_exprs);
1093
358
        }
1094
2.40k
        break;
1095
0
    }
1096
0
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1097
0
        DCHECK(thrift_sink.__isset.olap_table_sink);
1098
0
        DCHECK(state->get_query_ctx() != nullptr);
1099
0
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1100
0
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1101
0
                                                                output_exprs);
1102
0
        break;
1103
0
    }
1104
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1105
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1106
0
            return Status::InternalError("Missing hive table sink.");
1107
0
        }
1108
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1109
1.46k
                                                         output_exprs);
1110
1.46k
        break;
1111
1.46k
    }
1112
1.73k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1113
1.73k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1114
0
            return Status::InternalError("Missing iceberg table sink.");
1115
0
        }
1116
1.73k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1117
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1118
0
                                                                     row_desc, output_exprs);
1119
1.73k
        } else {
1120
1.73k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1121
1.73k
                                                                row_desc, output_exprs);
1122
1.73k
        }
1123
1.73k
        break;
1124
1.73k
    }
1125
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1126
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1127
0
            return Status::InternalError("Missing iceberg delete sink.");
1128
0
        }
1129
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1130
20
                                                             row_desc, output_exprs);
1131
20
        break;
1132
20
    }
1133
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1134
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1135
0
            return Status::InternalError("Missing iceberg merge sink.");
1136
0
        }
1137
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1138
80
                                                            output_exprs);
1139
80
        break;
1140
80
    }
1141
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1142
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1143
0
            return Status::InternalError("Missing max compute table sink.");
1144
0
        }
1145
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1146
0
                                                       output_exprs);
1147
0
        break;
1148
0
    }
1149
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1150
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1151
0
            return Status::InternalError("Missing data jdbc sink.");
1152
0
        }
1153
80
        if (config::enable_java_support) {
1154
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1155
80
                                                             output_exprs);
1156
80
        } else {
1157
0
            return Status::InternalError(
1158
0
                    "Jdbc table sink is not enabled, you can change be config "
1159
0
                    "enable_java_support to true and restart be.");
1160
0
        }
1161
80
        break;
1162
80
    }
1163
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1164
0
        if (!thrift_sink.__isset.memory_scratch_sink) {
1165
0
            return Status::InternalError("Missing data buffer sink.");
1166
0
        }
1167
1168
0
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1169
0
                                                             output_exprs);
1170
0
        break;
1171
0
    }
1172
164
    case TDataSinkType::RESULT_FILE_SINK: {
1173
164
        if (!thrift_sink.__isset.result_file_sink) {
1174
0
            return Status::InternalError("Missing result file sink.");
1175
0
        }
1176
1177
        // Result file sink is not the top sink
1178
164
        if (params.__isset.destinations && !params.destinations.empty()) {
1179
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1180
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1181
0
                    params.destinations, output_exprs, desc_tbl);
1182
164
        } else {
1183
164
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1184
164
                                                              output_exprs);
1185
164
        }
1186
164
        break;
1187
164
    }
1188
1.44k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1189
1.44k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1190
1.44k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1191
1.44k
        auto sink_id = next_sink_operator_id();
1192
1.44k
        const int multi_cast_node_id = sink_id;
1193
1.44k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1194
        // one sink has multiple sources.
1195
1.44k
        std::vector<int> sources;
1196
5.76k
        for (int i = 0; i < sender_size; ++i) {
1197
4.32k
            auto source_id = next_operator_id();
1198
4.32k
            sources.push_back(source_id);
1199
4.32k
        }
1200
1201
1.44k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1202
1.44k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1203
5.76k
        for (int i = 0; i < sender_size; ++i) {
1204
4.32k
            auto new_pipeline = add_pipeline();
1205
            // use to exchange sink
1206
4.32k
            RowDescriptor* exchange_row_desc = nullptr;
1207
4.32k
            {
1208
4.32k
                const auto& tmp_row_desc =
1209
4.32k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1210
4.32k
                                ? RowDescriptor(state->desc_tbl(),
1211
4.32k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1212
4.32k
                                                         .output_tuple_id})
1213
4.32k
                                : row_desc;
1214
4.32k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1215
4.32k
            }
1216
4.32k
            auto source_id = sources[i];
1217
4.32k
            OperatorPtr source_op;
1218
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1219
4.32k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1220
4.32k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1221
4.32k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1222
4.32k
                    /*operator_id=*/source_id);
1223
4.32k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1224
4.32k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1225
            // 2. create and set sink operator of data stream sender for new pipeline
1226
1227
4.32k
            DataSinkOperatorPtr sink_op;
1228
4.32k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1229
4.32k
                    state, *exchange_row_desc, next_sink_operator_id(),
1230
4.32k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1231
4.32k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1232
1233
4.32k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1234
4.32k
            {
1235
4.32k
                TDataSink* t = pool->add(new TDataSink());
1236
4.32k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1237
4.32k
                RETURN_IF_ERROR(sink_op->init(*t));
1238
4.32k
            }
1239
1240
            // 3. set dependency dag
1241
4.32k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1242
4.32k
        }
1243
1.44k
        if (sources.empty()) {
1244
0
            return Status::InternalError("size of sources must be greater than 0");
1245
0
        }
1246
1.44k
        break;
1247
1.44k
    }
1248
1.44k
    case TDataSinkType::BLACKHOLE_SINK: {
1249
10
        if (!thrift_sink.__isset.blackhole_sink) {
1250
0
            return Status::InternalError("Missing blackhole sink.");
1251
0
        }
1252
1253
10
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1254
10
        break;
1255
10
    }
1256
156
    case TDataSinkType::TVF_TABLE_SINK: {
1257
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1258
0
            return Status::InternalError("Missing TVF table sink.");
1259
0
        }
1260
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1261
156
                                                        output_exprs);
1262
156
        break;
1263
156
    }
1264
0
    default:
1265
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1266
109k
    }
1267
109k
    return Status::OK();
1268
109k
}
1269
1270
// NOLINTBEGIN(readability-function-size)
1271
// NOLINTBEGIN(readability-function-cognitive-complexity)
1272
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1273
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1274
                                                 PipelinePtr& cur_pipe, int parent_idx,
1275
                                                 int child_idx,
1276
                                                 const bool followed_by_shuffled_operator,
1277
                                                 const bool require_bucket_distribution,
1278
154k
                                                 OperatorPtr& cache_op) {
1279
154k
    std::vector<DataSinkOperatorPtr> sink_ops;
1280
154k
    Defer defer = Defer([&]() {
1281
154k
        if (op) {
1282
154k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1283
154k
        }
1284
154k
        for (auto& s : sink_ops) {
1285
36.9k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1286
36.9k
        }
1287
154k
    });
1288
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1289
    // Therefore, here we need to use a stack-like structure.
1290
154k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1291
154k
    std::stringstream error_msg;
1292
154k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1293
1294
154k
    bool fe_with_old_version = false;
1295
154k
    switch (tnode.node_type) {
1296
49.7k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1297
49.7k
        op = std::make_shared<OlapScanOperatorX>(
1298
49.7k
                pool, tnode, next_operator_id(), descs, _num_instances,
1299
49.7k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1300
49.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1301
49.7k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1302
49.7k
        break;
1303
49.7k
    }
1304
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1305
0
        DCHECK(_query_ctx != nullptr);
1306
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1307
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1308
0
                                                    _num_instances);
1309
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1310
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1311
0
        break;
1312
0
    }
1313
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1314
0
        if (config::enable_java_support) {
1315
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1316
0
                                                     _num_instances);
1317
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1318
0
        } else {
1319
0
            return Status::InternalError(
1320
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1321
0
                    "to true and restart be.");
1322
0
        }
1323
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1324
0
        break;
1325
0
    }
1326
20.3k
    case TPlanNodeType::FILE_SCAN_NODE: {
1327
20.3k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1328
20.3k
                                                 _num_instances);
1329
20.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1330
20.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1331
20.3k
        break;
1332
20.3k
    }
1333
0
    case TPlanNodeType::ES_SCAN_NODE:
1334
592
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1335
592
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1336
592
                                               _num_instances);
1337
592
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1338
592
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1339
592
        break;
1340
592
    }
1341
36.3k
    case TPlanNodeType::EXCHANGE_NODE: {
1342
36.3k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1343
36.3k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1344
36.3k
                                  : 0;
1345
36.3k
        DCHECK_GT(num_senders, 0);
1346
36.3k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1347
36.3k
                                                       num_senders);
1348
36.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1349
36.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1350
36.3k
        break;
1351
36.3k
    }
1352
22.8k
    case TPlanNodeType::AGGREGATION_NODE: {
1353
22.8k
        if (tnode.agg_node.grouping_exprs.empty() &&
1354
22.8k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1355
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1356
0
                                         ": group by and output is empty");
1357
0
        }
1358
22.8k
        bool need_create_cache_op =
1359
22.8k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1360
22.8k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1361
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1362
0
            auto cache_source_id = next_operator_id();
1363
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1364
0
                                                        _params.fragment.query_cache_param);
1365
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1366
1367
0
            const auto downstream_pipeline_id = cur_pipe->id();
1368
0
            if (!_dag.contains(downstream_pipeline_id)) {
1369
0
                _dag.insert({downstream_pipeline_id, {}});
1370
0
            }
1371
0
            new_pipe = add_pipeline(cur_pipe);
1372
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1373
1374
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1375
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1376
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1377
0
            return Status::OK();
1378
0
        };
1379
22.8k
        const bool group_by_limit_opt =
1380
22.8k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1381
1382
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1383
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1384
22.8k
        const bool enable_spill = _runtime_state->enable_spill() &&
1385
22.8k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1386
22.8k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1387
22.8k
                                      tnode.agg_node.use_streaming_preaggregation &&
1388
22.8k
                                      !tnode.agg_node.grouping_exprs.empty();
1389
        // TODO: distinct streaming agg does not support spill.
1390
22.8k
        const bool can_use_distinct_streaming_agg =
1391
22.8k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1392
22.8k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1393
22.8k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1394
22.8k
                _params.query_options.enable_distinct_streaming_aggregation;
1395
1396
22.8k
        if (can_use_distinct_streaming_agg) {
1397
36
            if (need_create_cache_op) {
1398
0
                PipelinePtr new_pipe;
1399
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1400
1401
0
                cache_op = op;
1402
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1403
0
                                                                     tnode, descs);
1404
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1405
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1406
0
                cur_pipe = new_pipe;
1407
36
            } else {
1408
36
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1409
36
                                                                     tnode, descs);
1410
36
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1411
36
            }
1412
22.8k
        } else if (is_streaming_agg) {
1413
2.00k
            if (need_create_cache_op) {
1414
0
                PipelinePtr new_pipe;
1415
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1416
0
                cache_op = op;
1417
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1418
0
                                                             descs);
1419
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1420
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1421
0
                cur_pipe = new_pipe;
1422
2.00k
            } else {
1423
2.00k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1424
2.00k
                                                             descs);
1425
2.00k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1426
2.00k
            }
1427
20.8k
        } else {
1428
            // create new pipeline to add query cache operator
1429
20.8k
            PipelinePtr new_pipe;
1430
20.8k
            if (need_create_cache_op) {
1431
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1432
0
                cache_op = op;
1433
0
            }
1434
1435
20.8k
            if (enable_spill) {
1436
0
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1437
0
                                                                     next_operator_id(), descs);
1438
20.8k
            } else {
1439
20.8k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1440
20.8k
            }
1441
20.8k
            if (need_create_cache_op) {
1442
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1443
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1444
0
                cur_pipe = new_pipe;
1445
20.8k
            } else {
1446
20.8k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1447
20.8k
            }
1448
1449
20.8k
            const auto downstream_pipeline_id = cur_pipe->id();
1450
20.8k
            if (!_dag.contains(downstream_pipeline_id)) {
1451
19.3k
                _dag.insert({downstream_pipeline_id, {}});
1452
19.3k
            }
1453
20.8k
            cur_pipe = add_pipeline(cur_pipe);
1454
20.8k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1455
1456
20.8k
            if (enable_spill) {
1457
0
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1458
0
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1459
20.8k
            } else {
1460
20.8k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1461
20.8k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1462
20.8k
            }
1463
20.8k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1464
20.8k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1465
20.8k
        }
1466
22.8k
        break;
1467
22.8k
    }
1468
22.8k
    case TPlanNodeType::HASH_JOIN_NODE: {
1469
824
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1470
824
                                       tnode.hash_join_node.is_broadcast_join;
1471
824
        const auto enable_spill = _runtime_state->enable_spill();
1472
824
        if (enable_spill && !is_broadcast_join) {
1473
0
            auto tnode_ = tnode;
1474
0
            tnode_.runtime_filters.clear();
1475
0
            auto inner_probe_operator =
1476
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1477
1478
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1479
            // So here use `tnode_` which has no runtime filters.
1480
0
            auto probe_side_inner_sink_operator =
1481
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1482
1483
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1484
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1485
1486
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1487
0
                    pool, tnode_, next_operator_id(), descs);
1488
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1489
0
                                                inner_probe_operator);
1490
0
            op = std::move(probe_operator);
1491
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1492
1493
0
            const auto downstream_pipeline_id = cur_pipe->id();
1494
0
            if (!_dag.contains(downstream_pipeline_id)) {
1495
0
                _dag.insert({downstream_pipeline_id, {}});
1496
0
            }
1497
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1498
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1499
1500
0
            auto inner_sink_operator =
1501
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1502
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1503
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1504
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1505
1506
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1507
0
            sink_ops.push_back(std::move(sink_operator));
1508
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1509
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1510
1511
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1512
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1513
824
        } else {
1514
824
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1515
824
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1516
1517
824
            const auto downstream_pipeline_id = cur_pipe->id();
1518
824
            if (!_dag.contains(downstream_pipeline_id)) {
1519
812
                _dag.insert({downstream_pipeline_id, {}});
1520
812
            }
1521
824
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1522
824
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1523
1524
824
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1525
824
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1526
824
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1527
824
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1528
1529
824
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1530
824
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1531
824
        }
1532
824
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1533
748
            std::shared_ptr<HashJoinSharedState> shared_state =
1534
748
                    HashJoinSharedState::create_shared(_num_instances);
1535
6.53k
            for (int i = 0; i < _num_instances; i++) {
1536
5.78k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1537
5.78k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1538
5.78k
                sink_dep->set_shared_state(shared_state.get());
1539
5.78k
                shared_state->sink_deps.push_back(sink_dep);
1540
5.78k
            }
1541
748
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1542
748
                                                     op->node_id(), "HASH_JOIN_PROBE");
1543
748
            _op_id_to_shared_state.insert(
1544
748
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1545
748
        }
1546
824
        break;
1547
824
    }
1548
2.94k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1549
2.94k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1550
2.94k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1551
1552
2.94k
        const auto downstream_pipeline_id = cur_pipe->id();
1553
2.94k
        if (!_dag.contains(downstream_pipeline_id)) {
1554
2.94k
            _dag.insert({downstream_pipeline_id, {}});
1555
2.94k
        }
1556
2.94k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1557
2.94k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1558
1559
2.94k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1560
2.94k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1561
2.94k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1562
2.94k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1563
2.94k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1564
2.94k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1565
2.94k
        break;
1566
2.94k
    }
1567
4.10k
    case TPlanNodeType::UNION_NODE: {
1568
4.10k
        int child_count = tnode.num_children;
1569
4.10k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1570
4.10k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1571
1572
4.10k
        const auto downstream_pipeline_id = cur_pipe->id();
1573
4.10k
        if (!_dag.contains(downstream_pipeline_id)) {
1574
4.05k
            _dag.insert({downstream_pipeline_id, {}});
1575
4.05k
        }
1576
4.78k
        for (int i = 0; i < child_count; i++) {
1577
676
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1578
676
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1579
676
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1580
676
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1581
676
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1582
676
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1583
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1584
676
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1585
676
        }
1586
4.10k
        break;
1587
4.10k
    }
1588
11.6k
    case TPlanNodeType::SORT_NODE: {
1589
11.6k
        const auto should_spill = _runtime_state->enable_spill() &&
1590
11.6k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1591
11.6k
        const bool use_local_merge =
1592
11.6k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1593
11.6k
        if (should_spill) {
1594
2
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1595
11.6k
        } else if (use_local_merge) {
1596
11.1k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1597
11.1k
                                                                 descs);
1598
11.1k
        } else {
1599
466
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1600
466
        }
1601
11.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1602
1603
11.6k
        const auto downstream_pipeline_id = cur_pipe->id();
1604
11.6k
        if (!_dag.contains(downstream_pipeline_id)) {
1605
11.6k
            _dag.insert({downstream_pipeline_id, {}});
1606
11.6k
        }
1607
11.6k
        cur_pipe = add_pipeline(cur_pipe);
1608
11.6k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1609
1610
11.6k
        if (should_spill) {
1611
2
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1612
2
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1613
11.6k
        } else {
1614
11.6k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1615
11.6k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1616
11.6k
        }
1617
11.6k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1618
11.6k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1619
11.6k
        break;
1620
11.6k
    }
1621
11.6k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1622
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1623
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1624
1625
0
        const auto downstream_pipeline_id = cur_pipe->id();
1626
0
        if (!_dag.contains(downstream_pipeline_id)) {
1627
0
            _dag.insert({downstream_pipeline_id, {}});
1628
0
        }
1629
0
        cur_pipe = add_pipeline(cur_pipe);
1630
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1631
1632
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1633
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1634
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1635
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1636
0
        break;
1637
0
    }
1638
44
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1639
44
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1640
44
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1641
1642
44
        const auto downstream_pipeline_id = cur_pipe->id();
1643
44
        if (!_dag.contains(downstream_pipeline_id)) {
1644
44
            _dag.insert({downstream_pipeline_id, {}});
1645
44
        }
1646
44
        cur_pipe = add_pipeline(cur_pipe);
1647
44
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1648
1649
44
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1650
44
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1651
44
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1652
44
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1653
44
        break;
1654
44
    }
1655
936
    case TPlanNodeType::MATERIALIZATION_NODE: {
1656
936
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1657
936
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1658
936
        break;
1659
936
    }
1660
936
    case TPlanNodeType::INTERSECT_NODE: {
1661
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1662
0
                                                                      cur_pipe, sink_ops));
1663
0
        break;
1664
0
    }
1665
0
    case TPlanNodeType::EXCEPT_NODE: {
1666
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1667
0
                                                                       cur_pipe, sink_ops));
1668
0
        break;
1669
0
    }
1670
0
    case TPlanNodeType::REPEAT_NODE: {
1671
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1672
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1673
0
        break;
1674
0
    }
1675
4
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1676
4
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1677
4
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1678
4
        break;
1679
4
    }
1680
200
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1681
200
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1682
200
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1683
200
        break;
1684
200
    }
1685
200
    case TPlanNodeType::EMPTY_SET_NODE: {
1686
154
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1687
154
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1688
154
        break;
1689
154
    }
1690
194
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1691
194
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1692
194
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1693
194
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1694
194
        break;
1695
194
    }
1696
776
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1697
776
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1698
776
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1699
776
        break;
1700
776
    }
1701
1.45k
    case TPlanNodeType::META_SCAN_NODE: {
1702
1.45k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1703
1.45k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1704
1.45k
        break;
1705
1.45k
    }
1706
1.45k
    case TPlanNodeType::SELECT_NODE: {
1707
1.44k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1708
1.44k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1709
1.44k
        break;
1710
1.44k
    }
1711
1.44k
    case TPlanNodeType::REC_CTE_NODE: {
1712
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1713
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1714
1715
0
        const auto downstream_pipeline_id = cur_pipe->id();
1716
0
        if (!_dag.contains(downstream_pipeline_id)) {
1717
0
            _dag.insert({downstream_pipeline_id, {}});
1718
0
        }
1719
1720
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1721
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1722
1723
0
        DataSinkOperatorPtr anchor_sink;
1724
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1725
0
                                                                  op->operator_id(), tnode, descs);
1726
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1727
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1728
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1729
1730
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1731
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1732
1733
0
        DataSinkOperatorPtr rec_sink;
1734
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1735
0
                                                         tnode, descs);
1736
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1737
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1738
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1739
1740
0
        break;
1741
0
    }
1742
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1743
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1744
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1745
0
        break;
1746
0
    }
1747
0
    default:
1748
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1749
0
                                     print_plan_node_type(tnode.node_type));
1750
154k
    }
1751
154k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1752
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1753
0
        op->set_serial_operator();
1754
0
    }
1755
1756
154k
    return Status::OK();
1757
154k
}
1758
// NOLINTEND(readability-function-cognitive-complexity)
1759
// NOLINTEND(readability-function-size)
1760
1761
template <bool is_intersect>
1762
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1763
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1764
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1765
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1766
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1767
1768
0
    const auto downstream_pipeline_id = cur_pipe->id();
1769
0
    if (!_dag.contains(downstream_pipeline_id)) {
1770
0
        _dag.insert({downstream_pipeline_id, {}});
1771
0
    }
1772
1773
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1774
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1775
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1776
1777
0
        if (child_id == 0) {
1778
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1779
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1780
0
        } else {
1781
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1782
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1783
0
        }
1784
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1785
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1786
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1787
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1788
0
    }
1789
1790
0
    return Status::OK();
1791
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
1792
1793
108k
// Submits every prepared pipeline task to the query's task scheduler.
// May be called at most once per fragment context. On a submit failure the
// fragment is cancelled, _total_tasks is clamped to the number of tasks that
// were actually submitted (so the close-accounting in decrement_running_task
// still converges), and an error is returned to the caller.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // Bug fix: the inner `break` only leaves one instance's task list.
        // Without this check the outer loop would keep submitting tasks of the
        // remaining instances and a later successful submit would overwrite
        // the failed `st`, silently skipping the error path below.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            // All submitted tasks may already have closed themselves after the
            // cancel above; if so, this thread is responsible for the close.
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1834
1835
12
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1836
12
    if (_runtime_state->enable_profile()) {
1837
0
        std::stringstream ss;
1838
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1839
0
            runtime_profile_ptr->pretty_print(&ss);
1840
0
        }
1841
1842
0
        if (_runtime_state->load_channel_profile()) {
1843
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1844
0
        }
1845
1846
0
        auto profile_str =
1847
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1848
0
                            this->_fragment_id, extra_info, ss.str());
1849
0
        LOG_LONG_STRING(INFO, profile_str);
1850
0
    }
1851
12
}
1852
// If all pipeline tasks binded to the fragment instance are finished, then we could
1853
// close the fragment instance.
1854
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1855
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1856
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1857
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1858
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1859
// (via debug_string()).
1860
109k
bool PipelineFragmentContext::_close_fragment_instance() {
    // Idempotent: only the first caller performs the close work.
    if (_is_fragment_instance_closed) {
        return false;
    }
    // Mark the instance closed on every exit path, including early returns
    // triggered by errors below.
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
    if (!_need_notify_close) {
        // Final report to FE (done=true). Failure is logged but does not block
        // the close — the coordinator has its own timeout handling.
        auto st = send_report(true);
        if (!st) {
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
                                        print_id(_query_id), _fragment_id, st.to_string());
        }
    }
    // Printing the profile content to the info log is a temporary solution for
    // stream load and external_connector. Since stream load does not have
    // something like a coordinator on FE, the backend cannot report the profile
    // to FE, and its profile cannot be shown in the same way as other queries.
    // So we print the profile content to the info log instead.

    if (_runtime_state->enable_profile() &&
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
        std::stringstream ss;
        // Compute the _local_time_percent before pretty_print the runtime_profile
        // Before add this operation, the print out like that:
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
        // After add the operation, the print out like that:
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
        // We can easily know the exec node execute time without child time consumed.
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
            runtime_profile_ptr->pretty_print(&ss);
        }

        if (_runtime_state->load_channel_profile()) {
            _runtime_state->load_channel_profile()->pretty_print(&ss);
        }

        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
    }

    if (_query_ctx->enable_profile()) {
        // Hand the realtime profile snapshots to the query context so FE can
        // fetch them after the fragment is gone.
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
                                         collect_realtime_load_channel_profile());
    }

    // Return whether the caller needs to remove from the pipeline map.
    // The caller must do this after releasing _task_mutex (see the comment
    // above this function about the ABBA deadlock with dump_pipeline_tasks()).
    return !_need_notify_close;
}
1909
1910
389k
// Called when one task of pipeline `pipeline_id` finishes. When the last task
// of the pipeline closes, wakes all dependent pipelines registered in _dag;
// when the last task of the whole fragment closes, closes the fragment
// instance and removes it from the fragment manager's pipeline map.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // If all tasks of this pipeline have been closed, output from upstream
    // tasks is no longer needed, so we make the dependent pipelines' tasks
    // runnable here instead of leaving them blocked.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        if (_dag.contains(pipeline_id)) {
            for (auto dep : _dag[pipeline_id]) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool need_remove = false;
    {
        // _task_mutex protects the _closed_tasks/_total_tasks accounting.
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1933
1934
12.2k
std::string PipelineFragmentContext::get_load_error_url() {
1935
12.2k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1936
0
        return to_load_error_http_path(str);
1937
0
    }
1938
37.8k
    for (auto& tasks : _tasks) {
1939
48.6k
        for (auto& task : tasks) {
1940
48.6k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1941
12
                return to_load_error_http_path(str);
1942
12
            }
1943
48.6k
        }
1944
37.8k
    }
1945
12.2k
    return "";
1946
12.2k
}
1947
1948
12.2k
std::string PipelineFragmentContext::get_first_error_msg() {
1949
12.2k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1950
0
        return str;
1951
0
    }
1952
37.8k
    for (auto& tasks : _tasks) {
1953
48.6k
        for (auto& task : tasks) {
1954
48.6k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1955
12
                return str;
1956
12
            }
1957
48.6k
        }
1958
37.8k
    }
1959
12.2k
    return "";
1960
12.2k
}
1961
1962
110k
// Reports this fragment's status (and optionally completion, when `done`)
// to the coordinator via _report_status_cb.
// Returns OK when no report is needed or the report was delivered, and
// NeedSendAgain when the caller should retry later.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();

    // If the plan finished successfully but _is_report_success is false,
    // no report is needed. Loads set _is_report_success to true because they
    // want to observe progress.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false, no report
    // is wanted regardless of success or failure. This can happen when a query
    // limit was reached and an internal cancellation is being processed: the
    // fragment is cancelled but _is_report_on_cancel is set to false to avoid
    // sending a fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // The query finished successfully; the fragment instance can be
            // closed without reporting, so just return OK.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    // Collect the runtime state of every task so the report can aggregate
    // per-task counters.
    std::vector<RuntimeState*> runtime_states;
    for (auto& instance_tasks : _tasks) {
        for (auto& task : instance_tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Query-level error info takes precedence; fall back to scanning this
    // fragment's own runtime states.
    std::string load_error_url = _query_ctx->get_load_error_url().empty()
                                         ? get_load_error_url()
                                         : _query_ctx->get_load_error_url();
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
                                          ? get_first_error_msg()
                                          : _query_ctx->get_first_error_msg();

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             // A failed fragment is always reported as done.
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    // Keep this context alive for the duration of the (possibly async) report.
    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2017
2018
4
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2019
4
    size_t res = 0;
2020
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2021
    // here to traverse the vector.
2022
4
    for (const auto& task_instances : _tasks) {
2023
6
        for (const auto& task : task_instances) {
2024
6
            if (task.first->is_running()) {
2025
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2026
0
                                      << " is running, task: " << (void*)task.first.get()
2027
0
                                      << ", is_running: " << task.first->is_running();
2028
0
                *has_running_task = true;
2029
0
                return 0;
2030
0
            }
2031
2032
6
            size_t revocable_size = task.first->get_revocable_size();
2033
6
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2034
2
                res += revocable_size;
2035
2
            }
2036
6
        }
2037
4
    }
2038
4
    return res;
2039
4
}
2040
2041
8
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2042
8
    std::vector<PipelineTask*> revocable_tasks;
2043
8
    for (const auto& task_instances : _tasks) {
2044
12
        for (const auto& task : task_instances) {
2045
12
            size_t revocable_size_ = task.first->get_revocable_size();
2046
2047
12
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2048
4
                revocable_tasks.emplace_back(task.first.get());
2049
4
            }
2050
12
        }
2051
8
    }
2052
8
    return revocable_tasks;
2053
8
}
2054
2055
0
std::string PipelineFragmentContext::debug_string() {
2056
0
    std::lock_guard<std::mutex> l(_task_mutex);
2057
0
    fmt::memory_buffer debug_string_buffer;
2058
0
    fmt::format_to(debug_string_buffer,
2059
0
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2060
0
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2061
0
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2062
0
    for (size_t j = 0; j < _tasks.size(); j++) {
2063
0
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2064
0
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2065
0
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2066
0
                           _tasks[j][i].first->debug_string());
2067
0
        }
2068
0
    }
2069
2070
0
    return fmt::to_string(debug_string_buffer);
2071
0
}
2072
2073
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2074
440
PipelineFragmentContext::collect_realtime_profile() const {
2075
440
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2076
2077
    // we do not have mutex to protect pipeline_id_to_profile
2078
    // so we need to make sure this funciton is invoked after fragment context
2079
    // has already been prepared.
2080
440
    if (!_prepared) {
2081
0
        std::string msg =
2082
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2083
0
        DCHECK(false) << msg;
2084
0
        LOG_ERROR(msg);
2085
0
        return res;
2086
0
    }
2087
2088
    // Make sure first profile is fragment level profile
2089
440
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2090
440
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2091
440
    res.push_back(fragment_profile);
2092
2093
    // pipeline_id_to_profile is initialized in prepare stage
2094
648
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2095
648
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2096
648
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2097
648
        res.push_back(profile_ptr);
2098
648
    }
2099
2100
440
    return res;
2101
440
}
2102
2103
std::shared_ptr<TRuntimeProfileTree>
2104
440
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2105
    // we do not have mutex to protect pipeline_id_to_profile
2106
    // so we need to make sure this funciton is invoked after fragment context
2107
    // has already been prepared.
2108
440
    if (!_prepared) {
2109
0
        std::string msg =
2110
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2111
0
        DCHECK(false) << msg;
2112
0
        LOG_ERROR(msg);
2113
0
        return nullptr;
2114
0
    }
2115
2116
608
    for (const auto& tasks : _tasks) {
2117
1.06k
        for (const auto& task : tasks) {
2118
1.06k
            if (task.second->load_channel_profile() == nullptr) {
2119
0
                continue;
2120
0
            }
2121
2122
1.06k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2123
2124
1.06k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2125
1.06k
                                                           _runtime_state->profile_level());
2126
1.06k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2127
1.06k
        }
2128
608
    }
2129
2130
440
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2131
440
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2132
440
                                                      _runtime_state->profile_level());
2133
440
    return load_channel_profile;
2134
440
}
2135
2136
// Collect runtime filter IDs registered by all tasks in this PFC.
2137
// Used during recursive CTE stage transitions to know which filters to deregister
2138
// before creating the new PFC for the next recursion round.
2139
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2140
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2141
// the vector sizes do not change, and individual RuntimeState filter sets are
2142
// written only during open() which has completed by the time we reach rerun.
2143
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2144
0
    std::set<int> result;
2145
0
    for (const auto& _task : _tasks) {
2146
0
        for (const auto& task : _task) {
2147
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2148
0
            result.merge(set);
2149
0
        }
2150
0
    }
2151
0
    if (_runtime_state) {
2152
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2153
0
        result.merge(set);
2154
0
    }
2155
0
    return result;
2156
0
}
2157
2158
109k
void PipelineFragmentContext::_release_resource() {
2159
109k
    std::lock_guard<std::mutex> l(_task_mutex);
2160
    // The memory released by the query end is recorded in the query mem tracker.
2161
109k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2162
109k
    auto st = _query_ctx->exec_status();
2163
223k
    for (auto& _task : _tasks) {
2164
223k
        if (!_task.empty()) {
2165
223k
            _call_back(_task.front().first->runtime_state(), &st);
2166
223k
        }
2167
223k
    }
2168
109k
    _tasks.clear();
2169
109k
    _dag.clear();
2170
109k
    _pip_id_to_pipeline.clear();
2171
109k
    _pipelines.clear();
2172
109k
    _sink.reset();
2173
109k
    _root_op.reset();
2174
109k
    _runtime_filter_mgr_map.clear();
2175
109k
    _op_id_to_shared_state.clear();
2176
109k
}
2177
2178
#include "common/compile_check_end.h"
2179
} // namespace doris