Coverage Report

Created: 2026-06-28 17:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
// Creates the context for one pipeline fragment of a query.
//
// The request is copied into `_params`, the completion callback is stored, and
// the fragment watcher (read by is_timeout()) is started immediately.
// Optional thrift fields that are not set fall back to 0 / false.
PipelineFragmentContext::PipelineFragmentContext(
        TUniqueId query_id, const TPipelineFragmentParams& request,
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
        const std::function<void(RuntimeState*, Status*)>& call_back)
        : _query_id(std::move(query_id)),
          _fragment_id(request.fragment_id),
          _exec_env(exec_env),
          _query_ctx(std::move(query_ctx)),
          _call_back(call_back),
          _is_report_on_cancel(true),
          _params(request),
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
          _need_notify_close(request.__isset.need_notify_close && request.need_notify_close) {
    _fragment_watcher.start();
}
155
156
123k
// Tears the fragment context down: logs the destruction, releases fragment
// resources, then drops the runtime state and this context's reference to the
// query context.
PipelineFragmentContext::~PipelineFragmentContext() {
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
            .tag("query_id", print_id(_query_id))
            .tag("fragment_id", _fragment_id);
    _release_resource();
    {
        // Switch the thread's tracker first so that the memory freed by the two
        // resets below is recorded in the query mem tracker.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
        _runtime_state.reset();
        _query_ctx.reset();
    }
}
168
169
2
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
2
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
2
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
2
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
884
bool PipelineFragmentContext::notify_close() {
181
884
    bool all_closed = false;
182
884
    bool need_remove = false;
183
884
    {
184
884
        std::lock_guard<std::mutex> l(_task_mutex);
185
884
        if (_closed_tasks >= _total_tasks) {
186
10
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
0
                need_remove = true;
193
0
            }
194
10
            all_closed = true;
195
10
        }
196
        // make fragment release by self after cancel
197
884
        _need_notify_close = false;
198
884
    }
199
884
    if (need_remove) {
200
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
0
    }
202
884
    return all_closed;
203
884
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
884
void PipelineFragmentContext::cancel(const Status reason) {
210
884
    LOG_INFO("PipelineFragmentContext::cancel")
211
884
            .tag("query_id", print_id(_query_id))
212
884
            .tag("fragment_id", _fragment_id)
213
884
            .tag("reason", reason.to_string());
214
884
    if (notify_close()) {
215
10
        return;
216
10
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
874
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
0
                                   debug_string());
221
0
        LOG_LONG_STRING(WARNING, dbg_str);
222
0
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
874
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
874
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
874
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
0
        _query_ctx->set_load_error_url(error_url);
236
0
    }
237
238
874
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
0
        _query_ctx->set_first_error_msg(first_error_msg);
240
0
    }
241
242
874
    _query_ctx->cancel(reason, _fragment_id);
243
874
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
182
        _is_report_on_cancel = false;
245
692
    } else {
246
3.43k
        for (auto& id : _fragment_instance_ids) {
247
3.43k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
3.43k
        }
249
692
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
874
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
874
    if (stream_load_ctx != nullptr) {
254
0
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
0
        stream_load_ctx->error_url = get_load_error_url();
259
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
0
    }
261
262
3.63k
    for (auto& tasks : _tasks) {
263
8.73k
        for (auto& task : tasks) {
264
8.73k
            task.first->unblock_all_dependencies();
265
8.73k
        }
266
3.63k
    }
267
874
}
268
269
192k
// Creates a new pipeline with the next pipeline id and registers it in
// `_pipelines`.
//
// parent: optional parent pipeline. When given, the new pipeline's task count is
//         capped by the parent's task count and the new pipeline is registered
//         as the parent's child; otherwise `_num_instances` is used.
// idx:    when >= 0 the pipeline is inserted at that position in `_pipelines`,
//         otherwise it is appended.
// Returns the newly created pipeline.
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
    PipelineId id = _next_pipeline_id++;
    const auto num_tasks =
            parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances;
    const auto num_tasks_of_parent = parent ? parent->num_tasks() : _num_instances;
    auto pipeline = std::make_shared<Pipeline>(id, num_tasks, num_tasks_of_parent);

    if (idx < 0) {
        _pipelines.emplace_back(pipeline);
    } else {
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
    }

    if (parent) {
        parent->set_children(pipeline);
    }
    return pipeline;
}
284
285
123k
// Builds every pipeline of this fragment, attaches the data sink, plans local
// exchange when enabled, prepares the pipelines and finally builds the pipeline
// tasks. The step numbers continue from step 1 in prepare(). Any failing step
// returns its error immediately.
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
    {
        SCOPED_TIMER(_build_pipelines_timer);
        // 2. Build pipelines with the operators in this fragment.
        auto root_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
                                         &_root_op, root_pipeline));

        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor
        // pipelines that inherited a reduced num_tasks from a serial operator.
        _propagate_local_exchange_num_tasks();

        // All pipelines now have their final num_tasks, so the deferred local
        // exchangers can be created.
        RETURN_IF_ERROR(_create_deferred_local_exchangers());

        // NOTE: num_tasks is deliberately NOT raised here for pipelines whose
        // source is a serial operator (Exchange or scan). They legitimately have
        // 1 task, and raising them causes crashes (e.g., 4 Exchange tasks but
        // only 1 receives data). Serial scan sources (pooling scan) keep
        // num_tasks=1 and PassthroughExchanger(1, N) handles the fan-out.
        // Shared state injection across instances is handled by the FE instead:
        // it inserts local exchange nodes between serial operators and their
        // downstream consumers, creating proper pipeline boundaries with
        // _num_instances tasks.

        // 3. Create the sink operator.
        if (!_params.fragment.__isset.output_sink) {
            return Status::InternalError("No output sink in this fragment!");
        }
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
                                          _params.fragment.output_exprs, _params,
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
                                          *_desc_tbl, root_pipeline->id()));
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

        // Wire each pipeline's sink to the last operator of that pipeline.
        for (auto& pipeline : _pipelines) {
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
        }
    }

    // 4. Build the local exchangers.
    if (_runtime_state->plan_local_shuffle()) {
        SCOPED_TIMER(_plan_local_exchanger_timer);
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
                                             _params.bucket_seq_to_instance_idx,
                                             _params.shuffle_idx_to_instance_idx));
    }

    // 5. Initialize global states in the pipelines.
    for (auto& pipeline : _pipelines) {
        SCOPED_TIMER(_prepare_all_pipelines_timer);
        pipeline->children().clear();
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
    }

    {
        SCOPED_TIMER(_build_tasks_timer);
        // 6. Build pipeline tasks and initialize their local state.
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
    }

    return Status::OK();
}
358
359
123k
// Prepares the fragment for execution: sets up the profile timers and the
// fragment-level RuntimeState, resolves the descriptor table, records the
// fragment instance ids, then builds and prepares all pipelines and tasks.
//
// Returns InternalError("Already prepared") when called on a context that has
// been prepared successfully before; otherwise the first error of any step, or
// OK.
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
    if (_prepared) {
        return Status::InternalError("Already prepared");
    }
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
        _timeout = _params.query_options.execution_timeout;
    }

    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
    // Covers everything below, including the creation of the remaining timers.
    SCOPED_TIMER(_prepare_timer);
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
    {
        SCOPED_TIMER(_init_context_timer);
        cast_set(_num_instances, _params.local_params.size());
        _total_instances =
                _params.__isset.total_instances ? _params.total_instances : _num_instances;

        if (_params.query_options.__isset.is_report_success) {
            set_is_report_success(_params.query_options.is_report_success);
        }

        // 1. Set up the global (fragment-level) runtime state.
        _runtime_state = RuntimeState::create_unique(
                _params.query_id, _params.fragment_id, _params.query_options,
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
        _runtime_state->set_task_execution_context(shared_from_this());
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());

        // Forward the optional request fields that are present.
        if (_params.__isset.backend_id) {
            _runtime_state->set_backend_id(_params.backend_id);
        }
        if (_params.__isset.import_label) {
            _runtime_state->set_import_label(_params.import_label);
        }
        if (_params.__isset.db_name) {
            _runtime_state->set_db_name(_params.db_name);
        }
        if (_params.__isset.load_job_id) {
            _runtime_state->set_load_job_id(_params.load_job_id);
        }

        // Simplified params reuse the query-level descriptor table; otherwise
        // the table is built from the request.
        if (!_params.is_simplified_param) {
            DCHECK(_params.__isset.desc_tbl);
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
                                                  &_desc_tbl));
        } else {
            _desc_tbl = _query_ctx->desc_tbl;
        }
        _runtime_state->set_desc_tbl(_desc_tbl);
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
        _runtime_state->set_total_load_streams(_params.total_load_streams);
        _runtime_state->set_num_local_sink(_params.num_local_sink);

        // Record one fragment instance id per local param, in order.
        const auto target_size = _params.local_params.size();
        _fragment_instance_ids.resize(target_size);
        for (size_t i = 0; i < target_size; ++i) {
            _fragment_instance_ids[i] = _params.local_params[i].fragment_instance_id;
        }
    }

    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));

    _init_next_report_time();

    _prepared = true;
    return Status::OK();
}
435
436
// Builds and prepares the pipeline tasks of one fragment instance.
//
// instance_idx:           index into `_params.local_params` / `_tasks`.
// pipeline_id_to_profile: one profile per entry of `_pipelines`, indexed by the
//                         pipeline's position in `_pipelines`.
//
// For each pipeline that runs on this instance (pipelines with a single task
// only run on instance 0) a task-level RuntimeState and a PipelineTask are
// created, shared states are injected along the pipeline DAG, and every task is
// prepared with its scan ranges. The instance's RuntimeFilterMgr is published
// into `_runtime_filter_mgr_map` at the end.
//
// Returns the first error reported by a task's prepare(), or OK.
//
// NOTE: _build_pipeline_tasks() may run this concurrently for different
// instances, so shared members (`_dag`, `_op_id_to_shared_state`) are only read
// here, via find().
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
        int instance_idx,
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
    using SharedStateMap =
            std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
                                    std::vector<std::shared_ptr<Dependency>>>>;

    const auto& local_params = _params.local_params[instance_idx];
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;

    // Collects the already-created shared states that belong to the operators of
    // `pipeline` and to the destinations of its sink.
    auto get_shared_state = [&](const PipelinePtr& pipeline) -> SharedStateMap {
        SharedStateMap shared_state_map;
        for (auto& op : pipeline->operators()) {
            auto source_id = op->operator_id();
            if (auto iter = _op_id_to_shared_state.find(source_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({source_id, iter->second});
            }
        }
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
                iter != _op_id_to_shared_state.end()) {
                shared_state_map.insert({sink_to_source_id, iter->second});
            }
        }
        return shared_state_map;
    };

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        auto& pipeline = _pipelines[pip_idx];
        // A pipeline with a single task only gets that task on instance 0.
        if (pipeline->num_tasks() <= 1 && instance_idx != 0) {
            continue;
        }

        auto task_runtime_state = RuntimeState::create_unique(
                local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
                _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
        {
            // Initialize the runtime state for this task.
            task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());

            task_runtime_state->set_task_execution_context(shared_from_this());
            task_runtime_state->set_be_number(local_params.backend_num);

            if (_params.__isset.backend_id) {
                task_runtime_state->set_backend_id(_params.backend_id);
            }
            if (_params.__isset.import_label) {
                task_runtime_state->set_import_label(_params.import_label);
            }
            if (_params.__isset.db_name) {
                task_runtime_state->set_db_name(_params.db_name);
            }
            if (_params.__isset.load_job_id) {
                task_runtime_state->set_load_job_id(_params.load_job_id);
            }
            if (_params.__isset.wal_id) {
                task_runtime_state->set_wal_id(_params.wal_id);
            }
            if (_params.__isset.content_length) {
                task_runtime_state->set_content_length(_params.content_length);
            }

            task_runtime_state->set_desc_tbl(_desc_tbl);
            task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
            task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
            task_runtime_state->resize_op_id_to_local_state(max_operator_id());
            task_runtime_state->set_max_operator_id(max_operator_id());
            task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
            task_runtime_state->set_total_load_streams(_params.total_load_streams);
            task_runtime_state->set_num_local_sink(_params.num_local_sink);

            task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
        }
        auto cur_task_id = _total_tasks++;
        task_runtime_state->set_task_id(cur_task_id);
        task_runtime_state->set_task_num(pipeline->num_tasks());
        auto task = std::make_shared<PipelineTask>(
                pipeline, cur_task_id, task_runtime_state.get(),
                std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
                pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
                instance_idx);
        pipeline->incr_created_tasks(instance_idx, task.get());
        pipeline_id_to_task.insert({pipeline->id(), task.get()});
        _tasks[instance_idx].emplace_back(
                std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
                        std::move(task), std::move(task_runtime_state)});
    }

    /**
         * Build DAG for pipeline tasks.
         * For example, we have
         *
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
         *            \                      /
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
         *                 \                          /
         *               JoinProbeOperator2 (Pipeline1)
         *
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
         * To build this DAG, `_dag` manages dependencies between pipelines by pipeline ID and
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
         *
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
         * and JoinProbeOperator2.
         */
    for (const auto& pipeline : _pipelines) {
        const auto task_it = pipeline_id_to_task.find(pipeline->id());
        if (task_it == pipeline_id_to_task.end()) {
            continue;
        }
        auto* task = task_it->second;
        DCHECK(task != nullptr);

        // If this task has upstream dependencies, inject shared state for each.
        const auto dag_it = _dag.find(pipeline->id());
        if (dag_it == _dag.end()) {
            continue;
        }
        for (const auto& dep : dag_it->second) {
            const auto dep_it = pipeline_id_to_task.find(dep);
            if (dep_it == pipeline_id_to_task.end()) {
                // The upstream pipeline has no task on this instance.
                continue;
            }
            auto* upstream_task = dep_it->second;
            auto ss = upstream_task->get_sink_shared_state();
            if (ss) {
                task->inject_shared_state(ss);
            } else {
                // The upstream sink has no shared state of its own: hand it this
                // task's source shared state instead.
                upstream_task->inject_shared_state(task->get_source_shared_state());
            }
        }
    }

    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
        const auto task_it = pipeline_id_to_task.find(_pipelines[pip_idx]->id());
        if (task_it == pipeline_id_to_task.end()) {
            continue;
        }
        auto* task = task_it->second;
        DCHECK(pipeline_id_to_profile[pip_idx]);
        // Scan ranges are keyed by the node id of the pipeline's first operator;
        // pipelines without an entry are prepared with an empty list.
        std::vector<TScanRangeParams> scan_ranges;
        auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
        if (auto ranges_it = local_params.per_node_scan_ranges.find(node_id);
            ranges_it != local_params.per_node_scan_ranges.end()) {
            scan_ranges = ranges_it->second;
        }
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
                                                         _params.fragment.output_sink));
    }
    {
        std::lock_guard<std::mutex> l(_state_map_lock);
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
    }
    return Status::OK();
}
582
583
123k
// Builds the pipeline tasks of every fragment instance.
//
// Resets the task counters, sizes the per-instance containers, indexes the
// pipelines by id, and then calls _build_pipeline_tasks_for_instance() once per
// instance - on `thread_pool` when the instance count exceeds the query's
// parallel_prepare_threshold, otherwise inline on the calling thread.
//
// Returns the submit error if a task could not be submitted, otherwise the first
// per-instance error, or OK.
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
    _total_tasks = 0;
    _closed_tasks = 0;
    const auto target_size = _params.local_params.size();
    const int num_instances = static_cast<int>(target_size);
    _tasks.resize(target_size);
    _runtime_filter_mgr_map.resize(target_size);
    for (const auto& pipeline : _pipelines) {
        _pip_id_to_pipeline[pipeline->id()] = pipeline.get();
    }
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());

    const bool prepare_in_parallel =
            target_size > 1 &&
            (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
             target_size > _runtime_state->query_options().parallel_prepare_threshold);

    if (!prepare_in_parallel) {
        for (int i = 0; i < num_instances; i++) {
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
        }
    } else {
        // The instance parallelism is big enough ( > parallel_prepare_threshold),
        // so all tasks are prepared by multiple threads.
        std::vector<Status> prepare_status(target_size);
        int submitted_tasks = 0;
        Status submit_status;
        CountDownLatch latch(num_instances);
        for (int i = 0; i < num_instances; i++) {
            submit_status = thread_pool->submit_func([&, i]() {
                SCOPED_ATTACH_TASK(_query_ctx.get());
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
                latch.count_down();
            });
            if (UNLIKELY(!submit_status.ok())) {
                break;
            }
            submitted_tasks++;
        }
        // Account for the instances that were never submitted, then wait for the
        // submitted ones: the lambdas reference locals of this scope.
        latch.arrive_and_wait(target_size - submitted_tasks);
        if (UNLIKELY(!submit_status.ok())) {
            return submit_status;
        }
        for (int i = 0; i < submitted_tasks; i++) {
            if (!prepare_status[i].ok()) {
                return prepare_status[i];
            }
        }
    }

    _pipeline_parent_map.clear();
    _op_id_to_shared_state.clear();
    // Record task cardinality once when this fragment context finishes task initialization.
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));

    return Status::OK();
}
635
636
123k
void PipelineFragmentContext::_init_next_report_time() {
637
123k
    auto interval_s = config::pipeline_status_report_interval;
638
123k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
11.7k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
11.7k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
11.7k
        _previous_report_time =
643
11.7k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
11.7k
        _disable_period_report = false;
645
11.7k
    }
646
123k
}
647
648
1.69k
void PipelineFragmentContext::refresh_next_report_time() {
649
1.69k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
1.69k
    DCHECK(disable == true);
651
1.69k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
1.69k
    _disable_period_report.compare_exchange_strong(disable, false);
653
1.69k
}
654
655
1.87M
void PipelineFragmentContext::trigger_report_if_necessary() {
656
1.87M
    if (!_is_report_success) {
657
1.72M
        return;
658
1.72M
    }
659
145k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
145k
    if (disable) {
661
5.85k
        return;
662
5.85k
    }
663
139k
    int32_t interval_s = config::pipeline_status_report_interval;
664
139k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
139k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
139k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
139k
    if (MonotonicNanos() > next_report_time) {
672
1.69k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
1.69k
                                                            std::memory_order_acq_rel)) {
674
6
            return;
675
6
        }
676
1.69k
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
1.69k
        auto st = send_report(false);
693
1.69k
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
1.69k
    }
699
139k
}
700
701
// Builds the operator tree of this fragment from the thrift plan nodes.
//
// @param pool      object pool forwarded to operator creation
// @param descs     descriptor table forwarded to operator creation
// @param root      receives the root operator of the reconstructed tree
// @param cur_pipe  pipeline the tree construction starts in
// @return OK when every thrift node was consumed; InternalError when the tree was only
//         partially reconstructed, or any error reported by _create_tree_helper.
// @throws Exception(INTERNAL_ERROR) when the plan contains no node at all.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Index of the last thrift node consumed; advanced by _create_tree_helper while it
    // walks the node list.
    int last_node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &last_node_idx, root,
                                        cur_pipe, 0, false, false));

    const bool every_node_used = (last_node_idx + 1 == plan_nodes.size());
    if (!every_node_used) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
718
719
123k
// Materializes the exchanger of every deferred (FE-planned) local exchange and then
// empties the deferred list.
//
// @return OK on success; InternalError when a deferred entry carries a partition type
//         that has no exchanger in this path (also trips a DCHECK in debug builds).
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
    for (auto& deferred : _deferred_exchangers) {
        // DANGER ZONE — do not "fix" the sender count without reading the history.
        //
        // sender_count seeds Exchanger::_running_sink_operators, which the source side
        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
        // The correct value is THIS pipeline-instance's sink task count, which is exactly
        // deferred.upstream_pipe->num_tasks() — one PipelineTask per task, one close per
        // task.
        //
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
        //   BE-planned path in _add_local_exchange_impl.  THIS BREAKS the common
        //   FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe genuinely has
        //   num_tasks=1, only 1 close arrives, but the seed becomes _num_instances so
        //   _running_sink_operators never reaches 0 — downstream sources hang on
        //   SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING and regressed exactly
        //   this way).  BE-planned mode uses max() because its `cur_pipe` is the
        //   source-side pipeline (always raised to _num_instances by add_pipeline) — not
        //   analogous to `upstream_pipe` here, which is the sink-side pipeline that may
        //   legitimately stay at 1 for serial sources.
        //
        // Tempting wrong fix #2: multiply by _num_instances on the theory that
        //   shared_state is shared across all instances.  Same hang — each
        //   fragment-instance PipelineFragmentContext has its OWN _op_id_to_shared_state
        //   map, so the exchanger is per-instance, not per-BE.  num_tasks() is already the
        //   right close-count for one instance.
        //
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
        // this fragment shape.  Fix THAT pass, not this seed value.
        const int sender_count = deferred.upstream_pipe->num_tasks();
        const auto type = deferred.partition_type;
        auto& exchanger = deferred.shared_state->exchanger;

        switch (type) {
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
            exchanger = ShuffleExchanger::create_unique(sender_count, _num_instances,
                                                        deferred.num_partitions,
                                                        deferred.free_blocks_limit, type);
            break;
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
            exchanger = BucketShuffleExchanger::create_unique(sender_count, _num_instances,
                                                              deferred.num_partitions,
                                                              deferred.free_blocks_limit);
            break;
        case TLocalPartitionType::PASSTHROUGH:
            exchanger = PassthroughExchanger::create_unique(sender_count, _num_instances,
                                                            deferred.free_blocks_limit);
            break;
        case TLocalPartitionType::BROADCAST:
            exchanger = BroadcastExchanger::create_unique(sender_count, _num_instances,
                                                          deferred.free_blocks_limit);
            break;
        case TLocalPartitionType::PASS_TO_ONE:
            // Without a shared hash table for broadcast join, PASS_TO_ONE is served by a
            // broadcast exchanger instead.
            if (!_runtime_state->enable_share_hash_table_for_broadcast_join()) {
                exchanger = BroadcastExchanger::create_unique(sender_count, _num_instances,
                                                              deferred.free_blocks_limit);
            } else {
                exchanger = PassToOneExchanger::create_unique(sender_count, _num_instances,
                                                              deferred.free_blocks_limit);
            }
            break;
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
            exchanger = AdaptivePassthroughExchanger::create_unique(
                    sender_count, _num_instances, deferred.free_blocks_limit);
            break;
        case TLocalPartitionType::NOOP:
        case TLocalPartitionType::LOCAL_MERGE_SORT:
            // FE-planned LocalExchangeNode currently never emits NOOP or LOCAL_MERGE_SORT
            // through the deferred-exchanger path.  NOOP means "no exchange needed" and
            // is filtered out before reaching here; LOCAL_MERGE_SORT is planned by the
            // legacy BE path only.  Crash in debug to surface the protocol violation if
            // that ever changes; return an error in release to avoid silently corrupting
            // execution.
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
                          << static_cast<int>(type);
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
                                         std::to_string(static_cast<int>(type)));
        default:
            // New TLocalPartitionType added on FE side without a BE handler here.
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
                          << static_cast<int>(type);
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
                                         std::to_string(static_cast<int>(type)));
        }
    }
    _deferred_exchangers.clear();
    return Status::OK();
}
805
806
123k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
123k
    if (_deferred_exchangers.empty()) {
815
106k
        return;
816
106k
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
17.3k
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
17.3k
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
17.3k
    std::map<PipelineId, int> in_degree;
832
51.1k
    for (auto& p : _pipelines) {
833
51.1k
        id_to_pipe[p->id()] = p;
834
51.1k
        in_degree.try_emplace(p->id(), 0);
835
51.1k
    }
836
33.0k
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
33.8k
        for (auto upstream_id : upstream_ids) {
838
33.8k
            downstreams_of[upstream_id].push_back(downstream_id);
839
33.8k
            in_degree[downstream_id]++;
840
33.8k
        }
841
33.0k
    }
842
17.3k
    std::vector<PipelineId> ready;
843
51.1k
    for (const auto& [id, deg] : in_degree) {
844
51.1k
        if (deg == 0) {
845
18.1k
            ready.push_back(id);
846
18.1k
        }
847
51.1k
    }
848
17.3k
    size_t visited = 0;
849
68.4k
    while (!ready.empty()) {
850
51.1k
        const auto id = ready.back();
851
51.1k
        ready.pop_back();
852
51.1k
        visited++;
853
51.1k
        auto pit = id_to_pipe.find(id);
854
51.1k
        if (pit != id_to_pipe.end()) {
855
51.1k
            auto& pipe = pit->second;
856
51.1k
            const auto& ops = pipe->operators();
857
51.1k
            const bool le_source =
858
51.1k
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
51.1k
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
51.1k
            if (le_source) {
861
18.7k
                pipe->set_num_tasks(_num_instances);
862
32.3k
            } else if (!serial_source) {
863
16.5k
                int target = pipe->num_tasks();
864
16.5k
                const auto up_it = _dag.find(id);
865
16.5k
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
14.2k
                    for (auto upstream_id : up_it->second) {
868
14.2k
                        auto uit = id_to_pipe.find(upstream_id);
869
14.2k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
14.2k
                            target = _num_instances;
871
14.2k
                            break;
872
14.2k
                        }
873
14.2k
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
14.3k
                    for (auto upstream_id : up_it->second) {
876
14.3k
                        auto uit = id_to_pipe.find(upstream_id);
877
14.3k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
14.3k
                            !uit->second->operators().empty() &&
879
14.3k
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
14.3k
                    }
883
14.2k
                }
884
16.5k
                pipe->set_num_tasks(target);
885
16.5k
            }
886
51.1k
        }
887
51.1k
        for (auto down : downstreams_of[id]) {
888
33.8k
            if (--in_degree[down] == 0) {
889
33.0k
                ready.push_back(down);
890
33.0k
            }
891
33.8k
        }
892
51.1k
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
17.3k
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
17.3k
}
900
901
// Recursively rebuilds the operator tree from `tnodes`, which must list the plan in
// preorder.
//
// @param pool       object pool forwarded to operator creation
// @param tnodes     thrift plan nodes in preorder
// @param descs      descriptor table forwarded to operator creation
// @param parent     operator the created node is attached to; nullptr for the root
// @param node_idx   in/out cursor into `tnodes`; on return it points at the last node
//                   consumed by this subtree
// @param root       receives the created operator when `parent` is nullptr
// @param cur_pipe   pipeline currently being filled; passed by reference to
//                   _create_operator
// @param child_idx  position of this node among its parent's children
// @param followed_by_shuffled_operator / require_bucket_distribution
//                   distribution requirements inherited from the ancestors
// @return OK, InternalError when `tnodes` runs out before the tree is complete, or any
//         error from operator creation / initialization.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;

    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    // The operator is created with the requirements inherited from above; the values for
    // the children are derived further down, once the operator exists.
    bool current_require_bucket_distribution = require_bucket_distribution;
    const auto parent_node_id = parent == nullptr ? -1 : parent->node_id();
    RETURN_IF_ERROR(_create_operator(pool, tnode, descs, op, cur_pipe, parent_node_id, child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent == nullptr) {
        *root = op;
    } else {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    }

    /**
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is
     * followed by a shuffled operator (shuffled hash join, union operator followed by
     * co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // While the current pipeline has no operator yet, its sink is consulted for the
    // requirements; otherwise the operator just created is.
    const bool ask_sink = cur_pipe->operators().empty();
    auto required_data_distribution =
            ask_sink ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                     : op->required_data_distribution(_runtime_state.get());
    const auto distribution_type = required_data_distribution.distribution_type;

    const auto is_shuffled = [&]() {
        return ask_sink ? cur_pipe->sink()->is_shuffled_operator() : op->is_shuffled_operator();
    };
    const auto is_colocated = [&]() {
        return ask_sink ? cur_pipe->sink()->is_colocated_operator() : op->is_colocated_operator();
    };

    const bool current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator || is_shuffled()) &&
             Pipeline::is_hash_exchange(distribution_type)) ||
            (followed_by_shuffled_operator && distribution_type == TLocalPartitionType::NOOP);

    current_require_bucket_distribution =
            ((require_bucket_distribution || is_colocated()) &&
             Pipeline::is_hash_exchange(distribution_type)) ||
            (require_bucket_distribution && distribution_type == TLocalPartitionType::NOOP);

    if (num_children == 0) {
        // Leaf node: remember whether this source operator is serial.
        _use_serial_source = op->is_serial_operator();
    }

    // rely on that tnodes is preorder of the plan
    for (int child_pos = 0; child_pos < num_children; ++child_pos) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe,
                                            child_pos, current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
526
        PipelinePtr pipe_with_sink) {
990
526
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
526
    pipe_with_source->set_num_tasks(_num_instances);
992
526
    pipe_with_source->set_data_distribution(data_distribution);
993
526
}
994
995
// Splits `cur_pipe` at operator index `idx` by inserting a local exchange: operators
// [0, idx) move to `new_pip`, which is terminated by a LocalExchangeSink, and `cur_pipe`
// keeps the remaining operators behind a new LocalExchangeSource.
//
// @param idx                          split position inside cur_pipe's operator list
// @param pool                         object pool handed to the new source operator
// @param cur_pipe                     pipeline being split; becomes the source side
// @param new_pip                      freshly added pipeline; becomes the sink side
// @param data_distribution            requested distribution (by value; its type may be
//                                     rewritten below before it is used)
// @param do_local_exchange            not read or written in this function
// @param num_buckets                  bucket count forwarded to sink init / bucket shuffle
// @param bucket_seq_to_instance_idx   bucket -> instance mapping (may be empty)
// @param shuffle_idx_to_instance_idx  shuffle index -> instance mapping
// @return OK, InternalError for an unsupported distribution type, or any error from the
//         operators' set_sink / init / set_child calls.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    // Number of hash partitions: fragment-wide for a global shuffle, local otherwise.
    const auto hash_partition_num = use_global_hash_shuffle ? _total_instances : _num_instances;

    sink = std::make_shared<LocalExchangeSinkOperatorX>(sink_id, local_exchange_id,
                                                        hash_partition_num,
                                                        data_distribution.partition_exprs,
                                                        bucket_seq_to_instance_idx);

    // Normalize the distribution type: no bucket mapping means bucket shuffle degrades to
    // a hash shuffle, and a global shuffle is only kept when it is actually usable.
    auto& distribution_type = data_distribution.distribution_type;
    if (bucket_seq_to_instance_idx.empty() &&
        distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
        distribution_type = use_global_hash_shuffle
                                    ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
                                    : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    if (!use_global_hash_shuffle &&
        distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
        distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), distribution_type, num_buckets,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);

    // Evaluated lazily so that only the branches which build an exchanger read the option.
    const auto free_blocks_limit = [this]() -> int {
        const auto& query_options = _runtime_state->query_options();
        return query_options.__isset.local_exchange_free_blocks_limit
                       ? cast_set<int>(query_options.local_exchange_free_blocks_limit)
                       : 0;
    };

    switch (distribution_type) {
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                hash_partition_num, free_blocks_limit(), distribution_type);
        break;
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit());
        break;
    case TLocalPartitionType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        }
        break;
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit());
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string(static_cast<int>(distribution_type)));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list with the operators in front of the split.
    auto& upstream_operators = new_pip->operators();
    upstream_operators.insert(upstream_operators.end(), operators.begin(),
                              operators.begin() + idx);

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(distribution_type));
    if (!operators.empty()) {
        // Detach the old child first, then hang the new source below the first operator.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately.
    // A child pipeline whose sink feeds an operator that moved to new_pip follows it there;
    // every other child stays with cur_pipe.
    std::vector<std::shared_ptr<Pipeline>> remaining_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        bool moved_to_new_pip = false;
        for (const auto& moved_op : new_pip->operators()) {
            if (child->sink()->node_id() == moved_op->node_id()) {
                new_pip->set_children(child);
                moved_to_new_pip = true;
            }
        }
        if (moved_to_new_pip) {
            continue;
        }
        remaining_children.push_back(child);
        edges_with_source.push_back(child->id());
    }
    remaining_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(remaining_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
1166
1167
Status PipelineFragmentContext::_add_local_exchange(
1168
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
1169
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
1170
        const std::map<int, int>& bucket_seq_to_instance_idx,
1171
9.04k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1172
9.04k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
1173
8.52k
        return Status::OK();
1174
8.52k
    }
1175
1176
526
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
1177
0
        return Status::OK();
1178
0
    }
1179
526
    *do_local_exchange = true;
1180
1181
526
    auto& operators = cur_pipe->operators();
1182
526
    auto total_op_num = operators.size();
1183
526
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
1184
526
    RETURN_IF_ERROR(_add_local_exchange_impl(
1185
526
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
1186
526
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1187
1188
526
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
1189
0
            << "total_op_num: " << total_op_num
1190
0
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
1191
0
            << " new_pip->operators().size(): " << new_pip->operators().size();
1192
1193
    // There are some local shuffles with relatively heavy operations on the sink.
1194
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
1195
    // Therefore, local passthrough is used to increase the concurrency of the sink.
1196
    // op -> local sink(1) -> local source (n)
1197
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
1198
526
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
1199
526
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
1200
0
        RETURN_IF_ERROR(_add_local_exchange_impl(
1201
0
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
1202
0
                add_pipeline(new_pip, pip_idx + 2),
1203
0
                DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
1204
0
                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1205
0
    }
1206
526
    return Status::OK();
1207
526
}
1208
1209
Status PipelineFragmentContext::_plan_local_exchange(
1210
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
1211
64.9k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1212
155k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
1213
90.0k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
1214
        // Set property if child pipeline is not join operator's child.
1215
90.0k
        if (!_pipelines[pip_idx]->children().empty()) {
1216
18.9k
            for (auto& child : _pipelines[pip_idx]->children()) {
1217
18.9k
                if (child->sink()->node_id() ==
1218
18.9k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
1219
14.8k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
1220
14.8k
                }
1221
18.9k
            }
1222
16.8k
        }
1223
1224
        // 'num_buckets == 0' means the fragment is colocated by the exchange node, not the
1225
        // scan node. In that case `_num_instances` replaces `num_buckets` to avoid dividing by 0
1226
        // while still keeping the colocate plan after local shuffle.
1227
90.0k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
1228
90.0k
                                             bucket_seq_to_instance_idx,
1229
90.0k
                                             shuffle_idx_to_instance_idx));
1230
90.0k
    }
1231
64.9k
    return Status::OK();
1232
64.9k
}
1233
1234
Status PipelineFragmentContext::_plan_local_exchange(
1235
        int num_buckets, int pip_idx, PipelinePtr pip,
1236
        const std::map<int, int>& bucket_seq_to_instance_idx,
1237
90.0k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1238
90.0k
    int idx = 1;
1239
90.0k
    bool do_local_exchange = false;
1240
90.0k
    do {
1241
90.0k
        auto& ops = pip->operators();
1242
90.0k
        do_local_exchange = false;
1243
        // Plan local exchange for each operator.
1244
96.2k
        for (; idx < ops.size();) {
1245
6.17k
            auto _le_req = ops[idx]->required_data_distribution(_runtime_state.get());
1246
6.17k
            if (_le_req.need_local_exchange()) {
1247
4.13k
                RETURN_IF_ERROR(_add_local_exchange(
1248
4.13k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, _le_req,
1249
4.13k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1250
4.13k
                        shuffle_idx_to_instance_idx));
1251
4.13k
            }
1252
6.17k
            if (do_local_exchange) {
1253
                // If local exchange is needed for the current operator, this pipeline is split into two
1254
                // pipelines by a local exchange sink/source. The remaining operators in this pipeline
1255
                // still need to be processed, so idx is set to 2 (0 is the local exchange source and 1
1256
                // is the current operator, which was already processed) and planning continues.
1257
0
                idx = 2;
1258
0
                break;
1259
0
            }
1260
6.17k
            idx++;
1261
6.17k
        }
1262
90.0k
    } while (do_local_exchange);
1263
90.0k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1264
4.91k
        RETURN_IF_ERROR(_add_local_exchange(
1265
4.91k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1266
4.91k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1267
4.91k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1268
4.91k
    }
1269
90.0k
    return Status::OK();
1270
90.0k
}
1271
1272
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1273
                                                  const std::vector<TExpr>& output_exprs,
1274
                                                  const TPipelineFragmentParams& params,
1275
                                                  const RowDescriptor& row_desc,
1276
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1277
123k
                                                  PipelineId cur_pipeline_id) {
1278
123k
    switch (thrift_sink.type) {
1279
34.5k
    case TDataSinkType::DATA_STREAM_SINK: {
1280
34.5k
        if (!thrift_sink.__isset.stream_sink) {
1281
0
            return Status::InternalError("Missing data stream sink.");
1282
0
        }
1283
34.5k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1284
34.5k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1285
34.5k
                params.destinations, _fragment_instance_ids);
1286
34.5k
        break;
1287
34.5k
    }
1288
77.3k
    case TDataSinkType::RESULT_SINK: {
1289
77.3k
        if (!thrift_sink.__isset.result_sink) {
1290
0
            return Status::InternalError("Missing data buffer sink.");
1291
0
        }
1292
1293
77.3k
        auto& pipeline = _pipelines[cur_pipeline_id];
1294
77.3k
        int child_node_id = pipeline->operators().back()->node_id();
1295
77.3k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
1296
77.3k
                                                      row_desc, output_exprs,
1297
77.3k
                                                      thrift_sink.result_sink);
1298
77.3k
        break;
1299
77.3k
    }
1300
54
    case TDataSinkType::DICTIONARY_SINK: {
1301
54
        if (!thrift_sink.__isset.dictionary_sink) {
1302
0
            return Status::InternalError("Missing dict sink.");
1303
0
        }
1304
1305
54
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1306
54
                                                    thrift_sink.dictionary_sink);
1307
54
        break;
1308
54
    }
1309
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1310
5.89k
    case TDataSinkType::OLAP_TABLE_SINK: {
1311
5.89k
        auto& pipeline = _pipelines[cur_pipeline_id];
1312
5.89k
        int child_node_id = pipeline->operators().back()->node_id();
1313
5.89k
        if (state->query_options().enable_memtable_on_sink_node &&
1314
5.89k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1315
5.89k
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
1316
3.23k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
1317
3.23k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1318
3.23k
        } else {
1319
2.66k
            _sink = std::make_shared<OlapTableSinkOperatorX>(
1320
2.66k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1321
2.66k
        }
1322
5.89k
        break;
1323
0
    }
1324
0
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1325
0
        DCHECK(thrift_sink.__isset.olap_table_sink);
1326
0
        DCHECK(state->get_query_ctx() != nullptr);
1327
0
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1328
0
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1329
0
                                                                output_exprs);
1330
0
        break;
1331
0
    }
1332
1.48k
    case TDataSinkType::HIVE_TABLE_SINK: {
1333
1.48k
        if (!thrift_sink.__isset.hive_table_sink) {
1334
0
            return Status::InternalError("Missing hive table sink.");
1335
0
        }
1336
1.48k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1337
1.48k
                                                         output_exprs);
1338
1.48k
        break;
1339
1.48k
    }
1340
1.73k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1341
1.73k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1342
0
            return Status::InternalError("Missing iceberg table sink.");
1343
0
        }
1344
1.73k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1345
4
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1346
4
                                                                     row_desc, output_exprs);
1347
1.73k
        } else {
1348
1.73k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1349
1.73k
                                                                row_desc, output_exprs);
1350
1.73k
        }
1351
1.73k
        break;
1352
1.73k
    }
1353
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1354
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1355
0
            return Status::InternalError("Missing iceberg delete sink.");
1356
0
        }
1357
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1358
20
                                                             row_desc, output_exprs);
1359
20
        break;
1360
20
    }
1361
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1362
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1363
0
            return Status::InternalError("Missing iceberg merge sink.");
1364
0
        }
1365
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1366
80
                                                            output_exprs);
1367
80
        break;
1368
80
    }
1369
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1370
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1371
0
            return Status::InternalError("Missing max compute table sink.");
1372
0
        }
1373
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1374
0
                                                       output_exprs);
1375
0
        break;
1376
0
    }
1377
88
    case TDataSinkType::JDBC_TABLE_SINK: {
1378
88
        if (!thrift_sink.__isset.jdbc_table_sink) {
1379
0
            return Status::InternalError("Missing data jdbc sink.");
1380
0
        }
1381
88
        if (config::enable_java_support) {
1382
88
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1383
88
                                                             output_exprs);
1384
88
        } else {
1385
0
            return Status::InternalError(
1386
0
                    "Jdbc table sink is not enabled, you can change be config "
1387
0
                    "enable_java_support to true and restart be.");
1388
0
        }
1389
88
        break;
1390
88
    }
1391
88
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1392
0
        if (!thrift_sink.__isset.memory_scratch_sink) {
1393
0
            return Status::InternalError("Missing data buffer sink.");
1394
0
        }
1395
1396
0
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1397
0
                                                             output_exprs);
1398
0
        break;
1399
0
    }
1400
164
    case TDataSinkType::RESULT_FILE_SINK: {
1401
164
        if (!thrift_sink.__isset.result_file_sink) {
1402
0
            return Status::InternalError("Missing result file sink.");
1403
0
        }
1404
1405
        // Result file sink is not the top sink
1406
164
        if (params.__isset.destinations && !params.destinations.empty()) {
1407
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1408
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1409
0
                    params.destinations, output_exprs, desc_tbl);
1410
164
        } else {
1411
164
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1412
164
                                                              output_exprs);
1413
164
        }
1414
164
        break;
1415
164
    }
1416
2.06k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1417
2.06k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1418
2.06k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1419
2.06k
        auto sink_id = next_sink_operator_id();
1420
2.06k
        const int multi_cast_node_id = sink_id;
1421
2.06k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1422
        // one sink has multiple sources.
1423
2.06k
        std::vector<int> sources;
1424
8.27k
        for (int i = 0; i < sender_size; ++i) {
1425
6.20k
            auto source_id = next_operator_id();
1426
6.20k
            sources.push_back(source_id);
1427
6.20k
        }
1428
1429
2.06k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1430
2.06k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1431
8.27k
        for (int i = 0; i < sender_size; ++i) {
1432
6.20k
            auto new_pipeline = add_pipeline();
1433
            // row descriptor used by the exchange sink
1434
6.20k
            RowDescriptor* exchange_row_desc = nullptr;
1435
6.20k
            {
1436
6.20k
                const auto& tmp_row_desc =
1437
6.20k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1438
6.20k
                                ? RowDescriptor(state->desc_tbl(),
1439
6.20k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1440
6.20k
                                                         .output_tuple_id})
1441
6.20k
                                : row_desc;
1442
6.20k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1443
6.20k
            }
1444
6.20k
            auto source_id = sources[i];
1445
6.20k
            OperatorPtr source_op;
1446
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1447
6.20k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1448
6.20k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1449
6.20k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1450
6.20k
                    /*operator_id=*/source_id);
1451
6.20k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1452
6.20k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1453
            // 2. create and set sink operator of data stream sender for new pipeline
1454
1455
6.20k
            DataSinkOperatorPtr sink_op;
1456
6.20k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1457
6.20k
                    state, *exchange_row_desc, next_sink_operator_id(),
1458
6.20k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1459
6.20k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1460
1461
6.20k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1462
6.20k
            {
1463
6.20k
                TDataSink* t = pool->add(new TDataSink());
1464
6.20k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1465
6.20k
                RETURN_IF_ERROR(sink_op->init(*t));
1466
6.20k
            }
1467
1468
            // 3. set dependency dag
1469
6.20k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1470
6.20k
        }
1471
2.06k
        if (sources.empty()) {
1472
0
            return Status::InternalError("size of sources must be greater than 0");
1473
0
        }
1474
2.06k
        break;
1475
2.06k
    }
1476
2.06k
    case TDataSinkType::BLACKHOLE_SINK: {
1477
10
        if (!thrift_sink.__isset.blackhole_sink) {
1478
0
            return Status::InternalError("Missing blackhole sink.");
1479
0
        }
1480
1481
10
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1482
10
        break;
1483
10
    }
1484
156
    case TDataSinkType::TVF_TABLE_SINK: {
1485
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1486
0
            return Status::InternalError("Missing TVF table sink.");
1487
0
        }
1488
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1489
156
                                                        output_exprs);
1490
156
        break;
1491
156
    }
1492
0
    default:
1493
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1494
123k
    }
1495
123k
    return Status::OK();
1496
123k
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
194k
                                                 OperatorPtr& cache_op) {
1507
194k
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
194k
    Defer defer = Defer([&]() {
1509
194k
        if (op) {
1510
194k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
194k
        }
1512
194k
        for (auto& s : sink_ops) {
1513
62.0k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
62.0k
        }
1515
194k
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
194k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
194k
    std::stringstream error_msg;
1520
194k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
194k
    bool fe_with_old_version = false;
1523
194k
    switch (tnode.node_type) {
1524
57.2k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
57.2k
        op = std::make_shared<OlapScanOperatorX>(
1526
57.2k
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
57.2k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
57.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
57.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
57.2k
        break;
1531
57.2k
    }
1532
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
0
        DCHECK(_query_ctx != nullptr);
1534
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
0
                                                    _num_instances);
1537
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
0
        break;
1540
0
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
23.6k
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
23.6k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
23.6k
                                                 _num_instances);
1557
23.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
23.6k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
23.6k
        break;
1560
23.6k
    }
1561
40.7k
    case TPlanNodeType::EXCHANGE_NODE: {
1562
40.7k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
40.7k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
40.7k
                                  : 0;
1565
40.7k
        DCHECK_GT(num_senders, 0);
1566
40.7k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1567
40.7k
                                                       num_senders);
1568
40.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1569
40.7k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1570
40.7k
        break;
1571
40.7k
    }
1572
25.7k
    case TPlanNodeType::AGGREGATION_NODE: {
1573
25.7k
        if (tnode.agg_node.grouping_exprs.empty() &&
1574
25.7k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1575
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1576
0
                                         ": group by and output is empty");
1577
0
        }
1578
25.7k
        bool need_create_cache_op =
1579
25.7k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1580
25.7k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1581
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1582
0
            auto cache_source_id = next_operator_id();
1583
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1584
0
                                                        _params.fragment.query_cache_param);
1585
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1586
1587
0
            const auto downstream_pipeline_id = cur_pipe->id();
1588
0
            if (!_dag.contains(downstream_pipeline_id)) {
1589
0
                _dag.insert({downstream_pipeline_id, {}});
1590
0
            }
1591
0
            new_pipe = add_pipeline(cur_pipe);
1592
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1593
1594
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1595
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1596
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1597
0
            return Status::OK();
1598
0
        };
1599
25.7k
        const bool group_by_limit_opt =
1600
25.7k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1601
1602
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1603
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1604
25.7k
        const bool enable_spill = _runtime_state->enable_spill() &&
1605
25.7k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1606
25.7k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1607
25.7k
                                      tnode.agg_node.use_streaming_preaggregation &&
1608
25.7k
                                      !tnode.agg_node.grouping_exprs.empty();
1609
        // TODO: distinct streaming agg does not support spill.
1610
25.7k
        const bool can_use_distinct_streaming_agg =
1611
25.7k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1612
25.7k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1613
25.7k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1614
25.7k
                _params.query_options.enable_distinct_streaming_aggregation;
1615
1616
25.7k
        if (can_use_distinct_streaming_agg) {
1617
52
            if (need_create_cache_op) {
1618
0
                PipelinePtr new_pipe;
1619
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1620
1621
0
                cache_op = op;
1622
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1623
0
                                                                     tnode, descs);
1624
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1625
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1626
0
                cur_pipe = new_pipe;
1627
52
            } else {
1628
52
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
52
                                                                     tnode, descs);
1630
52
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1631
52
            }
1632
25.6k
        } else if (is_streaming_agg) {
1633
682
            if (need_create_cache_op) {
1634
0
                PipelinePtr new_pipe;
1635
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1636
0
                cache_op = op;
1637
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1638
0
                                                             descs);
1639
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1640
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1641
0
                cur_pipe = new_pipe;
1642
682
            } else {
1643
682
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
682
                                                             descs);
1645
682
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1646
682
            }
1647
24.9k
        } else {
1648
            // create new pipeline to add query cache operator
1649
24.9k
            PipelinePtr new_pipe;
1650
24.9k
            if (need_create_cache_op) {
1651
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1652
0
                cache_op = op;
1653
0
            }
1654
1655
24.9k
            if (enable_spill) {
1656
110
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1657
110
                                                                     next_operator_id(), descs);
1658
24.8k
            } else {
1659
24.8k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1660
24.8k
            }
1661
24.9k
            if (need_create_cache_op) {
1662
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1663
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1664
0
                cur_pipe = new_pipe;
1665
24.9k
            } else {
1666
24.9k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1667
24.9k
            }
1668
1669
24.9k
            const auto downstream_pipeline_id = cur_pipe->id();
1670
24.9k
            if (!_dag.contains(downstream_pipeline_id)) {
1671
22.9k
                _dag.insert({downstream_pipeline_id, {}});
1672
22.9k
            }
1673
24.9k
            cur_pipe = add_pipeline(cur_pipe);
1674
24.9k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1675
1676
24.9k
            if (enable_spill) {
1677
110
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1678
110
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1679
24.8k
            } else {
1680
24.8k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1681
24.8k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1682
24.8k
            }
1683
24.9k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1684
24.9k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1685
24.9k
        }
1686
25.7k
        break;
1687
25.7k
    }
1688
25.7k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1689
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1690
0
            return Status::InternalError(
1691
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1692
0
                    tnode.node_id);
1693
0
        }
1694
1695
        // Create source operator (goes on the current / downstream pipeline).
1696
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1697
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1698
1699
        // Create a new pipeline for the sink side.
1700
0
        const auto downstream_pipeline_id = cur_pipe->id();
1701
0
        if (!_dag.contains(downstream_pipeline_id)) {
1702
0
            _dag.insert({downstream_pipeline_id, {}});
1703
0
        }
1704
0
        cur_pipe = add_pipeline(cur_pipe);
1705
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1706
1707
        // Create sink operator.
1708
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1709
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1710
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1711
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1712
1713
        // Pre-register a single shared state for ALL instances so that every
1714
        // sink instance writes its per-instance hash table into the same
1715
        // BucketedAggSharedState and every source instance can merge across
1716
        // all of them.
1717
0
        {
1718
0
            auto shared_state = BucketedAggSharedState::create_shared();
1719
0
            shared_state->id = op->operator_id();
1720
0
            shared_state->related_op_ids.insert(op->operator_id());
1721
1722
0
            for (int i = 0; i < _num_instances; i++) {
1723
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1724
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1725
0
                sink_dep->set_shared_state(shared_state.get());
1726
0
                shared_state->sink_deps.push_back(sink_dep);
1727
0
            }
1728
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1729
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1730
0
            _op_id_to_shared_state.insert(
1731
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1732
0
        }
1733
0
        break;
1734
0
    }
1735
832
    case TPlanNodeType::HASH_JOIN_NODE: {
1736
832
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1737
832
                                       tnode.hash_join_node.is_broadcast_join;
1738
832
        const auto enable_spill = _runtime_state->enable_spill();
1739
832
        if (enable_spill && !is_broadcast_join) {
1740
0
            auto tnode_ = tnode;
1741
0
            tnode_.runtime_filters.clear();
1742
0
            auto inner_probe_operator =
1743
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1744
1745
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1746
            // So here use `tnode_` which has no runtime filters.
1747
0
            auto probe_side_inner_sink_operator =
1748
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1749
1750
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1751
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1752
1753
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1754
0
                    pool, tnode_, next_operator_id(), descs);
1755
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1756
0
                                                inner_probe_operator);
1757
0
            op = std::move(probe_operator);
1758
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1759
1760
0
            const auto downstream_pipeline_id = cur_pipe->id();
1761
0
            if (!_dag.contains(downstream_pipeline_id)) {
1762
0
                _dag.insert({downstream_pipeline_id, {}});
1763
0
            }
1764
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1765
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1766
1767
0
            auto inner_sink_operator =
1768
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1769
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1770
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1771
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1772
1773
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1774
0
            sink_ops.push_back(std::move(sink_operator));
1775
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1776
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1777
1778
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1779
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1780
832
        } else {
1781
832
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1782
832
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1783
1784
832
            const auto downstream_pipeline_id = cur_pipe->id();
1785
832
            if (!_dag.contains(downstream_pipeline_id)) {
1786
820
                _dag.insert({downstream_pipeline_id, {}});
1787
820
            }
1788
832
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1789
832
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1790
1791
832
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1792
832
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1793
832
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1794
832
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1795
1796
832
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1797
832
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1798
832
        }
1799
832
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1800
756
            std::shared_ptr<HashJoinSharedState> shared_state =
1801
756
                    HashJoinSharedState::create_shared(_num_instances);
1802
6.60k
            for (int i = 0; i < _num_instances; i++) {
1803
5.85k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1804
5.85k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1805
5.85k
                sink_dep->set_shared_state(shared_state.get());
1806
5.85k
                shared_state->sink_deps.push_back(sink_dep);
1807
5.85k
            }
1808
756
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1809
756
                                                     op->node_id(), "HASH_JOIN_PROBE");
1810
756
            _op_id_to_shared_state.insert(
1811
756
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1812
756
        }
1813
832
        break;
1814
832
    }
1815
4.20k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1816
4.20k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1817
4.20k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1818
1819
4.20k
        const auto downstream_pipeline_id = cur_pipe->id();
1820
4.20k
        if (!_dag.contains(downstream_pipeline_id)) {
1821
4.20k
            _dag.insert({downstream_pipeline_id, {}});
1822
4.20k
        }
1823
4.20k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1824
4.20k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1825
1826
4.20k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1827
4.20k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1828
4.20k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1829
4.20k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1830
4.20k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1831
4.20k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1832
4.20k
        break;
1833
4.20k
    }
1834
4.42k
    case TPlanNodeType::UNION_NODE: {
1835
4.42k
        int child_count = tnode.num_children;
1836
4.42k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1837
4.42k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1838
1839
4.42k
        const auto downstream_pipeline_id = cur_pipe->id();
1840
4.42k
        if (!_dag.contains(downstream_pipeline_id)) {
1841
4.39k
            _dag.insert({downstream_pipeline_id, {}});
1842
4.39k
        }
1843
4.83k
        for (int i = 0; i < child_count; i++) {
1844
408
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1845
408
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1846
408
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1847
408
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1848
408
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1849
408
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1850
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1851
408
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1852
408
        }
1853
4.42k
        break;
1854
4.42k
    }
1855
12.8k
    case TPlanNodeType::SORT_NODE: {
1856
12.8k
        const auto should_spill = _runtime_state->enable_spill() &&
1857
12.8k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1858
12.8k
        const bool use_local_merge =
1859
12.8k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1860
12.8k
        if (should_spill) {
1861
2
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1862
12.8k
        } else if (use_local_merge) {
1863
12.3k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1864
12.3k
                                                                 descs);
1865
12.3k
        } else {
1866
474
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1867
474
        }
1868
12.8k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1869
1870
12.8k
        const auto downstream_pipeline_id = cur_pipe->id();
1871
12.8k
        if (!_dag.contains(downstream_pipeline_id)) {
1872
12.8k
            _dag.insert({downstream_pipeline_id, {}});
1873
12.8k
        }
1874
12.8k
        cur_pipe = add_pipeline(cur_pipe);
1875
12.8k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1876
1877
12.8k
        if (should_spill) {
1878
2
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1879
2
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1880
12.8k
        } else {
1881
12.8k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1882
12.8k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1883
12.8k
        }
1884
12.8k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1885
12.8k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1886
12.8k
        break;
1887
12.8k
    }
1888
12.8k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1889
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1890
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1891
1892
0
        const auto downstream_pipeline_id = cur_pipe->id();
1893
0
        if (!_dag.contains(downstream_pipeline_id)) {
1894
0
            _dag.insert({downstream_pipeline_id, {}});
1895
0
        }
1896
0
        cur_pipe = add_pipeline(cur_pipe);
1897
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1898
1899
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1900
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1901
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1902
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1903
0
        break;
1904
0
    }
1905
44
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1906
44
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1907
44
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1908
1909
44
        const auto downstream_pipeline_id = cur_pipe->id();
1910
44
        if (!_dag.contains(downstream_pipeline_id)) {
1911
44
            _dag.insert({downstream_pipeline_id, {}});
1912
44
        }
1913
44
        cur_pipe = add_pipeline(cur_pipe);
1914
44
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1915
1916
44
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1917
44
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1918
44
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1919
44
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1920
44
        break;
1921
44
    }
1922
934
    case TPlanNodeType::MATERIALIZATION_NODE: {
1923
934
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1924
934
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1925
934
        break;
1926
934
    }
1927
934
    case TPlanNodeType::INTERSECT_NODE: {
1928
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1929
0
                                                                      cur_pipe, sink_ops));
1930
0
        break;
1931
0
    }
1932
0
    case TPlanNodeType::EXCEPT_NODE: {
1933
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1934
0
                                                                       cur_pipe, sink_ops));
1935
0
        break;
1936
0
    }
1937
0
    case TPlanNodeType::REPEAT_NODE: {
1938
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1939
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1940
0
        break;
1941
0
    }
1942
4
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1943
4
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1944
4
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1945
4
        break;
1946
4
    }
1947
200
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1948
200
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1949
200
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1950
200
        break;
1951
200
    }
1952
308
    case TPlanNodeType::EMPTY_SET_NODE: {
1953
308
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1954
308
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1955
308
        break;
1956
308
    }
1957
308
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1958
198
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1959
198
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1960
198
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1961
198
        break;
1962
198
    }
1963
536
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1964
536
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1965
536
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
536
        break;
1967
536
    }
1968
1.82k
    case TPlanNodeType::META_SCAN_NODE: {
1969
1.82k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1970
1.82k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1971
1.82k
        break;
1972
1.82k
    }
1973
2.05k
    case TPlanNodeType::SELECT_NODE: {
1974
2.05k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1975
2.05k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1976
2.05k
        break;
1977
2.05k
    }
1978
2.05k
    case TPlanNodeType::REC_CTE_NODE: {
1979
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1980
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1981
1982
0
        const auto downstream_pipeline_id = cur_pipe->id();
1983
0
        if (!_dag.contains(downstream_pipeline_id)) {
1984
0
            _dag.insert({downstream_pipeline_id, {}});
1985
0
        }
1986
1987
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1988
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1989
1990
0
        DataSinkOperatorPtr anchor_sink;
1991
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1992
0
                                                                  op->operator_id(), tnode, descs);
1993
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1994
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1995
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1996
1997
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1998
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1999
2000
0
        DataSinkOperatorPtr rec_sink;
2001
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2002
0
                                                         tnode, descs);
2003
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2004
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2005
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2006
2007
0
        break;
2008
0
    }
2009
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2010
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2011
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2012
0
        break;
2013
0
    }
2014
18.7k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2015
18.7k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2016
        // The downstream pipeline (containing LocalExchangeSource) must have
2017
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2018
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2019
        // Without this, when the parent pipeline was reduced by a serial operator
2020
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2021
        // Exchange), the downstream inherits the reduced num_tasks via
2022
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2023
        // channels but only fewer source tasks initialize mem_counters — the
2024
        // sink round-robins to all channels and crashes on uninitialized ones.
2025
18.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2026
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2027
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2028
18.7k
        cur_pipe->set_num_tasks(_num_instances);
2029
2030
18.7k
        const auto downstream_pipeline_id = cur_pipe->id();
2031
18.7k
        if (!_dag.contains(downstream_pipeline_id)) {
2032
18.0k
            _dag.insert({downstream_pipeline_id, {}});
2033
18.0k
        }
2034
18.7k
        cur_pipe = add_pipeline(cur_pipe);
2035
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2036
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2037
        // We set this now so the exchanger is created with the correct sender count.
2038
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2039
        // consistent with this.
2040
18.7k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2041
0
            cur_pipe->set_num_tasks(_parallel_instances);
2042
0
        }
2043
18.7k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2044
18.7k
        int num_partitions = 0;
2045
18.7k
        std::map<int, int> shuffle_id_to_instance_idx;
2046
18.7k
        auto partition_type = tnode.local_exchange_node.partition_type;
2047
18.7k
        switch (partition_type) {
2048
8
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2049
8
            num_partitions = _params.num_buckets;
2050
8
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2051
8
            break;
2052
680
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2053
6.12k
            for (int i = 0; i < _num_instances; i++) {
2054
5.44k
                shuffle_id_to_instance_idx[i] = i;
2055
5.44k
            }
2056
680
            num_partitions = _num_instances;
2057
680
            break;
2058
0
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2059
0
            num_partitions = _total_instances;
2060
0
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2061
0
            break;
2062
18.0k
        default:
2063
18.0k
            break;
2064
18.7k
        }
2065
18.7k
        auto local_exchange_id = op->operator_id();
2066
18.7k
        auto sink_id = next_sink_operator_id();
2067
18.7k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2068
18.7k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2069
18.7k
        sink_ops.push_back(sink);
2070
18.7k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2071
18.7k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2072
2073
        // For FE-planned local exchange, we need to:
2074
        // 1. Initialize the partitioner for hash shuffle types
2075
        // 2. Defer exchanger creation until after the full plan tree is built
2076
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2077
        // 3. Register shared state so pipeline tasks can find it
2078
18.7k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2079
18.7k
                                ->init_partitioner(_runtime_state.get()));
2080
2081
18.7k
        int free_blocks_limit =
2082
18.7k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2083
18.7k
                        ? cast_set<int>(
2084
18.7k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2085
18.7k
                        : 0;
2086
18.7k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2087
18.7k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2088
18.7k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2089
18.7k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2090
18.7k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2091
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2092
18.7k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2093
18.7k
                                        free_blocks_limit, local_exchange_id, sink_id});
2094
18.7k
        break;
2095
18.7k
    }
2096
0
    default:
2097
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2098
0
                                     print_plan_node_type(tnode.node_type));
2099
194k
    }
2100
194k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2101
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2102
0
        op->set_serial_operator();
2103
0
    }
2104
2105
194k
    return Status::OK();
2106
194k
}
2107
// NOLINTEND(readability-function-cognitive-complexity)
2108
// NOLINTEND(readability-function-size)
2109
2110
template <bool is_intersect>
2111
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2112
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2113
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
0
    const auto downstream_pipeline_id = cur_pipe->id();
2118
0
    if (!_dag.contains(downstream_pipeline_id)) {
2119
0
        _dag.insert({downstream_pipeline_id, {}});
2120
0
    }
2121
2122
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
0
        if (child_id == 0) {
2127
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
0
        } else {
2130
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
0
        }
2133
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
0
    }
2138
2139
0
    return Status::OK();
2140
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
2141
2142
123k
// Submits every pipeline task held in `_tasks` to the query's pipeline scheduler.
//
// The function may only run once per context: a second call returns
// InternalError("submitted").
//
// On the first failed submission the fragment is cancelled, `_total_tasks` is
// lowered to the number of tasks that were actually submitted, and submission
// stops for ALL remaining task groups. If every submitted task has already been
// closed at that point, the fragment instance is closed here and, when
// _close_fragment_instance() asks for it, the context is removed from the
// fragment manager.
//
// Returns Status::OK() when all tasks were submitted, otherwise an InternalError
// carrying the scheduler's error text and the local backend host.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                break;
            }
            submit_tasks++;
        }
        if (!st) {
            // Leave the outer loop as well. Continuing would submit further tasks
            // after the cancel, could overwrite `st` with a later success, and
            // would leave `_total_tasks` smaller than the number of tasks that
            // were really handed to the scheduler.
            cancel(Status::InternalError("submit context to executor fail"));
            std::lock_guard<std::mutex> l(_task_mutex);
            _total_tasks = submit_tasks;
            break;
        }
    }

    if (st.ok()) {
        return st;
    }

    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
    return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                 BackendOptions::get_localhost());
}
2183
2184
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2185
0
    if (_runtime_state->enable_profile()) {
2186
0
        std::stringstream ss;
2187
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2188
0
            runtime_profile_ptr->pretty_print(&ss);
2189
0
        }
2190
2191
0
        if (_runtime_state->load_channel_profile()) {
2192
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2193
0
        }
2194
2195
0
        auto profile_str =
2196
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2197
0
                            this->_fragment_id, extra_info, ss.str());
2198
0
        LOG_LONG_STRING(INFO, profile_str);
2199
0
    }
2200
0
}
2201
// If all pipeline tasks binded to the fragment instance are finished, then we could
2202
// close the fragment instance.
2203
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2204
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2205
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2206
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2207
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2208
// (via debug_string()).
2209
123k
bool PipelineFragmentContext::_close_fragment_instance() {
2210
123k
    if (_is_fragment_instance_closed) {
2211
0
        return false;
2212
0
    }
2213
123k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2214
123k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2215
123k
    if (!_need_notify_close) {
2216
123k
        auto st = send_report(true);
2217
123k
        if (!st) {
2218
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2219
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2220
0
        }
2221
123k
    }
2222
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2223
    // Since stream load does not have someting like coordinator on FE, so
2224
    // backend can not report profile to FE, ant its profile can not be shown
2225
    // in the same way with other query. So we print the profile content to info log.
2226
2227
123k
    if (_runtime_state->enable_profile() &&
2228
123k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2229
456
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2230
456
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2231
0
        std::stringstream ss;
2232
        // Compute the _local_time_percent before pretty_print the runtime_profile
2233
        // Before add this operation, the print out like that:
2234
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2235
        // After add the operation, the print out like that:
2236
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2237
        // We can easily know the exec node execute time without child time consumed.
2238
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2239
0
            runtime_profile_ptr->pretty_print(&ss);
2240
0
        }
2241
2242
0
        if (_runtime_state->load_channel_profile()) {
2243
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2244
0
        }
2245
2246
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2247
0
    }
2248
2249
123k
    if (_query_ctx->enable_profile()) {
2250
456
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2251
456
                                         collect_realtime_load_channel_profile());
2252
456
    }
2253
2254
    // Return whether the caller needs to remove from the pipeline map.
2255
    // The caller must do this after releasing _task_mutex.
2256
123k
    return !_need_notify_close;
2257
123k
}
2258
2259
483k
// Records that one task of `pipeline_id` has finished.
//
// When close_task() reports that the pipeline is fully closed, the pipelines
// listed for it in _dag are no longer waiting on it, so they are made runnable
// right away. Afterwards the fragment-wide closed-task counter is advanced
// under _task_mutex; once every task is closed the fragment instance is closed
// and, if requested, this context is removed from the fragment manager.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        // All tasks of this pipeline are closed: wake up the dependent pipelines.
        if (auto dag_it = _dag.find(pipeline_id); dag_it != _dag.end()) {
            for (auto upstream_id : dag_it->second) {
                _pip_id_to_pipeline[upstream_id]->make_all_runnable(pipeline_id);
            }
        }
    }

    bool should_remove = false;
    {
        std::lock_guard<std::mutex> guard(_task_mutex);
        ++_closed_tasks;
        // Keep the query-level finished task progress up to date in real time.
        _query_ctx->inc_finished_task_num();
        if (_closed_tasks >= _total_tasks) {
            should_remove = _close_fragment_instance();
        }
    }

    if (!should_remove) {
        return;
    }
    // remove_pipeline_context() must run outside _task_mutex to avoid an ABBA deadlock.
    _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
}
2284
2285
14.4k
std::string PipelineFragmentContext::get_load_error_url() {
2286
14.4k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2287
0
        return to_load_error_http_path(str);
2288
0
    }
2289
50.2k
    for (auto& tasks : _tasks) {
2290
62.7k
        for (auto& task : tasks) {
2291
62.7k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2292
12
                return to_load_error_http_path(str);
2293
12
            }
2294
62.7k
        }
2295
50.2k
    }
2296
14.4k
    return "";
2297
14.4k
}
2298
2299
14.4k
std::string PipelineFragmentContext::get_first_error_msg() {
2300
14.4k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2301
0
        return str;
2302
0
    }
2303
50.2k
    for (auto& tasks : _tasks) {
2304
62.7k
        for (auto& task : tasks) {
2305
62.7k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2306
12
                return str;
2307
12
            }
2308
62.7k
        }
2309
50.2k
    }
2310
14.4k
    return "";
2311
14.4k
}
2312
2313
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2314
0
    std::stringstream url;
2315
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2316
0
        << "/api/_download_load?"
2317
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2318
0
    return url.str();
2319
0
}
2320
2321
13.5k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2322
13.5k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2323
13.5k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2324
13.5k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2325
13.5k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2326
13.5k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2327
13.5k
    });
2328
2329
13.5k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2330
13.5k
    if (req.coord_addr.hostname == "external") {
2331
        // External query (flink/spark read tablets) not need to report to FE.
2332
0
        return;
2333
0
    }
2334
13.5k
    int callback_retries = 10;
2335
13.5k
    const int sleep_ms = 1000;
2336
13.5k
    Status exec_status = req.status;
2337
13.5k
    Status coord_status;
2338
13.5k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2339
13.5k
    do {
2340
13.5k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2341
13.5k
                                                            req.coord_addr, &coord_status);
2342
13.5k
        if (!coord_status.ok()) {
2343
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2344
0
        }
2345
13.5k
    } while (!coord_status.ok() && callback_retries-- > 0);
2346
2347
13.5k
    if (!coord_status.ok()) {
2348
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2349
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2350
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2351
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2352
0
        return;
2353
0
    }
2354
2355
13.5k
    TReportExecStatusParams params;
2356
13.5k
    params.protocol_version = FrontendServiceVersion::V1;
2357
13.5k
    params.__set_query_id(req.query_id);
2358
13.5k
    params.__set_backend_num(req.backend_num);
2359
13.5k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2360
13.5k
    params.__set_fragment_id(req.fragment_id);
2361
13.5k
    params.__set_status(exec_status.to_thrift());
2362
13.5k
    params.__set_done(req.done);
2363
13.5k
    params.__set_query_type(req.runtime_state->query_type());
2364
13.5k
    params.__isset.profile = false;
2365
2366
13.5k
    DCHECK(req.runtime_state != nullptr);
2367
2368
13.5k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2369
12.5k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2370
12.5k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2371
12.5k
    } else {
2372
1.01k
        DCHECK(!req.runtime_states.empty());
2373
1.01k
        if (!req.runtime_state->output_files().empty()) {
2374
0
            params.__isset.delta_urls = true;
2375
0
            for (auto& it : req.runtime_state->output_files()) {
2376
0
                params.delta_urls.push_back(_to_http_path(it));
2377
0
            }
2378
0
        }
2379
1.01k
        if (!params.delta_urls.empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
        }
2382
1.01k
    }
2383
2384
13.5k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2385
13.5k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2386
13.5k
    static std::string s_unselected_rows = "unselected.rows";
2387
13.5k
    int64_t num_rows_load_success = 0;
2388
13.5k
    int64_t num_rows_load_filtered = 0;
2389
13.5k
    int64_t num_rows_load_unselected = 0;
2390
13.5k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2391
13.5k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2392
13.5k
        req.runtime_state->num_finished_range() > 0) {
2393
0
        params.__isset.load_counters = true;
2394
2395
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2396
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2397
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2398
0
        params.__isset.fragment_instance_reports = true;
2399
0
        TFragmentInstanceReport t;
2400
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2401
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2402
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2403
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2404
0
        params.fragment_instance_reports.push_back(t);
2405
13.5k
    } else if (!req.runtime_states.empty()) {
2406
54.1k
        for (auto* rs : req.runtime_states) {
2407
54.1k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2408
54.1k
                rs->num_finished_range() > 0) {
2409
7.76k
                params.__isset.load_counters = true;
2410
7.76k
                num_rows_load_success += rs->num_rows_load_success();
2411
7.76k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2412
7.76k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2413
7.76k
                params.__isset.fragment_instance_reports = true;
2414
7.76k
                TFragmentInstanceReport t;
2415
7.76k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2416
7.76k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2417
7.76k
                t.__set_loaded_rows(rs->num_rows_load_total());
2418
7.76k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2419
7.76k
                params.fragment_instance_reports.push_back(t);
2420
7.76k
            }
2421
54.1k
        }
2422
13.5k
    }
2423
13.5k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2424
13.5k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2425
13.5k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2426
2427
13.5k
    if (!req.load_error_url.empty()) {
2428
12
        params.__set_tracking_url(req.load_error_url);
2429
12
    }
2430
13.5k
    if (!req.first_error_msg.empty()) {
2431
12
        params.__set_first_error_msg(req.first_error_msg);
2432
12
    }
2433
54.1k
    for (auto* rs : req.runtime_states) {
2434
54.1k
        if (rs->wal_id() > 0) {
2435
0
            params.__set_txn_id(rs->wal_id());
2436
0
            params.__set_label(rs->import_label());
2437
0
        }
2438
54.1k
    }
2439
13.5k
    if (!req.runtime_state->export_output_files().empty()) {
2440
0
        params.__isset.export_files = true;
2441
0
        params.export_files = req.runtime_state->export_output_files();
2442
13.5k
    } else if (!req.runtime_states.empty()) {
2443
54.1k
        for (auto* rs : req.runtime_states) {
2444
54.1k
            if (!rs->export_output_files().empty()) {
2445
0
                params.__isset.export_files = true;
2446
0
                params.export_files.insert(params.export_files.end(),
2447
0
                                           rs->export_output_files().begin(),
2448
0
                                           rs->export_output_files().end());
2449
0
            }
2450
54.1k
        }
2451
13.5k
    }
2452
13.5k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2453
0
        params.__isset.commitInfos = true;
2454
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2455
13.5k
    } else if (!req.runtime_states.empty()) {
2456
54.1k
        for (auto* rs : req.runtime_states) {
2457
54.1k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2458
3.03k
                params.__isset.commitInfos = true;
2459
3.03k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2460
3.03k
            }
2461
54.1k
        }
2462
13.5k
    }
2463
13.5k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2464
0
        params.__isset.errorTabletInfos = true;
2465
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2466
13.5k
    } else if (!req.runtime_states.empty()) {
2467
54.1k
        for (auto* rs : req.runtime_states) {
2468
54.1k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2469
0
                params.__isset.errorTabletInfos = true;
2470
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2471
0
                                               rs_eti.end());
2472
0
            }
2473
54.1k
        }
2474
13.5k
    }
2475
13.5k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2476
0
        params.__isset.hive_partition_updates = true;
2477
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2478
0
                                             hpu.end());
2479
13.5k
    } else if (!req.runtime_states.empty()) {
2480
54.1k
        for (auto* rs : req.runtime_states) {
2481
54.1k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2482
2.15k
                params.__isset.hive_partition_updates = true;
2483
2.15k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2484
2.15k
                                                     rs_hpu.begin(), rs_hpu.end());
2485
2.15k
            }
2486
54.1k
        }
2487
13.5k
    }
2488
13.5k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2489
0
        params.__isset.iceberg_commit_datas = true;
2490
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2491
0
                                           icd.end());
2492
13.5k
    } else if (!req.runtime_states.empty()) {
2493
54.1k
        for (auto* rs : req.runtime_states) {
2494
54.1k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2495
2.08k
                params.__isset.iceberg_commit_datas = true;
2496
2.08k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2497
2.08k
                                                   rs_icd.begin(), rs_icd.end());
2498
2.08k
            }
2499
54.1k
        }
2500
13.5k
    }
2501
2502
13.5k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2503
0
        params.__isset.mc_commit_datas = true;
2504
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2505
13.5k
    } else if (!req.runtime_states.empty()) {
2506
54.1k
        for (auto* rs : req.runtime_states) {
2507
54.1k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2508
0
                params.__isset.mc_commit_datas = true;
2509
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2510
0
                                              rs_mcd.end());
2511
0
            }
2512
54.1k
        }
2513
13.5k
    }
2514
2515
13.5k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2516
13.5k
    params.__isset.error_log = (!params.error_log.empty());
2517
2518
13.5k
    if (_exec_env->cluster_info()->backend_id != 0) {
2519
13.5k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2520
13.5k
    }
2521
2522
13.5k
    TReportExecStatusResult res;
2523
13.5k
    Status rpc_status;
2524
2525
13.5k
    VLOG_DEBUG << "reportExecStatus params is "
2526
0
               << apache::thrift::ThriftDebugString(params).c_str();
2527
13.5k
    if (!exec_status.ok()) {
2528
149
        LOG(WARNING) << "report error status: " << exec_status.msg()
2529
149
                     << " to coordinator: " << req.coord_addr
2530
149
                     << ", query id: " << print_id(req.query_id);
2531
149
    }
2532
13.5k
    try {
2533
13.5k
        try {
2534
13.5k
            (*coord)->reportExecStatus(res, params);
2535
13.5k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2536
#ifndef ADDRESS_SANITIZER
2537
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2538
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2539
                         << req.coord_addr << ", err: " << e.what();
2540
#endif
2541
0
            rpc_status = coord->reopen();
2542
2543
0
            if (!rpc_status.ok()) {
2544
0
                req.cancel_fn(rpc_status);
2545
0
                return;
2546
0
            }
2547
0
            (*coord)->reportExecStatus(res, params);
2548
0
        }
2549
2550
13.5k
        rpc_status = Status::create<false>(res.status);
2551
13.5k
    } catch (apache::thrift::TException& e) {
2552
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2553
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2554
0
    }
2555
2556
13.5k
    if (!rpc_status.ok()) {
2557
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2558
0
                 print_id(req.query_id), rpc_status.to_string());
2559
0
        req.cancel_fn(rpc_status);
2560
0
    }
2561
13.5k
}
2562
2563
125k
// Submits an asynchronous status report for this fragment to the fragment
// manager's thread pool.
//
// @param done  whether the fragment has finished.
// @return OK when no report is needed or the report task was submitted,
//         NeedSendAgain when reporting is disabled and the fragment is not done,
//         otherwise whatever submit_func() returns.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // no report is needed whether the query succeeded or failed.
    // This may happen when the query limit is reached and an internal
    // cancellation is being processed. When the limit is reached the fragment is
    // also cancelled, but _is_report_on_cancel is set to false to avoid sending
    // a fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // The fragment is done; close it without reporting to FE.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level values; fall back to the fragment/task-level ones.
    // Each query-level value is read exactly once so the emptiness check and the
    // value that is used always agree.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    // `ctx` is captured by the task below so it holds a shared reference to
    // this context while the report runs.
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
        _coordinator_callback(req);
        if (!req.done) {
            ctx->refresh_next_report_time();
        }
    });
}
2623
2624
4
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2625
4
    size_t res = 0;
2626
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2627
    // here to traverse the vector.
2628
4
    for (const auto& task_instances : _tasks) {
2629
6
        for (const auto& task : task_instances) {
2630
6
            if (task.first->is_running()) {
2631
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2632
0
                                      << " is running, task: " << (void*)task.first.get()
2633
0
                                      << ", is_running: " << task.first->is_running();
2634
0
                *has_running_task = true;
2635
0
                return 0;
2636
0
            }
2637
2638
6
            size_t revocable_size = task.first->get_revocable_size();
2639
6
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2640
2
                res += revocable_size;
2641
2
            }
2642
6
        }
2643
4
    }
2644
4
    return res;
2645
4
}
2646
2647
8
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2648
8
    std::vector<PipelineTask*> revocable_tasks;
2649
8
    for (const auto& task_instances : _tasks) {
2650
12
        for (const auto& task : task_instances) {
2651
12
            size_t revocable_size_ = task.first->get_revocable_size();
2652
2653
12
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2654
4
                revocable_tasks.emplace_back(task.first.get());
2655
4
            }
2656
12
        }
2657
8
    }
2658
8
    return revocable_tasks;
2659
8
}
2660
2661
2
std::string PipelineFragmentContext::debug_string() {
2662
2
    std::lock_guard<std::mutex> l(_task_mutex);
2663
2
    fmt::memory_buffer debug_string_buffer;
2664
2
    fmt::format_to(debug_string_buffer,
2665
2
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2666
2
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2667
2
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2668
18
    for (size_t j = 0; j < _tasks.size(); j++) {
2669
16
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2670
34
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2671
18
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2672
18
                           _tasks[j][i].first->debug_string());
2673
18
        }
2674
16
    }
2675
2676
2
    return fmt::to_string(debug_string_buffer);
2677
2
}
2678
2679
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2680
456
PipelineFragmentContext::collect_realtime_profile() const {
2681
456
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2682
2683
    // we do not have mutex to protect pipeline_id_to_profile
2684
    // so we need to make sure this funciton is invoked after fragment context
2685
    // has already been prepared.
2686
456
    if (!_prepared) {
2687
0
        std::string msg =
2688
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2689
0
        DCHECK(false) << msg;
2690
0
        LOG_ERROR(msg);
2691
0
        return res;
2692
0
    }
2693
2694
    // Make sure first profile is fragment level profile
2695
456
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2696
456
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2697
456
    res.push_back(fragment_profile);
2698
2699
    // pipeline_id_to_profile is initialized in prepare stage
2700
664
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2701
664
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2702
664
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2703
664
        res.push_back(profile_ptr);
2704
664
    }
2705
2706
456
    return res;
2707
456
}
2708
2709
std::shared_ptr<TRuntimeProfileTree>
2710
456
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2711
    // we do not have mutex to protect pipeline_id_to_profile
2712
    // so we need to make sure this funciton is invoked after fragment context
2713
    // has already been prepared.
2714
456
    if (!_prepared) {
2715
0
        std::string msg =
2716
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2717
0
        DCHECK(false) << msg;
2718
0
        LOG_ERROR(msg);
2719
0
        return nullptr;
2720
0
    }
2721
2722
624
    for (const auto& tasks : _tasks) {
2723
1.08k
        for (const auto& task : tasks) {
2724
1.08k
            if (task.second->load_channel_profile() == nullptr) {
2725
0
                continue;
2726
0
            }
2727
2728
1.08k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2729
2730
1.08k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2731
1.08k
                                                           _runtime_state->profile_level());
2732
1.08k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2733
1.08k
        }
2734
624
    }
2735
2736
456
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2737
456
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2738
456
                                                      _runtime_state->profile_level());
2739
456
    return load_channel_profile;
2740
456
}
2741
2742
// Collect runtime filter IDs registered by all tasks in this PFC.
2743
// Used during recursive CTE stage transitions to know which filters to deregister
2744
// before creating the new PFC for the next recursion round.
2745
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2746
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2747
// the vector sizes do not change, and individual RuntimeState filter sets are
2748
// written only during open() which has completed by the time we reach rerun.
2749
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2750
0
    std::set<int> result;
2751
0
    for (const auto& _task : _tasks) {
2752
0
        for (const auto& task : _task) {
2753
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2754
0
            result.merge(set);
2755
0
        }
2756
0
    }
2757
0
    if (_runtime_state) {
2758
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2759
0
        result.merge(set);
2760
0
    }
2761
0
    return result;
2762
0
}
2763
2764
123k
void PipelineFragmentContext::_release_resource() {
2765
123k
    std::lock_guard<std::mutex> l(_task_mutex);
2766
    // The memory released by the query end is recorded in the query mem tracker.
2767
123k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2768
123k
    auto st = _query_ctx->exec_status();
2769
272k
    for (auto& _task : _tasks) {
2770
272k
        if (!_task.empty()) {
2771
272k
            _call_back(_task.front().first->runtime_state(), &st);
2772
272k
        }
2773
272k
    }
2774
123k
    _tasks.clear();
2775
123k
    _dag.clear();
2776
123k
    _pip_id_to_pipeline.clear();
2777
123k
    _pipelines.clear();
2778
123k
    _sink.reset();
2779
123k
    _root_op.reset();
2780
123k
    _runtime_filter_mgr_map.clear();
2781
123k
    _op_id_to_shared_state.clear();
2782
123k
}
2783
2784
} // namespace doris