Coverage Report

Created: 2026-06-29 14:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
PipelineFragmentContext::PipelineFragmentContext(
140
        TUniqueId query_id, const TPipelineFragmentParams& request,
141
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
142
        const std::function<void(RuntimeState*, Status*)>& call_back)
143
394k
        : _query_id(std::move(query_id)),
144
394k
          _fragment_id(request.fragment_id),
145
394k
          _exec_env(exec_env),
146
394k
          _query_ctx(std::move(query_ctx)),
147
394k
          _call_back(call_back),
148
394k
          _is_report_on_cancel(true),
149
394k
          _params(request),
150
394k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
151
394k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
152
394k
                                                               : false) {
153
394k
    _fragment_watcher.start();
154
394k
}
155
156
394k
PipelineFragmentContext::~PipelineFragmentContext() {
157
394k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
158
394k
            .tag("query_id", print_id(_query_id))
159
394k
            .tag("fragment_id", _fragment_id);
160
394k
    _release_resource();
161
394k
    {
162
        // The memory released by the query end is recorded in the query mem tracker.
163
394k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
164
394k
        _runtime_state.reset();
165
394k
        _query_ctx.reset();
166
394k
    }
167
394k
}
168
169
190
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
190
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
190
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
190
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
9.49k
bool PipelineFragmentContext::notify_close() {
181
9.49k
    bool all_closed = false;
182
9.49k
    bool need_remove = false;
183
9.49k
    {
184
9.49k
        std::lock_guard<std::mutex> l(_task_mutex);
185
9.49k
        if (_closed_tasks >= _total_tasks) {
186
3.72k
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
3.65k
                need_remove = true;
193
3.65k
            }
194
3.72k
            all_closed = true;
195
3.72k
        }
196
        // make fragment release by self after cancel
197
9.49k
        _need_notify_close = false;
198
9.49k
    }
199
9.49k
    if (need_remove) {
200
3.65k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
3.65k
    }
202
9.49k
    return all_closed;
203
9.49k
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
5.78k
void PipelineFragmentContext::cancel(const Status reason) {
210
5.78k
    LOG_INFO("PipelineFragmentContext::cancel")
211
5.78k
            .tag("query_id", print_id(_query_id))
212
5.78k
            .tag("fragment_id", _fragment_id)
213
5.78k
            .tag("reason", reason.to_string());
214
5.78k
    if (notify_close()) {
215
85
        return;
216
85
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
5.69k
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
1
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
1
                                   debug_string());
221
1
        LOG_LONG_STRING(WARNING, dbg_str);
222
1
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
5.69k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
5.69k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
5.69k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
25
        _query_ctx->set_load_error_url(error_url);
236
25
    }
237
238
5.69k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
25
        _query_ctx->set_first_error_msg(first_error_msg);
240
25
    }
241
242
5.69k
    _query_ctx->cancel(reason, _fragment_id);
243
5.69k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
270
        _is_report_on_cancel = false;
245
5.42k
    } else {
246
24.5k
        for (auto& id : _fragment_instance_ids) {
247
24.5k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
24.5k
        }
249
5.42k
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
5.69k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
5.69k
    if (stream_load_ctx != nullptr) {
254
33
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
33
        stream_load_ctx->error_url = get_load_error_url();
259
33
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
33
    }
261
262
25.6k
    for (auto& tasks : _tasks) {
263
53.6k
        for (auto& task : tasks) {
264
53.6k
            task.first->unblock_all_dependencies();
265
53.6k
        }
266
25.6k
    }
267
5.69k
}
268
269
615k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
270
615k
    PipelineId id = _next_pipeline_id++;
271
615k
    auto pipeline = std::make_shared<Pipeline>(
272
615k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
273
615k
            parent ? parent->num_tasks() : _num_instances);
274
615k
    if (idx >= 0) {
275
758
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
276
614k
    } else {
277
614k
        _pipelines.emplace_back(pipeline);
278
614k
    }
279
615k
    if (parent) {
280
216k
        parent->set_children(pipeline);
281
216k
    }
282
615k
    return pipeline;
283
615k
}
284
285
394k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
286
394k
    {
287
394k
        SCOPED_TIMER(_build_pipelines_timer);
288
        // 2. Build pipelines with operators in this fragment.
289
394k
        auto root_pipeline = add_pipeline();
290
394k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
291
394k
                                         &_root_op, root_pipeline));
292
293
        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
294
        // that inherited reduced num_tasks from a serial operator.
295
394k
        _propagate_local_exchange_num_tasks();
296
297
        // Create deferred local exchangers now that all pipelines have final num_tasks.
298
394k
        RETURN_IF_ERROR(_create_deferred_local_exchangers());
299
300
        // Raise num_tasks for pipelines whose serial non-scan operators (e.g.,
301
        // UNPARTITIONED Exchange) reduced num_tasks below _num_instances.
302
        // Without this, fragment instances 1+ have no task for these pipelines
303
        // and downstream operators fail with "must set shared state".
304
        //
305
        // This applies to ALL pipelines (not just deferred exchanger upstreams):
306
        // fragments with UNION/INTERSECT/EXCEPT + serial Exchange in child
307
        // pipelines also need the raise, even without FE-planned local exchange.
308
        //
309
        // Exception: serial scan sources (pooling scan) keep num_tasks=1 — the
310
        // PassthroughExchanger(1, N) handles the fan-out correctly.
311
        // NOTE: Do NOT raise pipelines whose source is a serial operator
312
        // (Exchange or scan) — they legitimately have 1 task, and raising
313
        // them causes crashes (e.g., 4 Exchange tasks but only 1 receives
314
        // data).  The correct fix for shared state injection across
315
        // instances is handled by the FE: it inserts local exchange nodes
316
        // between serial operators and their downstream consumers, creating
317
        // proper pipeline boundaries with _num_instances tasks.
318
319
        // 3. Create sink operator
320
394k
        if (!_params.fragment.__isset.output_sink) {
321
0
            return Status::InternalError("No output sink in this fragment!");
322
0
        }
323
394k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
324
394k
                                          _params.fragment.output_exprs, _params,
325
394k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
326
394k
                                          *_desc_tbl, root_pipeline->id()));
327
394k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
328
394k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
329
330
614k
        for (PipelinePtr& pipeline : _pipelines) {
331
614k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
332
614k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
333
614k
        }
334
394k
    }
335
    // 4. Build local exchanger
336
394k
    if (_runtime_state->plan_local_shuffle()) {
337
116k
        SCOPED_TIMER(_plan_local_exchanger_timer);
338
116k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
339
116k
                                             _params.bucket_seq_to_instance_idx,
340
116k
                                             _params.shuffle_idx_to_instance_idx));
341
116k
    }
342
343
    // 5. Initialize global states in pipelines.
344
615k
    for (PipelinePtr& pipeline : _pipelines) {
345
615k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
346
615k
        pipeline->children().clear();
347
615k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
348
615k
    }
349
350
393k
    {
351
393k
        SCOPED_TIMER(_build_tasks_timer);
352
        // 6. Build pipeline tasks and initialize local state.
353
393k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
354
393k
    }
355
356
393k
    return Status::OK();
357
393k
}
358
359
394k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
360
394k
    if (_prepared) {
361
0
        return Status::InternalError("Already prepared");
362
0
    }
363
394k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
364
394k
        _timeout = _params.query_options.execution_timeout;
365
394k
    }
366
367
394k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
368
394k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
369
394k
    SCOPED_TIMER(_prepare_timer);
370
394k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
371
394k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
372
394k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
373
394k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
374
394k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
375
394k
    {
376
394k
        SCOPED_TIMER(_init_context_timer);
377
394k
        cast_set(_num_instances, _params.local_params.size());
378
394k
        _total_instances =
379
394k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
380
381
394k
        auto* fragment_context = this;
382
383
394k
        if (_params.query_options.__isset.is_report_success) {
384
393k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
385
393k
        }
386
387
        // 1. Set up the global runtime state.
388
394k
        _runtime_state = RuntimeState::create_unique(
389
394k
                _params.query_id, _params.fragment_id, _params.query_options,
390
394k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
391
394k
        _runtime_state->set_task_execution_context(shared_from_this());
392
394k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
393
394k
        if (_params.__isset.backend_id) {
394
390k
            _runtime_state->set_backend_id(_params.backend_id);
395
390k
        }
396
394k
        if (_params.__isset.import_label) {
397
240
            _runtime_state->set_import_label(_params.import_label);
398
240
        }
399
394k
        if (_params.__isset.db_name) {
400
191
            _runtime_state->set_db_name(_params.db_name);
401
191
        }
402
394k
        if (_params.__isset.load_job_id) {
403
0
            _runtime_state->set_load_job_id(_params.load_job_id);
404
0
        }
405
406
394k
        if (_params.is_simplified_param) {
407
136k
            _desc_tbl = _query_ctx->desc_tbl;
408
258k
        } else {
409
258k
            DCHECK(_params.__isset.desc_tbl);
410
258k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
411
258k
                                                  &_desc_tbl));
412
258k
        }
413
394k
        _runtime_state->set_desc_tbl(_desc_tbl);
414
394k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
415
394k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
416
394k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
417
394k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
418
419
        // init fragment_instance_ids
420
394k
        const auto target_size = _params.local_params.size();
421
394k
        _fragment_instance_ids.resize(target_size);
422
1.48M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
423
1.08M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
424
1.08M
            _fragment_instance_ids[i] = fragment_instance_id;
425
1.08M
        }
426
394k
    }
427
428
394k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
429
430
393k
    _init_next_report_time();
431
432
393k
    _prepared = true;
433
393k
    return Status::OK();
434
394k
}
435
436
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
437
        int instance_idx,
438
1.08M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
439
1.08M
    const auto& local_params = _params.local_params[instance_idx];
440
1.08M
    auto fragment_instance_id = local_params.fragment_instance_id;
441
1.08M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
442
1.08M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
443
1.08M
    auto get_shared_state = [&](PipelinePtr pipeline)
444
1.08M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
445
1.75M
                                       std::vector<std::shared_ptr<Dependency>>>> {
446
1.75M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
447
1.75M
                                std::vector<std::shared_ptr<Dependency>>>>
448
1.75M
                shared_state_map;
449
2.29M
        for (auto& op : pipeline->operators()) {
450
2.29M
            auto source_id = op->operator_id();
451
2.29M
            if (auto iter = _op_id_to_shared_state.find(source_id);
452
2.29M
                iter != _op_id_to_shared_state.end()) {
453
689k
                shared_state_map.insert({source_id, iter->second});
454
689k
            }
455
2.29M
        }
456
1.75M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
457
1.75M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
458
1.75M
                iter != _op_id_to_shared_state.end()) {
459
277k
                shared_state_map.insert({sink_to_source_id, iter->second});
460
277k
            }
461
1.75M
        }
462
1.75M
        return shared_state_map;
463
1.75M
    };
464
465
3.24M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
466
2.16M
        auto& pipeline = _pipelines[pip_idx];
467
2.16M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
468
1.75M
            auto task_runtime_state = RuntimeState::create_unique(
469
1.75M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
470
1.75M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
471
1.75M
            {
472
                // Initialize runtime state for this task
473
1.75M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
474
475
1.75M
                task_runtime_state->set_task_execution_context(shared_from_this());
476
1.75M
                task_runtime_state->set_be_number(local_params.backend_num);
477
478
1.75M
                if (_params.__isset.backend_id) {
479
1.75M
                    task_runtime_state->set_backend_id(_params.backend_id);
480
1.75M
                }
481
1.75M
                if (_params.__isset.import_label) {
482
241
                    task_runtime_state->set_import_label(_params.import_label);
483
241
                }
484
1.75M
                if (_params.__isset.db_name) {
485
192
                    task_runtime_state->set_db_name(_params.db_name);
486
192
                }
487
1.75M
                if (_params.__isset.load_job_id) {
488
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
489
0
                }
490
1.75M
                if (_params.__isset.wal_id) {
491
113
                    task_runtime_state->set_wal_id(_params.wal_id);
492
113
                }
493
1.75M
                if (_params.__isset.content_length) {
494
34
                    task_runtime_state->set_content_length(_params.content_length);
495
34
                }
496
497
1.75M
                task_runtime_state->set_desc_tbl(_desc_tbl);
498
1.75M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
499
1.75M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
500
1.75M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
501
1.75M
                task_runtime_state->set_max_operator_id(max_operator_id());
502
1.75M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
503
1.75M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
504
1.75M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
505
506
1.75M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
507
1.75M
            }
508
1.75M
            auto cur_task_id = _total_tasks++;
509
1.75M
            task_runtime_state->set_task_id(cur_task_id);
510
1.75M
            task_runtime_state->set_task_num(pipeline->num_tasks());
511
1.75M
            auto task = std::make_shared<PipelineTask>(
512
1.75M
                    pipeline, cur_task_id, task_runtime_state.get(),
513
1.75M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
514
1.75M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
515
1.75M
                    instance_idx);
516
1.75M
            pipeline->incr_created_tasks(instance_idx, task.get());
517
1.75M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
518
1.75M
            _tasks[instance_idx].emplace_back(
519
1.75M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
520
1.75M
                            std::move(task), std::move(task_runtime_state)});
521
1.75M
        }
522
2.16M
    }
523
524
    /**
525
         * Build DAG for pipeline tasks.
526
         * For example, we have
527
         *
528
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
529
         *            \                      /
530
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
531
         *                 \                          /
532
         *               JoinProbeOperator2 (Pipeline1)
533
         *
534
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
535
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
536
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
537
         *
538
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
539
         * and JoinProbeOperator2.
540
         */
541
2.16M
    for (auto& _pipeline : _pipelines) {
542
2.16M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
543
1.74M
            auto* task = pipeline_id_to_task[_pipeline->id()];
544
1.74M
            DCHECK(task != nullptr);
545
546
            // If this task has upstream dependency, then inject it into this task.
547
1.74M
            if (_dag.contains(_pipeline->id())) {
548
1.08M
                auto& deps = _dag[_pipeline->id()];
549
1.08M
                for (auto& dep : deps) {
550
1.08M
                    if (pipeline_id_to_task.contains(dep)) {
551
672k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
552
672k
                        if (ss) {
553
383k
                            task->inject_shared_state(ss);
554
383k
                        } else {
555
289k
                            pipeline_id_to_task[dep]->inject_shared_state(
556
289k
                                    task->get_source_shared_state());
557
289k
                        }
558
672k
                    }
559
1.08M
                }
560
1.08M
            }
561
1.74M
        }
562
2.16M
    }
563
3.24M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
564
2.16M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
565
1.75M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
566
1.75M
            DCHECK(pipeline_id_to_profile[pip_idx]);
567
1.75M
            std::vector<TScanRangeParams> scan_ranges;
568
1.75M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
569
1.75M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
570
293k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
571
293k
            }
572
1.75M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
573
1.75M
                                                             _params.fragment.output_sink));
574
1.75M
        }
575
2.16M
    }
576
1.08M
    {
577
1.08M
        std::lock_guard<std::mutex> l(_state_map_lock);
578
1.08M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
579
1.08M
    }
580
1.08M
    return Status::OK();
581
1.08M
}
582
583
393k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
584
393k
    _total_tasks = 0;
585
393k
    _closed_tasks = 0;
586
393k
    const auto target_size = _params.local_params.size();
587
393k
    _tasks.resize(target_size);
588
393k
    _runtime_filter_mgr_map.resize(target_size);
589
1.00M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
590
614k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
591
614k
    }
592
393k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
593
594
393k
    if (target_size > 1 &&
595
393k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
596
141k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
597
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
598
20.1k
        std::vector<Status> prepare_status(target_size);
599
20.1k
        int submitted_tasks = 0;
600
20.1k
        Status submit_status;
601
20.1k
        CountDownLatch latch((int)target_size);
602
237k
        for (int i = 0; i < target_size; i++) {
603
216k
            submit_status = thread_pool->submit_func([&, i]() {
604
216k
                SCOPED_ATTACH_TASK(_query_ctx.get());
605
216k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
606
216k
                latch.count_down();
607
216k
            });
608
216k
            if (LIKELY(submit_status.ok())) {
609
216k
                submitted_tasks++;
610
18.4E
            } else {
611
18.4E
                break;
612
18.4E
            }
613
216k
        }
614
20.1k
        latch.arrive_and_wait(target_size - submitted_tasks);
615
20.1k
        if (UNLIKELY(!submit_status.ok())) {
616
0
            return submit_status;
617
0
        }
618
237k
        for (int i = 0; i < submitted_tasks; i++) {
619
216k
            if (!prepare_status[i].ok()) {
620
0
                return prepare_status[i];
621
0
            }
622
216k
        }
623
373k
    } else {
624
1.24M
        for (int i = 0; i < target_size; i++) {
625
869k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
626
869k
        }
627
373k
    }
628
393k
    _pipeline_parent_map.clear();
629
393k
    _op_id_to_shared_state.clear();
630
    // Record task cardinality once when this fragment context finishes task initialization.
631
393k
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
632
633
393k
    return Status::OK();
634
393k
}
635
636
393k
void PipelineFragmentContext::_init_next_report_time() {
637
393k
    auto interval_s = config::pipeline_status_report_interval;
638
393k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
37.8k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
37.8k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
37.8k
        _previous_report_time =
643
37.8k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
37.8k
        _disable_period_report = false;
645
37.8k
    }
646
393k
}
647
648
4.49k
void PipelineFragmentContext::refresh_next_report_time() {
649
4.49k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
4.49k
    DCHECK(disable == true);
651
4.49k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
4.49k
    _disable_period_report.compare_exchange_strong(disable, false);
653
4.49k
}
654
655
6.43M
void PipelineFragmentContext::trigger_report_if_necessary() {
656
6.43M
    if (!_is_report_success) {
657
6.01M
        return;
658
6.01M
    }
659
418k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
418k
    if (disable) {
661
7.42k
        return;
662
7.42k
    }
663
410k
    int32_t interval_s = config::pipeline_status_report_interval;
664
410k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
410k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
410k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
410k
    if (MonotonicNanos() > next_report_time) {
672
4.49k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
4.49k
                                                            std::memory_order_acq_rel)) {
674
9
            return;
675
9
        }
676
4.49k
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
4.49k
        auto st = send_report(false);
693
4.49k
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
4.49k
    }
699
410k
}
700
701
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
702
393k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
703
393k
    if (_params.fragment.plan.nodes.empty()) {
704
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
705
0
    }
706
707
393k
    int node_idx = 0;
708
709
393k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
710
393k
                                        &node_idx, root, cur_pipe, 0, false, false));
711
712
393k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
713
0
        return Status::InternalError(
714
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
715
0
    }
716
393k
    return Status::OK();
717
393k
}
718
719
393k
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
720
393k
    for (auto& info : _deferred_exchangers) {
721
        // DANGER ZONE — do not "fix" this line without reading the history.
722
        //
723
        // sender_count seeds Exchanger::_running_sink_operators, which the source side
724
        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
725
        // The correct value is THIS pipeline-instance's sink task count, which is exactly
726
        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
727
        //
728
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
729
        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS BREAKS the
730
        //   common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
731
        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
732
        //   _num_instances so _running_sink_operators never reaches 0 — downstream
733
        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
734
        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING and regressed
735
        //   exactly this way).  BE-planned mode uses max() because its
736
        //   `cur_pipe` is the source-side pipeline (always raised to _num_instances by
737
        //   add_pipeline) — not analogous to our `upstream_pipe` here, which is the
738
        //   sink-side pipeline that may legitimately stay at 1 for serial sources.
739
        //
740
        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
741
        //   is shared across all instances.  Same hang — each fragment-instance
742
        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
743
        //   exchanger is per-instance, not per-BE.  num_tasks() is already the right
744
        //   close-count for one instance.
745
        //
746
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
747
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
748
        // this fragment shape.  Fix THAT pass, not this seed value.
749
113k
        const int sender_count = info.upstream_pipe->num_tasks();
750
113k
        switch (info.partition_type) {
751
20.9k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
752
20.9k
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
753
20.9k
            info.shared_state->exchanger = ShuffleExchanger::create_unique(
754
20.9k
                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit,
755
20.9k
                    info.partition_type);
756
20.9k
            break;
757
456
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
758
456
            info.shared_state->exchanger = BucketShuffleExchanger::create_unique(
759
456
                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit);
760
456
            break;
761
88.5k
        case TLocalPartitionType::PASSTHROUGH:
762
88.5k
            info.shared_state->exchanger = PassthroughExchanger::create_unique(
763
88.5k
                    sender_count, _num_instances, info.free_blocks_limit);
764
88.5k
            break;
765
292
        case TLocalPartitionType::BROADCAST:
766
292
            info.shared_state->exchanger = BroadcastExchanger::create_unique(
767
292
                    sender_count, _num_instances, info.free_blocks_limit);
768
292
            break;
769
2.48k
        case TLocalPartitionType::PASS_TO_ONE:
770
2.48k
            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
771
1.38k
                info.shared_state->exchanger = PassToOneExchanger::create_unique(
772
1.38k
                        sender_count, _num_instances, info.free_blocks_limit);
773
1.38k
            } else {
774
1.10k
                info.shared_state->exchanger = BroadcastExchanger::create_unique(
775
1.10k
                        sender_count, _num_instances, info.free_blocks_limit);
776
1.10k
            }
777
2.48k
            break;
778
710
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
779
710
            info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
780
710
                    sender_count, _num_instances, info.free_blocks_limit);
781
710
            break;
782
0
        case TLocalPartitionType::NOOP:
783
0
        case TLocalPartitionType::LOCAL_MERGE_SORT:
784
            // FE-planned LocalExchangeNode currently never emits NOOP or LOCAL_MERGE_SORT
785
            // through the deferred-exchanger path.  NOOP means "no exchange needed" and
786
            // is filtered out before reaching here; LOCAL_MERGE_SORT is planned by the
787
            // legacy BE path only.  Crash in debug to surface the protocol violation if
788
            // that ever changes; return an error in release to avoid silently corrupting
789
            // execution.
790
0
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
791
0
                          << static_cast<int>(info.partition_type);
792
0
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
793
0
                                         std::to_string(static_cast<int>(info.partition_type)));
794
0
        default:
795
            // New TLocalPartitionType added on FE side without a BE handler here.
796
0
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
797
0
                          << static_cast<int>(info.partition_type);
798
0
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
799
0
                                         std::to_string(static_cast<int>(info.partition_type)));
800
113k
        }
801
113k
    }
802
393k
    _deferred_exchangers.clear();
803
393k
    return Status::OK();
804
393k
}
805
806
394k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
394k
    if (_deferred_exchangers.empty()) {
815
306k
        return;
816
306k
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
87.3k
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
87.3k
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
87.3k
    std::map<PipelineId, int> in_degree;
832
250k
    for (auto& p : _pipelines) {
833
250k
        id_to_pipe[p->id()] = p;
834
250k
        in_degree.try_emplace(p->id(), 0);
835
250k
    }
836
157k
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
162k
        for (auto upstream_id : upstream_ids) {
838
162k
            downstreams_of[upstream_id].push_back(downstream_id);
839
162k
            in_degree[downstream_id]++;
840
162k
        }
841
157k
    }
842
87.3k
    std::vector<PipelineId> ready;
843
250k
    for (const auto& [id, deg] : in_degree) {
844
250k
        if (deg == 0) {
845
92.5k
            ready.push_back(id);
846
92.5k
        }
847
250k
    }
848
87.3k
    size_t visited = 0;
849
337k
    while (!ready.empty()) {
850
250k
        const auto id = ready.back();
851
250k
        ready.pop_back();
852
250k
        visited++;
853
250k
        auto pit = id_to_pipe.find(id);
854
250k
        if (pit != id_to_pipe.end()) {
855
250k
            auto& pipe = pit->second;
856
250k
            const auto& ops = pipe->operators();
857
250k
            const bool le_source =
858
250k
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
250k
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
250k
            if (le_source) {
861
113k
                pipe->set_num_tasks(_num_instances);
862
136k
            } else if (!serial_source) {
863
60.4k
                int target = pipe->num_tasks();
864
60.4k
                const auto up_it = _dag.find(id);
865
60.4k
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
44.3k
                    for (auto upstream_id : up_it->second) {
868
44.3k
                        auto uit = id_to_pipe.find(upstream_id);
869
44.3k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
44.3k
                            target = _num_instances;
871
44.3k
                            break;
872
44.3k
                        }
873
44.3k
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
44.9k
                    for (auto upstream_id : up_it->second) {
876
44.9k
                        auto uit = id_to_pipe.find(upstream_id);
877
44.9k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
44.9k
                            !uit->second->operators().empty() &&
879
44.9k
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
44.9k
                    }
883
44.3k
                }
884
60.4k
                pipe->set_num_tasks(target);
885
60.4k
            }
886
250k
        }
887
250k
        for (auto down : downstreams_of[id]) {
888
162k
            if (--in_degree[down] == 0) {
889
157k
                ready.push_back(down);
890
157k
            }
891
162k
        }
892
250k
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
87.3k
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
87.3k
}
900
901
Status PipelineFragmentContext::_create_tree_helper(
902
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
903
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
904
717k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
905
    // propagate error case
906
717k
    if (*node_idx >= tnodes.size()) {
907
0
        return Status::InternalError(
908
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
909
0
                *node_idx, tnodes.size());
910
0
    }
911
717k
    const TPlanNode& tnode = tnodes[*node_idx];
912
913
717k
    int num_children = tnodes[*node_idx].num_children;
914
717k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
915
717k
    bool current_require_bucket_distribution = require_bucket_distribution;
916
    // TODO: Create CacheOperator is confused now
917
717k
    OperatorPtr op = nullptr;
918
717k
    OperatorPtr cache_op = nullptr;
919
717k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
920
717k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
921
717k
                                     followed_by_shuffled_operator,
922
717k
                                     current_require_bucket_distribution, cache_op));
923
    // Initialization must be done here. For example, group by expressions in agg will be used to
924
    // decide if a local shuffle should be planed, so it must be initialized here.
925
717k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
926
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
927
717k
    if (parent != nullptr) {
928
        // add to parent's child(s)
929
324k
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
930
393k
    } else {
931
393k
        *root = op;
932
393k
    }
933
    /**
934
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
935
     *
936
     * For plan:
937
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
938
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
939
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
940
     *
941
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
942
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
943
     */
944
717k
    auto required_data_distribution =
945
717k
            cur_pipe->operators().empty()
946
717k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
947
717k
                    : op->required_data_distribution(_runtime_state.get());
948
717k
    current_followed_by_shuffled_operator =
949
717k
            ((followed_by_shuffled_operator ||
950
717k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
951
651k
                                             : op->is_shuffled_operator())) &&
952
717k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
953
717k
            (followed_by_shuffled_operator &&
954
601k
             required_data_distribution.distribution_type == TLocalPartitionType::NOOP);
955
956
717k
    current_require_bucket_distribution =
957
717k
            ((require_bucket_distribution ||
958
717k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
959
657k
                                             : op->is_colocated_operator())) &&
960
717k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
961
717k
            (require_bucket_distribution &&
962
609k
             required_data_distribution.distribution_type == TLocalPartitionType::NOOP);
963
964
717k
    if (num_children == 0) {
965
409k
        _use_serial_source = op->is_serial_operator();
966
409k
    }
967
    // rely on that tnodes is preorder of the plan
968
1.04M
    for (int i = 0; i < num_children; i++) {
969
324k
        ++*node_idx;
970
324k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
971
324k
                                            current_followed_by_shuffled_operator,
972
324k
                                            current_require_bucket_distribution));
973
974
        // we are expecting a child, but have used all nodes
975
        // this means we have been given a bad tree and must fail
976
324k
        if (*node_idx >= tnodes.size()) {
977
0
            return Status::InternalError(
978
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
979
0
                    "nodes: {}",
980
0
                    *node_idx, tnodes.size());
981
0
        }
982
324k
    }
983
984
717k
    return Status::OK();
985
717k
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
758
        PipelinePtr pipe_with_sink) {
990
758
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
758
    pipe_with_source->set_num_tasks(_num_instances);
992
758
    pipe_with_source->set_data_distribution(data_distribution);
993
758
}
994
995
Status PipelineFragmentContext::_add_local_exchange_impl(
996
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
997
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
998
        const std::map<int, int>& bucket_seq_to_instance_idx,
999
758
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1000
758
    auto& operators = cur_pipe->operators();
1001
758
    const auto downstream_pipeline_id = cur_pipe->id();
1002
758
    auto local_exchange_id = next_operator_id();
1003
    // 1. Create a new pipeline with local exchange sink.
1004
758
    DataSinkOperatorPtr sink;
1005
758
    auto sink_id = next_sink_operator_id();
1006
1007
    /**
1008
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
1009
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
1010
     */
1011
758
    const bool followed_by_shuffled_operator =
1012
758
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
1013
758
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
1014
758
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
1015
758
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
1016
758
                                         followed_by_shuffled_operator && !_use_serial_source;
1017
758
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
1018
758
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
1019
758
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
1020
758
    if (bucket_seq_to_instance_idx.empty() &&
1021
758
        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
1022
0
        data_distribution.distribution_type =
1023
0
                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
1024
0
                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
1025
0
    }
1026
758
    if (!use_global_hash_shuffle &&
1027
758
        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
1028
42
        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
1029
42
    }
1030
758
    RETURN_IF_ERROR(new_pip->set_sink(sink));
1031
758
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
1032
758
                                          num_buckets, shuffle_idx_to_instance_idx));
1033
1034
    // 2. Create and initialize LocalExchangeSharedState.
1035
758
    std::shared_ptr<LocalExchangeSharedState> shared_state =
1036
758
            LocalExchangeSharedState::create_shared(_num_instances);
1037
758
    switch (data_distribution.distribution_type) {
1038
42
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
1039
42
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
1040
42
        shared_state->exchanger = ShuffleExchanger::create_unique(
1041
42
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
1042
42
                use_global_hash_shuffle ? _total_instances : _num_instances,
1043
42
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1044
42
                        ? cast_set<int>(
1045
42
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1046
42
                        : 0,
1047
42
                data_distribution.distribution_type);
1048
42
        break;
1049
7
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
1050
7
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
1051
7
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
1052
7
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1053
7
                        ? cast_set<int>(
1054
7
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1055
7
                        : 0);
1056
7
        break;
1057
616
    case TLocalPartitionType::PASSTHROUGH:
1058
616
        shared_state->exchanger = PassthroughExchanger::create_unique(
1059
616
                cur_pipe->num_tasks(), _num_instances,
1060
616
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1061
616
                        ? cast_set<int>(
1062
616
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1063
616
                        : 0);
1064
616
        break;
1065
8
    case TLocalPartitionType::BROADCAST:
1066
8
        shared_state->exchanger = BroadcastExchanger::create_unique(
1067
8
                cur_pipe->num_tasks(), _num_instances,
1068
8
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1069
8
                        ? cast_set<int>(
1070
8
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1071
8
                        : 0);
1072
8
        break;
1073
2
    case TLocalPartitionType::PASS_TO_ONE:
1074
2
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
1075
            // If shared hash table is enabled for BJ, hash table will be built by only one task
1076
2
            shared_state->exchanger = PassToOneExchanger::create_unique(
1077
2
                    cur_pipe->num_tasks(), _num_instances,
1078
2
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1079
2
                            ? cast_set<int>(_runtime_state->query_options()
1080
2
                                                    .local_exchange_free_blocks_limit)
1081
2
                            : 0);
1082
2
        } else {
1083
0
            shared_state->exchanger = BroadcastExchanger::create_unique(
1084
0
                    cur_pipe->num_tasks(), _num_instances,
1085
0
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1086
0
                            ? cast_set<int>(_runtime_state->query_options()
1087
0
                                                    .local_exchange_free_blocks_limit)
1088
0
                            : 0);
1089
0
        }
1090
2
        break;
1091
83
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
1092
83
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
1093
83
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
1094
83
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1095
83
                        ? cast_set<int>(
1096
83
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1097
83
                        : 0);
1098
83
        break;
1099
0
    default:
1100
0
        return Status::InternalError("Unsupported local exchange type : " +
1101
0
                                     std::to_string((int)data_distribution.distribution_type));
1102
758
    }
1103
758
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
1104
758
                                             "LOCAL_EXCHANGE_OPERATOR");
1105
758
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
1106
758
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
1107
1108
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
1109
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
1110
1111
    // 3.1 Initialize new pipeline's operator list.
1112
758
    std::copy(operators.begin(), operators.begin() + idx,
1113
758
              std::inserter(new_pip->operators(), new_pip->operators().end()));
1114
1115
    // 3.2 Erase unused operators in previous pipeline.
1116
758
    operators.erase(operators.begin(), operators.begin() + idx);
1117
1118
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
1119
758
    OperatorPtr source_op;
1120
758
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
1121
758
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
1122
758
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
1123
758
    if (!operators.empty()) {
1124
217
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
1125
217
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
1126
217
    }
1127
758
    operators.insert(operators.begin(), source_op);
1128
1129
    // 5. Set children for two pipelines separately.
1130
758
    std::vector<std::shared_ptr<Pipeline>> new_children;
1131
758
    std::vector<PipelineId> edges_with_source;
1132
1.50k
    for (auto child : cur_pipe->children()) {
1133
1.50k
        bool found = false;
1134
2.08k
        for (auto op : new_pip->operators()) {
1135
2.08k
            if (child->sink()->node_id() == op->node_id()) {
1136
476
                new_pip->set_children(child);
1137
476
                found = true;
1138
476
            };
1139
2.08k
        }
1140
1.50k
        if (!found) {
1141
1.02k
            new_children.push_back(child);
1142
1.02k
            edges_with_source.push_back(child->id());
1143
1.02k
        }
1144
1.50k
    }
1145
758
    new_children.push_back(new_pip);
1146
758
    edges_with_source.push_back(new_pip->id());
1147
1148
    // 6. Set DAG for new pipelines.
1149
758
    if (!new_pip->children().empty()) {
1150
274
        std::vector<PipelineId> edges_with_sink;
1151
476
        for (auto child : new_pip->children()) {
1152
476
            edges_with_sink.push_back(child->id());
1153
476
        }
1154
274
        _dag.insert({new_pip->id(), edges_with_sink});
1155
274
    }
1156
758
    cur_pipe->set_children(new_children);
1157
758
    _dag[downstream_pipeline_id] = edges_with_source;
1158
758
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
1159
758
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
1160
758
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
1161
1162
    // 7. Inherit properties from current pipeline.
1163
758
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
1164
758
    return Status::OK();
1165
758
}
1166
1167
Status PipelineFragmentContext::_add_local_exchange(
1168
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
1169
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
1170
        const std::map<int, int>& bucket_seq_to_instance_idx,
1171
9.31k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1172
9.31k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
1173
7.86k
        return Status::OK();
1174
7.86k
    }
1175
1176
1.44k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
1177
720
        return Status::OK();
1178
720
    }
1179
727
    *do_local_exchange = true;
1180
1181
727
    auto& operators = cur_pipe->operators();
1182
727
    auto total_op_num = operators.size();
1183
727
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
1184
727
    RETURN_IF_ERROR(_add_local_exchange_impl(
1185
727
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
1186
727
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1187
1188
727
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
1189
0
            << "total_op_num: " << total_op_num
1190
0
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
1191
0
            << " new_pip->operators().size(): " << new_pip->operators().size();
1192
1193
    // There are some local shuffles with relatively heavy operations on the sink.
1194
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
1195
    // Therefore, local passthrough is used to increase the concurrency of the sink.
1196
    // op -> local sink(1) -> local source (n)
1197
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
1198
727
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
1199
727
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
1200
31
        RETURN_IF_ERROR(_add_local_exchange_impl(
1201
31
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
1202
31
                add_pipeline(new_pip, pip_idx + 2),
1203
31
                DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
1204
31
                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1205
31
    }
1206
727
    return Status::OK();
1207
727
}
1208
1209
Status PipelineFragmentContext::_plan_local_exchange(
1210
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
1211
116k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1212
263k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
1213
146k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
1214
        // Set property if child pipeline is not join operator's child.
1215
146k
        if (!_pipelines[pip_idx]->children().empty()) {
1216
26.2k
            for (auto& child : _pipelines[pip_idx]->children()) {
1217
26.2k
                if (child->sink()->node_id() ==
1218
26.2k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
1219
21.0k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
1220
21.0k
                }
1221
26.2k
            }
1222
23.8k
        }
1223
1224
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
1225
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
1226
        // still keep colocate plan after local shuffle
1227
146k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
1228
146k
                                             bucket_seq_to_instance_idx,
1229
146k
                                             shuffle_idx_to_instance_idx));
1230
146k
    }
1231
116k
    return Status::OK();
1232
116k
}
1233
1234
Status PipelineFragmentContext::_plan_local_exchange(
1235
        int num_buckets, int pip_idx, PipelinePtr pip,
1236
        const std::map<int, int>& bucket_seq_to_instance_idx,
1237
146k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1238
146k
    int idx = 1;
1239
146k
    bool do_local_exchange = false;
1240
147k
    do {
1241
147k
        auto& ops = pip->operators();
1242
147k
        do_local_exchange = false;
1243
        // Plan local exchange for each operator.
1244
153k
        for (; idx < ops.size();) {
1245
6.73k
            auto _le_req = ops[idx]->required_data_distribution(_runtime_state.get());
1246
6.73k
            if (_le_req.need_local_exchange()) {
1247
5.15k
                RETURN_IF_ERROR(_add_local_exchange(
1248
5.15k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, _le_req,
1249
5.15k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1250
5.15k
                        shuffle_idx_to_instance_idx));
1251
5.15k
            }
1252
6.73k
            if (do_local_exchange) {
1253
                // If local exchange is needed for current operator, we will split this pipeline to
1254
                // two pipelines by local exchange sink/source. And then we need to process remaining
1255
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1256
                // is current operator was already processed) and continue to plan local exchange.
1257
217
                idx = 2;
1258
217
                break;
1259
217
            }
1260
6.51k
            idx++;
1261
6.51k
        }
1262
147k
    } while (do_local_exchange);
1263
146k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1264
4.16k
        RETURN_IF_ERROR(_add_local_exchange(
1265
4.16k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1266
4.16k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1267
4.16k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1268
4.16k
    }
1269
146k
    return Status::OK();
1270
146k
}
1271
1272
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1273
                                                  const std::vector<TExpr>& output_exprs,
1274
                                                  const TPipelineFragmentParams& params,
1275
                                                  const RowDescriptor& row_desc,
1276
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1277
394k
                                                  PipelineId cur_pipeline_id) {
1278
394k
    switch (thrift_sink.type) {
1279
134k
    case TDataSinkType::DATA_STREAM_SINK: {
1280
134k
        if (!thrift_sink.__isset.stream_sink) {
1281
0
            return Status::InternalError("Missing data stream sink.");
1282
0
        }
1283
134k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1284
134k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1285
134k
                params.destinations, _fragment_instance_ids);
1286
134k
        break;
1287
134k
    }
1288
223k
    case TDataSinkType::RESULT_SINK: {
1289
223k
        if (!thrift_sink.__isset.result_sink) {
1290
0
            return Status::InternalError("Missing data buffer sink.");
1291
0
        }
1292
1293
223k
        auto& pipeline = _pipelines[cur_pipeline_id];
1294
223k
        int child_node_id = pipeline->operators().back()->node_id();
1295
223k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
1296
223k
                                                      row_desc, output_exprs,
1297
223k
                                                      thrift_sink.result_sink);
1298
223k
        break;
1299
223k
    }
1300
105
    case TDataSinkType::DICTIONARY_SINK: {
1301
105
        if (!thrift_sink.__isset.dictionary_sink) {
1302
0
            return Status::InternalError("Missing dict sink.");
1303
0
        }
1304
1305
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1306
105
                                                    thrift_sink.dictionary_sink);
1307
105
        break;
1308
105
    }
1309
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1310
32.0k
    case TDataSinkType::OLAP_TABLE_SINK: {
1311
32.0k
        auto& pipeline = _pipelines[cur_pipeline_id];
1312
32.0k
        int child_node_id = pipeline->operators().back()->node_id();
1313
32.0k
        if (state->query_options().enable_memtable_on_sink_node &&
1314
32.0k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1315
32.0k
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
1316
1.49k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
1317
1.49k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1318
30.5k
        } else {
1319
30.5k
            _sink = std::make_shared<OlapTableSinkOperatorX>(
1320
30.5k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1321
30.5k
        }
1322
32.0k
        break;
1323
0
    }
1324
167
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1325
167
        DCHECK(thrift_sink.__isset.olap_table_sink);
1326
167
        DCHECK(state->get_query_ctx() != nullptr);
1327
167
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1328
167
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1329
167
                                                                output_exprs);
1330
167
        break;
1331
0
    }
1332
744
    case TDataSinkType::HIVE_TABLE_SINK: {
1333
744
        if (!thrift_sink.__isset.hive_table_sink) {
1334
0
            return Status::InternalError("Missing hive table sink.");
1335
0
        }
1336
744
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1337
744
                                                         output_exprs);
1338
744
        break;
1339
744
    }
1340
875
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1341
875
        if (!thrift_sink.__isset.iceberg_table_sink) {
1342
0
            return Status::InternalError("Missing iceberg table sink.");
1343
0
        }
1344
875
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1345
2
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1346
2
                                                                     row_desc, output_exprs);
1347
873
        } else {
1348
873
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1349
873
                                                                row_desc, output_exprs);
1350
873
        }
1351
875
        break;
1352
875
    }
1353
11
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1354
11
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1355
0
            return Status::InternalError("Missing iceberg delete sink.");
1356
0
        }
1357
11
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1358
11
                                                             row_desc, output_exprs);
1359
11
        break;
1360
11
    }
1361
40
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1362
40
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1363
0
            return Status::InternalError("Missing iceberg merge sink.");
1364
0
        }
1365
40
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1366
40
                                                            output_exprs);
1367
40
        break;
1368
40
    }
1369
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1370
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1371
0
            return Status::InternalError("Missing max compute table sink.");
1372
0
        }
1373
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1374
0
                                                       output_exprs);
1375
0
        break;
1376
0
    }
1377
44
    case TDataSinkType::JDBC_TABLE_SINK: {
1378
44
        if (!thrift_sink.__isset.jdbc_table_sink) {
1379
0
            return Status::InternalError("Missing data jdbc sink.");
1380
0
        }
1381
44
        if (config::enable_java_support) {
1382
44
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1383
44
                                                             output_exprs);
1384
44
        } else {
1385
0
            return Status::InternalError(
1386
0
                    "Jdbc table sink is not enabled, you can change be config "
1387
0
                    "enable_java_support to true and restart be.");
1388
0
        }
1389
44
        break;
1390
44
    }
1391
44
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1392
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1393
0
            return Status::InternalError("Missing data buffer sink.");
1394
0
        }
1395
1396
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1397
3
                                                             output_exprs);
1398
3
        break;
1399
3
    }
1400
421
    case TDataSinkType::RESULT_FILE_SINK: {
1401
421
        if (!thrift_sink.__isset.result_file_sink) {
1402
0
            return Status::InternalError("Missing result file sink.");
1403
0
        }
1404
1405
        // Result file sink is not the top sink
1406
421
        if (params.__isset.destinations && !params.destinations.empty()) {
1407
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1408
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1409
0
                    params.destinations, output_exprs, desc_tbl);
1410
421
        } else {
1411
421
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1412
421
                                                              output_exprs);
1413
421
        }
1414
421
        break;
1415
421
    }
1416
1.60k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1417
1.60k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1418
1.60k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1419
1.60k
        auto sink_id = next_sink_operator_id();
1420
1.60k
        const int multi_cast_node_id = sink_id;
1421
1.60k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1422
        // one sink has multiple sources.
1423
1.60k
        std::vector<int> sources;
1424
6.21k
        for (int i = 0; i < sender_size; ++i) {
1425
4.60k
            auto source_id = next_operator_id();
1426
4.60k
            sources.push_back(source_id);
1427
4.60k
        }
1428
1429
1.60k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1430
1.60k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1431
6.21k
        for (int i = 0; i < sender_size; ++i) {
1432
4.60k
            auto new_pipeline = add_pipeline();
1433
            // use to exchange sink
1434
4.60k
            RowDescriptor* exchange_row_desc = nullptr;
1435
4.60k
            {
1436
4.60k
                const auto& tmp_row_desc =
1437
4.60k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1438
4.60k
                                ? RowDescriptor(state->desc_tbl(),
1439
4.60k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1440
4.60k
                                                         .output_tuple_id})
1441
4.60k
                                : row_desc;
1442
4.60k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1443
4.60k
            }
1444
4.60k
            auto source_id = sources[i];
1445
4.60k
            OperatorPtr source_op;
1446
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1447
4.60k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1448
4.60k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1449
4.60k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1450
4.60k
                    /*operator_id=*/source_id);
1451
4.60k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1452
4.60k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1453
            // 2. create and set sink operator of data stream sender for new pipeline
1454
1455
4.60k
            DataSinkOperatorPtr sink_op;
1456
4.60k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1457
4.60k
                    state, *exchange_row_desc, next_sink_operator_id(),
1458
4.60k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1459
4.60k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1460
1461
4.60k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1462
4.60k
            {
1463
4.60k
                TDataSink* t = pool->add(new TDataSink());
1464
4.60k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1465
4.60k
                RETURN_IF_ERROR(sink_op->init(*t));
1466
4.60k
            }
1467
1468
            // 3. set dependency dag
1469
4.60k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1470
4.60k
        }
1471
1.60k
        if (sources.empty()) {
1472
0
            return Status::InternalError("size of sources must be greater than 0");
1473
0
        }
1474
1.60k
        break;
1475
1.60k
    }
1476
1.60k
    case TDataSinkType::BLACKHOLE_SINK: {
1477
8
        if (!thrift_sink.__isset.blackhole_sink) {
1478
0
            return Status::InternalError("Missing blackhole sink.");
1479
0
        }
1480
1481
8
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1482
8
        break;
1483
8
    }
1484
78
    case TDataSinkType::TVF_TABLE_SINK: {
1485
78
        if (!thrift_sink.__isset.tvf_table_sink) {
1486
0
            return Status::InternalError("Missing TVF table sink.");
1487
0
        }
1488
78
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1489
78
                                                        output_exprs);
1490
78
        break;
1491
78
    }
1492
0
    default:
1493
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1494
394k
    }
1495
393k
    return Status::OK();
1496
394k
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
718k
                                                 OperatorPtr& cache_op) {
1507
718k
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
718k
    Defer defer = Defer([&]() {
1509
718k
        if (op) {
1510
718k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
718k
        }
1512
718k
        for (auto& s : sink_ops) {
1513
215k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
215k
        }
1515
718k
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
718k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
718k
    std::stringstream error_msg;
1520
718k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
718k
    bool fe_with_old_version = false;
1523
718k
    switch (tnode.node_type) {
1524
193k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
193k
        op = std::make_shared<OlapScanOperatorX>(
1526
193k
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
193k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
193k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
193k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
193k
        break;
1531
193k
    }
1532
79
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
79
        DCHECK(_query_ctx != nullptr);
1534
79
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
79
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
79
                                                    _num_instances);
1537
79
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
79
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
79
        break;
1540
79
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
14.3k
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
14.3k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
14.3k
                                                 _num_instances);
1557
14.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
14.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
14.3k
        break;
1560
14.3k
    }
1561
136k
    case TPlanNodeType::EXCHANGE_NODE: {
1562
136k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
136k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
18.4E
                                  : 0;
1565
136k
        DCHECK_GT(num_senders, 0);
1566
136k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1567
136k
                                                       num_senders);
1568
136k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1569
136k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1570
136k
        break;
1571
136k
    }
1572
134k
    case TPlanNodeType::AGGREGATION_NODE: {
1573
134k
        if (tnode.agg_node.grouping_exprs.empty() &&
1574
134k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1575
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1576
0
                                         ": group by and output is empty");
1577
0
        }
1578
134k
        bool need_create_cache_op =
1579
134k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1580
134k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1581
10
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1582
10
            auto cache_source_id = next_operator_id();
1583
10
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1584
10
                                                        _params.fragment.query_cache_param);
1585
10
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1586
1587
10
            const auto downstream_pipeline_id = cur_pipe->id();
1588
10
            if (!_dag.contains(downstream_pipeline_id)) {
1589
10
                _dag.insert({downstream_pipeline_id, {}});
1590
10
            }
1591
10
            new_pipe = add_pipeline(cur_pipe);
1592
10
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1593
1594
10
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1595
10
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1596
10
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1597
10
            return Status::OK();
1598
10
        };
1599
134k
        const bool group_by_limit_opt =
1600
134k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1601
1602
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1603
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1604
134k
        const bool enable_spill = _runtime_state->enable_spill() &&
1605
134k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1606
134k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1607
134k
                                      tnode.agg_node.use_streaming_preaggregation &&
1608
134k
                                      !tnode.agg_node.grouping_exprs.empty();
1609
        // TODO: distinct streaming agg does not support spill.
1610
134k
        const bool can_use_distinct_streaming_agg =
1611
134k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1612
134k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1613
134k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1614
134k
                _params.query_options.enable_distinct_streaming_aggregation;
1615
1616
134k
        if (can_use_distinct_streaming_agg) {
1617
89.3k
            if (need_create_cache_op) {
1618
8
                PipelinePtr new_pipe;
1619
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1620
1621
8
                cache_op = op;
1622
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1623
8
                                                                     tnode, descs);
1624
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1625
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1626
8
                cur_pipe = new_pipe;
1627
89.3k
            } else {
1628
89.3k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
89.3k
                                                                     tnode, descs);
1630
89.3k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1631
89.3k
            }
1632
89.3k
        } else if (is_streaming_agg) {
1633
1.66k
            if (need_create_cache_op) {
1634
0
                PipelinePtr new_pipe;
1635
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1636
0
                cache_op = op;
1637
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1638
0
                                                             descs);
1639
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1640
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1641
0
                cur_pipe = new_pipe;
1642
1.66k
            } else {
1643
1.66k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
1.66k
                                                             descs);
1645
1.66k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1646
1.66k
            }
1647
43.2k
        } else {
1648
            // create new pipeline to add query cache operator
1649
43.2k
            PipelinePtr new_pipe;
1650
43.2k
            if (need_create_cache_op) {
1651
2
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1652
2
                cache_op = op;
1653
2
            }
1654
1655
43.2k
            if (enable_spill) {
1656
118
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1657
118
                                                                     next_operator_id(), descs);
1658
43.0k
            } else {
1659
43.0k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1660
43.0k
            }
1661
43.2k
            if (need_create_cache_op) {
1662
2
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1663
2
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1664
2
                cur_pipe = new_pipe;
1665
43.2k
            } else {
1666
43.2k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1667
43.2k
            }
1668
1669
43.2k
            const auto downstream_pipeline_id = cur_pipe->id();
1670
43.2k
            if (!_dag.contains(downstream_pipeline_id)) {
1671
41.4k
                _dag.insert({downstream_pipeline_id, {}});
1672
41.4k
            }
1673
43.2k
            cur_pipe = add_pipeline(cur_pipe);
1674
43.2k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1675
1676
43.2k
            if (enable_spill) {
1677
118
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1678
118
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1679
43.0k
            } else {
1680
43.0k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1681
43.0k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1682
43.0k
            }
1683
43.2k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1684
43.2k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1685
43.2k
        }
1686
134k
        break;
1687
134k
    }
1688
134k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1689
69
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1690
0
            return Status::InternalError(
1691
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1692
0
                    tnode.node_id);
1693
0
        }
1694
1695
        // Create source operator (goes on the current / downstream pipeline).
1696
69
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1697
69
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1698
1699
        // Create a new pipeline for the sink side.
1700
69
        const auto downstream_pipeline_id = cur_pipe->id();
1701
69
        if (!_dag.contains(downstream_pipeline_id)) {
1702
69
            _dag.insert({downstream_pipeline_id, {}});
1703
69
        }
1704
69
        cur_pipe = add_pipeline(cur_pipe);
1705
69
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1706
1707
        // Create sink operator.
1708
69
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1709
69
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1710
69
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1711
69
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1712
1713
        // Pre-register a single shared state for ALL instances so that every
1714
        // sink instance writes its per-instance hash table into the same
1715
        // BucketedAggSharedState and every source instance can merge across
1716
        // all of them.
1717
69
        {
1718
69
            auto shared_state = BucketedAggSharedState::create_shared();
1719
69
            shared_state->id = op->operator_id();
1720
69
            shared_state->related_op_ids.insert(op->operator_id());
1721
1722
473
            for (int i = 0; i < _num_instances; i++) {
1723
404
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1724
404
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1725
404
                sink_dep->set_shared_state(shared_state.get());
1726
404
                shared_state->sink_deps.push_back(sink_dep);
1727
404
            }
1728
69
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1729
69
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1730
69
            _op_id_to_shared_state.insert(
1731
69
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1732
69
        }
1733
69
        break;
1734
69
    }
1735
10.1k
    case TPlanNodeType::HASH_JOIN_NODE: {
1736
10.1k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1737
10.1k
                                       tnode.hash_join_node.is_broadcast_join;
1738
10.1k
        const auto enable_spill = _runtime_state->enable_spill();
1739
10.1k
        if (enable_spill && !is_broadcast_join) {
1740
0
            auto tnode_ = tnode;
1741
0
            tnode_.runtime_filters.clear();
1742
0
            auto inner_probe_operator =
1743
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1744
1745
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1746
            // So here use `tnode_` which has no runtime filters.
1747
0
            auto probe_side_inner_sink_operator =
1748
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1749
1750
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1751
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1752
1753
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1754
0
                    pool, tnode_, next_operator_id(), descs);
1755
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1756
0
                                                inner_probe_operator);
1757
0
            op = std::move(probe_operator);
1758
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1759
1760
0
            const auto downstream_pipeline_id = cur_pipe->id();
1761
0
            if (!_dag.contains(downstream_pipeline_id)) {
1762
0
                _dag.insert({downstream_pipeline_id, {}});
1763
0
            }
1764
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1765
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1766
1767
0
            auto inner_sink_operator =
1768
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1769
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1770
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1771
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1772
1773
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1774
0
            sink_ops.push_back(std::move(sink_operator));
1775
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1776
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1777
1778
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1779
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1780
10.1k
        } else {
1781
10.1k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1782
10.1k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1783
1784
10.1k
            const auto downstream_pipeline_id = cur_pipe->id();
1785
10.1k
            if (!_dag.contains(downstream_pipeline_id)) {
1786
8.45k
                _dag.insert({downstream_pipeline_id, {}});
1787
8.45k
            }
1788
10.1k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1789
10.1k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1790
1791
10.1k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1792
10.1k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1793
10.1k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1794
10.1k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1795
1796
10.1k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1797
10.1k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1798
10.1k
        }
1799
10.1k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1800
4.89k
            std::shared_ptr<HashJoinSharedState> shared_state =
1801
4.89k
                    HashJoinSharedState::create_shared(_num_instances);
1802
21.9k
            for (int i = 0; i < _num_instances; i++) {
1803
17.0k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1804
17.0k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1805
17.0k
                sink_dep->set_shared_state(shared_state.get());
1806
17.0k
                shared_state->sink_deps.push_back(sink_dep);
1807
17.0k
            }
1808
4.89k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1809
4.89k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1810
4.89k
            _op_id_to_shared_state.insert(
1811
4.89k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1812
4.89k
        }
1813
10.1k
        break;
1814
10.1k
    }
1815
4.19k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1816
4.19k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1817
4.19k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1818
1819
4.19k
        const auto downstream_pipeline_id = cur_pipe->id();
1820
4.19k
        if (!_dag.contains(downstream_pipeline_id)) {
1821
3.94k
            _dag.insert({downstream_pipeline_id, {}});
1822
3.94k
        }
1823
4.19k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1824
4.19k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1825
1826
4.19k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1827
4.19k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1828
4.19k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1829
4.19k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1830
4.19k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1831
4.19k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1832
4.19k
        break;
1833
4.19k
    }
1834
52.6k
    case TPlanNodeType::UNION_NODE: {
1835
52.6k
        int child_count = tnode.num_children;
1836
52.6k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1837
52.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1838
1839
52.6k
        const auto downstream_pipeline_id = cur_pipe->id();
1840
52.6k
        if (!_dag.contains(downstream_pipeline_id)) {
1841
52.4k
            _dag.insert({downstream_pipeline_id, {}});
1842
52.4k
        }
1843
53.9k
        for (int i = 0; i < child_count; i++) {
1844
1.30k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1845
1.30k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1846
1.30k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1847
1.30k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1848
1.30k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1849
1.30k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1850
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1851
1.30k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1852
1.30k
        }
1853
52.6k
        break;
1854
52.6k
    }
1855
52.6k
    case TPlanNodeType::SORT_NODE: {
1856
40.4k
        const auto should_spill = _runtime_state->enable_spill() &&
1857
40.4k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1858
40.4k
        const bool use_local_merge =
1859
40.4k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1860
40.4k
        if (should_spill) {
1861
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1862
40.4k
        } else if (use_local_merge) {
1863
38.2k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1864
38.2k
                                                                 descs);
1865
38.2k
        } else {
1866
2.22k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1867
2.22k
        }
1868
40.4k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1869
1870
40.4k
        const auto downstream_pipeline_id = cur_pipe->id();
1871
40.4k
        if (!_dag.contains(downstream_pipeline_id)) {
1872
40.4k
            _dag.insert({downstream_pipeline_id, {}});
1873
40.4k
        }
1874
40.4k
        cur_pipe = add_pipeline(cur_pipe);
1875
40.4k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1876
1877
40.4k
        if (should_spill) {
1878
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1879
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1880
40.4k
        } else {
1881
40.4k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1882
40.4k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1883
40.4k
        }
1884
40.4k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1885
40.4k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1886
40.4k
        break;
1887
40.4k
    }
1888
40.4k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1889
70
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1890
70
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1891
1892
70
        const auto downstream_pipeline_id = cur_pipe->id();
1893
70
        if (!_dag.contains(downstream_pipeline_id)) {
1894
70
            _dag.insert({downstream_pipeline_id, {}});
1895
70
        }
1896
70
        cur_pipe = add_pipeline(cur_pipe);
1897
70
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1898
1899
70
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1900
70
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1901
70
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1902
70
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1903
70
        break;
1904
70
    }
1905
1.81k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1906
1.81k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1907
1.81k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1908
1909
1.81k
        const auto downstream_pipeline_id = cur_pipe->id();
1910
1.81k
        if (!_dag.contains(downstream_pipeline_id)) {
1911
1.80k
            _dag.insert({downstream_pipeline_id, {}});
1912
1.80k
        }
1913
1.81k
        cur_pipe = add_pipeline(cur_pipe);
1914
1.81k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1915
1916
1.81k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1917
1.81k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1918
1.81k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1919
1.81k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1920
1.81k
        break;
1921
1.81k
    }
1922
1.81k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1923
1.26k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1924
1.26k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1925
1.26k
        break;
1926
1.26k
    }
1927
1.26k
    case TPlanNodeType::INTERSECT_NODE: {
1928
168
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1929
168
                                                                      cur_pipe, sink_ops));
1930
168
        break;
1931
168
    }
1932
168
    case TPlanNodeType::EXCEPT_NODE: {
1933
159
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1934
159
                                                                       cur_pipe, sink_ops));
1935
159
        break;
1936
159
    }
1937
337
    case TPlanNodeType::REPEAT_NODE: {
1938
337
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1939
337
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1940
337
        break;
1941
337
    }
1942
918
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1943
918
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1944
918
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1945
918
        break;
1946
918
    }
1947
918
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1948
118
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1949
118
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1950
118
        break;
1951
118
    }
1952
1.58k
    case TPlanNodeType::EMPTY_SET_NODE: {
1953
1.58k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1954
1.58k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1955
1.58k
        break;
1956
1.58k
    }
1957
1.58k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1958
387
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1959
387
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1960
387
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1961
387
        break;
1962
387
    }
1963
1.82k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1964
1.82k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1965
1.82k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
1.82k
        break;
1967
1.82k
    }
1968
6.50k
    case TPlanNodeType::META_SCAN_NODE: {
1969
6.50k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1970
6.50k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1971
6.50k
        break;
1972
6.50k
    }
1973
6.50k
    case TPlanNodeType::SELECT_NODE: {
1974
1.58k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1975
1.58k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1976
1.58k
        break;
1977
1.58k
    }
1978
1.58k
    case TPlanNodeType::REC_CTE_NODE: {
1979
163
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1980
163
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1981
1982
163
        const auto downstream_pipeline_id = cur_pipe->id();
1983
163
        if (!_dag.contains(downstream_pipeline_id)) {
1984
159
            _dag.insert({downstream_pipeline_id, {}});
1985
159
        }
1986
1987
163
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1988
163
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1989
1990
163
        DataSinkOperatorPtr anchor_sink;
1991
163
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1992
163
                                                                  op->operator_id(), tnode, descs);
1993
163
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1994
163
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1995
163
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1996
1997
163
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1998
163
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1999
2000
163
        DataSinkOperatorPtr rec_sink;
2001
163
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2002
163
                                                         tnode, descs);
2003
163
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2004
163
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2005
163
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2006
2007
163
        break;
2008
163
    }
2009
2.18k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2010
2.18k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2011
2.18k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2012
2.18k
        break;
2013
2.18k
    }
2014
113k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2015
113k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2016
        // The downstream pipeline (containing LocalExchangeSource) must have
2017
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2018
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2019
        // Without this, when the parent pipeline was reduced by a serial operator
2020
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2021
        // Exchange), the downstream inherits the reduced num_tasks via
2022
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2023
        // channels but only fewer source tasks initialize mem_counters — the
2024
        // sink round-robins to all channels and crashes on uninitialized ones.
2025
113k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2026
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2027
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2028
113k
        cur_pipe->set_num_tasks(_num_instances);
2029
2030
113k
        const auto downstream_pipeline_id = cur_pipe->id();
2031
113k
        if (!_dag.contains(downstream_pipeline_id)) {
2032
109k
            _dag.insert({downstream_pipeline_id, {}});
2033
109k
        }
2034
113k
        cur_pipe = add_pipeline(cur_pipe);
2035
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2036
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2037
        // We set this now so the exchanger is created with the correct sender count.
2038
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2039
        // consistent with this.
2040
113k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2041
0
            cur_pipe->set_num_tasks(_parallel_instances);
2042
0
        }
2043
113k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2044
113k
        int num_partitions = 0;
2045
113k
        std::map<int, int> shuffle_id_to_instance_idx;
2046
113k
        auto partition_type = tnode.local_exchange_node.partition_type;
2047
113k
        switch (partition_type) {
2048
456
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2049
456
            num_partitions = _params.num_buckets;
2050
456
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2051
456
            break;
2052
20.9k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2053
121k
            for (int i = 0; i < _num_instances; i++) {
2054
100k
                shuffle_id_to_instance_idx[i] = i;
2055
100k
            }
2056
20.9k
            num_partitions = _num_instances;
2057
20.9k
            break;
2058
5
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2059
5
            num_partitions = _total_instances;
2060
5
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2061
5
            break;
2062
91.9k
        default:
2063
91.9k
            break;
2064
113k
        }
2065
113k
        auto local_exchange_id = op->operator_id();
2066
113k
        auto sink_id = next_sink_operator_id();
2067
113k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2068
113k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2069
113k
        sink_ops.push_back(sink);
2070
113k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2071
113k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2072
2073
        // For FE-planned local exchange, we need to:
2074
        // 1. Initialize the partitioner for hash shuffle types
2075
        // 2. Defer exchanger creation until after the full plan tree is built
2076
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2077
        // 3. Register shared state so pipeline tasks can find it
2078
113k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2079
113k
                                ->init_partitioner(_runtime_state.get()));
2080
2081
113k
        int free_blocks_limit =
2082
113k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2083
113k
                        ? cast_set<int>(
2084
113k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2085
18.4E
                        : 0;
2086
113k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2087
113k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2088
113k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2089
113k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2090
113k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2091
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2092
113k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2093
113k
                                        free_blocks_limit, local_exchange_id, sink_id});
2094
113k
        break;
2095
113k
    }
2096
0
    default:
2097
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2098
0
                                     print_plan_node_type(tnode.node_type));
2099
718k
    }
2100
718k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2101
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2102
0
        op->set_serial_operator();
2103
0
    }
2104
2105
718k
    return Status::OK();
2106
718k
}
2107
// NOLINTEND(readability-function-cognitive-complexity)
2108
// NOLINTEND(readability-function-size)
2109
2110
template <bool is_intersect>
2111
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2112
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2113
327
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
327
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
327
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
327
    const auto downstream_pipeline_id = cur_pipe->id();
2118
327
    if (!_dag.contains(downstream_pipeline_id)) {
2119
302
        _dag.insert({downstream_pipeline_id, {}});
2120
302
    }
2121
2122
1.07k
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
750
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
750
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
750
        if (child_id == 0) {
2127
327
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
327
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
423
        } else {
2130
423
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
423
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
423
        }
2133
750
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
750
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
750
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
750
    }
2138
2139
327
    return Status::OK();
2140
327
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2113
168
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
168
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
168
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
168
    const auto downstream_pipeline_id = cur_pipe->id();
2118
168
    if (!_dag.contains(downstream_pipeline_id)) {
2119
155
        _dag.insert({downstream_pipeline_id, {}});
2120
155
    }
2121
2122
585
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
417
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
417
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
417
        if (child_id == 0) {
2127
168
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
168
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
249
        } else {
2130
249
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
249
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
249
        }
2133
417
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
417
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
417
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
417
    }
2138
2139
168
    return Status::OK();
2140
168
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2113
159
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
159
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
159
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
159
    const auto downstream_pipeline_id = cur_pipe->id();
2118
159
    if (!_dag.contains(downstream_pipeline_id)) {
2119
147
        _dag.insert({downstream_pipeline_id, {}});
2120
147
    }
2121
2122
492
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
333
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
333
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
333
        if (child_id == 0) {
2127
159
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
159
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
174
        } else {
2130
174
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
174
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
174
        }
2133
333
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
333
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
333
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
333
    }
2138
2139
159
    return Status::OK();
2140
159
}
2141
2142
392k
Status PipelineFragmentContext::submit() {
2143
392k
    if (_submitted) {
2144
0
        return Status::InternalError("submitted");
2145
0
    }
2146
392k
    _submitted = true;
2147
2148
392k
    int submit_tasks = 0;
2149
392k
    Status st;
2150
392k
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
2151
1.08M
    for (auto& task : _tasks) {
2152
1.75M
        for (auto& t : task) {
2153
1.75M
            st = scheduler->submit(t.first);
2154
1.75M
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
2155
1.75M
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
2156
1.75M
            if (!st) {
2157
0
                cancel(Status::InternalError("submit context to executor fail"));
2158
0
                std::lock_guard<std::mutex> l(_task_mutex);
2159
0
                _total_tasks = submit_tasks;
2160
0
                break;
2161
0
            }
2162
1.75M
            submit_tasks++;
2163
1.75M
        }
2164
1.08M
    }
2165
392k
    if (!st.ok()) {
2166
0
        bool need_remove = false;
2167
0
        {
2168
0
            std::lock_guard<std::mutex> l(_task_mutex);
2169
0
            if (_closed_tasks >= _total_tasks) {
2170
0
                need_remove = _close_fragment_instance();
2171
0
            }
2172
0
        }
2173
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2174
0
        if (need_remove) {
2175
0
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2176
0
        }
2177
0
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
2178
0
                                     BackendOptions::get_localhost());
2179
392k
    } else {
2180
392k
        return st;
2181
392k
    }
2182
392k
}
2183
2184
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2185
0
    if (_runtime_state->enable_profile()) {
2186
0
        std::stringstream ss;
2187
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2188
0
            runtime_profile_ptr->pretty_print(&ss);
2189
0
        }
2190
2191
0
        if (_runtime_state->load_channel_profile()) {
2192
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2193
0
        }
2194
2195
0
        auto profile_str =
2196
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2197
0
                            this->_fragment_id, extra_info, ss.str());
2198
0
        LOG_LONG_STRING(INFO, profile_str);
2199
0
    }
2200
0
}
2201
// If all pipeline tasks binded to the fragment instance are finished, then we could
2202
// close the fragment instance.
2203
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2204
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2205
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2206
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2207
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2208
// (via debug_string()).
2209
393k
bool PipelineFragmentContext::_close_fragment_instance() {
2210
393k
    if (_is_fragment_instance_closed) {
2211
0
        return false;
2212
0
    }
2213
393k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2214
393k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2215
393k
    if (!_need_notify_close) {
2216
390k
        auto st = send_report(true);
2217
390k
        if (!st) {
2218
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2219
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2220
0
        }
2221
390k
    }
2222
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2223
    // Since stream load does not have someting like coordinator on FE, so
2224
    // backend can not report profile to FE, ant its profile can not be shown
2225
    // in the same way with other query. So we print the profile content to info log.
2226
2227
393k
    if (_runtime_state->enable_profile() &&
2228
393k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2229
2.64k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2230
2.64k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2231
0
        std::stringstream ss;
2232
        // Compute the _local_time_percent before pretty_print the runtime_profile
2233
        // Before add this operation, the print out like that:
2234
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2235
        // After add the operation, the print out like that:
2236
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2237
        // We can easily know the exec node execute time without child time consumed.
2238
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2239
0
            runtime_profile_ptr->pretty_print(&ss);
2240
0
        }
2241
2242
0
        if (_runtime_state->load_channel_profile()) {
2243
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2244
0
        }
2245
2246
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2247
0
    }
2248
2249
393k
    if (_query_ctx->enable_profile()) {
2250
2.64k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2251
2.64k
                                         collect_realtime_load_channel_profile());
2252
2.64k
    }
2253
2254
    // Return whether the caller needs to remove from the pipeline map.
2255
    // The caller must do this after releasing _task_mutex.
2256
393k
    return !_need_notify_close;
2257
393k
}
2258
2259
1.75M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
2260
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
2261
1.75M
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
2262
1.75M
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
2263
614k
        if (_dag.contains(pipeline_id)) {
2264
263k
            for (auto dep : _dag[pipeline_id]) {
2265
221k
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
2266
221k
            }
2267
263k
        }
2268
614k
    }
2269
1.75M
    bool need_remove = false;
2270
1.75M
    {
2271
1.75M
        std::lock_guard<std::mutex> l(_task_mutex);
2272
1.75M
        ++_closed_tasks;
2273
        // Update query-level finished task progress in real time.
2274
1.75M
        _query_ctx->inc_finished_task_num();
2275
1.75M
        if (_closed_tasks >= _total_tasks) {
2276
393k
            need_remove = _close_fragment_instance();
2277
393k
        }
2278
1.75M
    }
2279
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2280
1.75M
    if (need_remove) {
2281
390k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2282
390k
    }
2283
1.75M
}
2284
2285
49.5k
std::string PipelineFragmentContext::get_load_error_url() {
2286
49.5k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2287
0
        return to_load_error_http_path(str);
2288
0
    }
2289
120k
    for (auto& tasks : _tasks) {
2290
190k
        for (auto& task : tasks) {
2291
190k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2292
193
                return to_load_error_http_path(str);
2293
193
            }
2294
190k
        }
2295
120k
    }
2296
49.3k
    return "";
2297
49.5k
}
2298
2299
49.5k
std::string PipelineFragmentContext::get_first_error_msg() {
2300
49.5k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2301
0
        return str;
2302
0
    }
2303
120k
    for (auto& tasks : _tasks) {
2304
189k
        for (auto& task : tasks) {
2305
189k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2306
193
                return str;
2307
193
            }
2308
189k
        }
2309
120k
    }
2310
49.3k
    return "";
2311
49.5k
}
2312
2313
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2314
0
    std::stringstream url;
2315
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2316
0
        << "/api/_download_load?"
2317
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2318
0
    return url.str();
2319
0
}
2320
2321
43.8k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2322
43.8k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2323
43.8k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2324
43.8k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2325
43.8k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2326
43.8k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2327
43.8k
    });
2328
2329
43.8k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2330
43.8k
    if (req.coord_addr.hostname == "external") {
2331
        // External query (flink/spark read tablets) not need to report to FE.
2332
0
        return;
2333
0
    }
2334
43.8k
    int callback_retries = 10;
2335
43.8k
    const int sleep_ms = 1000;
2336
43.8k
    Status exec_status = req.status;
2337
43.8k
    Status coord_status;
2338
43.8k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2339
43.8k
    do {
2340
43.8k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2341
43.8k
                                                            req.coord_addr, &coord_status);
2342
43.8k
        if (!coord_status.ok()) {
2343
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2344
0
        }
2345
43.8k
    } while (!coord_status.ok() && callback_retries-- > 0);
2346
2347
43.8k
    if (!coord_status.ok()) {
2348
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2349
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2350
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2351
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2352
0
        return;
2353
0
    }
2354
2355
43.8k
    TReportExecStatusParams params;
2356
43.8k
    params.protocol_version = FrontendServiceVersion::V1;
2357
43.8k
    params.__set_query_id(req.query_id);
2358
43.8k
    params.__set_backend_num(req.backend_num);
2359
43.8k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2360
43.8k
    params.__set_fragment_id(req.fragment_id);
2361
43.8k
    params.__set_status(exec_status.to_thrift());
2362
43.8k
    params.__set_done(req.done);
2363
43.8k
    params.__set_query_type(req.runtime_state->query_type());
2364
43.8k
    params.__isset.profile = false;
2365
2366
43.8k
    DCHECK(req.runtime_state != nullptr);
2367
2368
43.8k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2369
39.2k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2370
39.2k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2371
39.2k
    } else {
2372
4.55k
        DCHECK(!req.runtime_states.empty());
2373
4.55k
        if (!req.runtime_state->output_files().empty()) {
2374
0
            params.__isset.delta_urls = true;
2375
0
            for (auto& it : req.runtime_state->output_files()) {
2376
0
                params.delta_urls.push_back(_to_http_path(it));
2377
0
            }
2378
0
        }
2379
4.55k
        if (!params.delta_urls.empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
        }
2382
4.55k
    }
2383
2384
43.8k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2385
43.8k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2386
43.8k
    static std::string s_unselected_rows = "unselected.rows";
2387
43.8k
    int64_t num_rows_load_success = 0;
2388
43.8k
    int64_t num_rows_load_filtered = 0;
2389
43.8k
    int64_t num_rows_load_unselected = 0;
2390
43.8k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2391
43.8k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2392
43.8k
        req.runtime_state->num_finished_range() > 0) {
2393
0
        params.__isset.load_counters = true;
2394
2395
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2396
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2397
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2398
0
        params.__isset.fragment_instance_reports = true;
2399
0
        TFragmentInstanceReport t;
2400
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2401
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2402
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2403
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2404
0
        params.fragment_instance_reports.push_back(t);
2405
43.8k
    } else if (!req.runtime_states.empty()) {
2406
136k
        for (auto* rs : req.runtime_states) {
2407
136k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2408
136k
                rs->num_finished_range() > 0) {
2409
34.8k
                params.__isset.load_counters = true;
2410
34.8k
                num_rows_load_success += rs->num_rows_load_success();
2411
34.8k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2412
34.8k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2413
34.8k
                params.__isset.fragment_instance_reports = true;
2414
34.8k
                TFragmentInstanceReport t;
2415
34.8k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2416
34.8k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2417
34.8k
                t.__set_loaded_rows(rs->num_rows_load_total());
2418
34.8k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2419
34.8k
                params.fragment_instance_reports.push_back(t);
2420
34.8k
            }
2421
136k
        }
2422
43.8k
    }
2423
43.8k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2424
43.8k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2425
43.8k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2426
2427
43.8k
    if (!req.load_error_url.empty()) {
2428
175
        params.__set_tracking_url(req.load_error_url);
2429
175
    }
2430
43.8k
    if (!req.first_error_msg.empty()) {
2431
175
        params.__set_first_error_msg(req.first_error_msg);
2432
175
    }
2433
136k
    for (auto* rs : req.runtime_states) {
2434
136k
        if (rs->wal_id() > 0) {
2435
111
            params.__set_txn_id(rs->wal_id());
2436
111
            params.__set_label(rs->import_label());
2437
111
        }
2438
136k
    }
2439
43.8k
    if (!req.runtime_state->export_output_files().empty()) {
2440
0
        params.__isset.export_files = true;
2441
0
        params.export_files = req.runtime_state->export_output_files();
2442
43.8k
    } else if (!req.runtime_states.empty()) {
2443
136k
        for (auto* rs : req.runtime_states) {
2444
136k
            if (!rs->export_output_files().empty()) {
2445
0
                params.__isset.export_files = true;
2446
0
                params.export_files.insert(params.export_files.end(),
2447
0
                                           rs->export_output_files().begin(),
2448
0
                                           rs->export_output_files().end());
2449
0
            }
2450
136k
        }
2451
43.8k
    }
2452
43.8k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2453
0
        params.__isset.commitInfos = true;
2454
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2455
43.8k
    } else if (!req.runtime_states.empty()) {
2456
136k
        for (auto* rs : req.runtime_states) {
2457
136k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2458
26.8k
                params.__isset.commitInfos = true;
2459
26.8k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2460
26.8k
            }
2461
136k
        }
2462
43.8k
    }
2463
43.8k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2464
0
        params.__isset.errorTabletInfos = true;
2465
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2466
43.8k
    } else if (!req.runtime_states.empty()) {
2467
136k
        for (auto* rs : req.runtime_states) {
2468
136k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2469
0
                params.__isset.errorTabletInfos = true;
2470
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2471
0
                                               rs_eti.end());
2472
0
            }
2473
136k
        }
2474
43.8k
    }
2475
43.8k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2476
0
        params.__isset.hive_partition_updates = true;
2477
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2478
0
                                             hpu.end());
2479
43.8k
    } else if (!req.runtime_states.empty()) {
2480
136k
        for (auto* rs : req.runtime_states) {
2481
136k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2482
1.10k
                params.__isset.hive_partition_updates = true;
2483
1.10k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2484
1.10k
                                                     rs_hpu.begin(), rs_hpu.end());
2485
1.10k
            }
2486
136k
        }
2487
43.8k
    }
2488
43.8k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2489
0
        params.__isset.iceberg_commit_datas = true;
2490
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2491
0
                                           icd.end());
2492
43.8k
    } else if (!req.runtime_states.empty()) {
2493
136k
        for (auto* rs : req.runtime_states) {
2494
136k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2495
1.04k
                params.__isset.iceberg_commit_datas = true;
2496
1.04k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2497
1.04k
                                                   rs_icd.begin(), rs_icd.end());
2498
1.04k
            }
2499
136k
        }
2500
43.8k
    }
2501
2502
43.8k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2503
0
        params.__isset.mc_commit_datas = true;
2504
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2505
43.8k
    } else if (!req.runtime_states.empty()) {
2506
136k
        for (auto* rs : req.runtime_states) {
2507
136k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2508
0
                params.__isset.mc_commit_datas = true;
2509
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2510
0
                                              rs_mcd.end());
2511
0
            }
2512
136k
        }
2513
43.8k
    }
2514
2515
43.8k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2516
43.8k
    params.__isset.error_log = (!params.error_log.empty());
2517
2518
43.8k
    if (_exec_env->cluster_info()->backend_id != 0) {
2519
43.8k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2520
43.8k
    }
2521
2522
43.8k
    TReportExecStatusResult res;
2523
43.8k
    Status rpc_status;
2524
2525
43.8k
    VLOG_DEBUG << "reportExecStatus params is "
2526
11
               << apache::thrift::ThriftDebugString(params).c_str();
2527
43.8k
    if (!exec_status.ok()) {
2528
1.61k
        LOG(WARNING) << "report error status: " << exec_status.msg()
2529
1.61k
                     << " to coordinator: " << req.coord_addr
2530
1.61k
                     << ", query id: " << print_id(req.query_id);
2531
1.61k
    }
2532
43.8k
    try {
2533
43.8k
        try {
2534
43.8k
            (*coord)->reportExecStatus(res, params);
2535
43.8k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2536
#ifndef ADDRESS_SANITIZER
2537
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2538
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2539
                         << req.coord_addr << ", err: " << e.what();
2540
#endif
2541
0
            rpc_status = coord->reopen();
2542
2543
0
            if (!rpc_status.ok()) {
2544
0
                req.cancel_fn(rpc_status);
2545
0
                return;
2546
0
            }
2547
0
            (*coord)->reportExecStatus(res, params);
2548
0
        }
2549
2550
43.8k
        rpc_status = Status::create<false>(res.status);
2551
43.8k
    } catch (apache::thrift::TException& e) {
2552
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2553
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2554
0
    }
2555
2556
43.8k
    if (!rpc_status.ok()) {
2557
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2558
0
                 print_id(req.query_id), rpc_status.to_string());
2559
0
        req.cancel_fn(rpc_status);
2560
0
    }
2561
43.8k
}
2562
2563
394k
Status PipelineFragmentContext::send_report(bool done) {
2564
394k
    Status exec_status = _query_ctx->exec_status();
2565
    // If plan is done successfully, but _is_report_success is false,
2566
    // no need to send report.
2567
    // Load will set _is_report_success to true because load wants to know
2568
    // the process.
2569
394k
    if (!_is_report_success && done && exec_status.ok()) {
2570
350k
        return Status::OK();
2571
350k
    }
2572
2573
    // If both _is_report_success and _is_report_on_cancel are false,
2574
    // which means no matter query is success or failed, no report is needed.
2575
    // This may happen when the query limit reached and
2576
    // a internal cancellation being processed
2577
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
2578
    // be set to false, to avoid sending fault report to FE.
2579
44.0k
    if (!_is_report_success && !_is_report_on_cancel) {
2580
194
        if (done) {
2581
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
2582
194
            return Status::OK();
2583
194
        }
2584
0
        return Status::NeedSendAgain("");
2585
194
    }
2586
2587
43.8k
    std::vector<RuntimeState*> runtime_states;
2588
2589
95.0k
    for (auto& tasks : _tasks) {
2590
136k
        for (auto& task : tasks) {
2591
136k
            runtime_states.push_back(task.second.get());
2592
136k
        }
2593
95.0k
    }
2594
2595
43.8k
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
2596
43.8k
                                        ? get_load_error_url()
2597
43.8k
                                        : _query_ctx->get_load_error_url();
2598
43.8k
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
2599
43.8k
                                          ? get_first_error_msg()
2600
43.8k
                                          : _query_ctx->get_first_error_msg();
2601
2602
43.8k
    ReportStatusRequest req {.status = exec_status,
2603
43.8k
                             .runtime_states = runtime_states,
2604
43.8k
                             .done = done || !exec_status.ok(),
2605
43.8k
                             .coord_addr = _query_ctx->coord_addr,
2606
43.8k
                             .query_id = _query_id,
2607
43.8k
                             .fragment_id = _fragment_id,
2608
43.8k
                             .fragment_instance_id = TUniqueId(),
2609
43.8k
                             .backend_num = -1,
2610
43.8k
                             .runtime_state = _runtime_state.get(),
2611
43.8k
                             .load_error_url = load_eror_url,
2612
43.8k
                             .first_error_msg = first_error_msg,
2613
43.8k
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
2614
43.8k
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
2615
43.8k
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
2616
43.8k
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
2617
43.8k
        _coordinator_callback(req);
2618
43.8k
        if (!req.done) {
2619
4.49k
            ctx->refresh_next_report_time();
2620
4.49k
        }
2621
43.8k
    });
2622
44.0k
}
2623
2624
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2625
0
    size_t res = 0;
2626
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2627
    // here to traverse the vector.
2628
0
    for (const auto& task_instances : _tasks) {
2629
0
        for (const auto& task : task_instances) {
2630
0
            if (task.first->is_running()) {
2631
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2632
0
                                      << " is running, task: " << (void*)task.first.get()
2633
0
                                      << ", is_running: " << task.first->is_running();
2634
0
                *has_running_task = true;
2635
0
                return 0;
2636
0
            }
2637
2638
0
            size_t revocable_size = task.first->get_revocable_size();
2639
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2640
0
                res += revocable_size;
2641
0
            }
2642
0
        }
2643
0
    }
2644
0
    return res;
2645
0
}
2646
2647
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2648
0
    std::vector<PipelineTask*> revocable_tasks;
2649
0
    for (const auto& task_instances : _tasks) {
2650
0
        for (const auto& task : task_instances) {
2651
0
            size_t revocable_size_ = task.first->get_revocable_size();
2652
2653
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2654
0
                revocable_tasks.emplace_back(task.first.get());
2655
0
            }
2656
0
        }
2657
0
    }
2658
0
    return revocable_tasks;
2659
0
}
2660
2661
191
std::string PipelineFragmentContext::debug_string() {
2662
191
    std::lock_guard<std::mutex> l(_task_mutex);
2663
191
    fmt::memory_buffer debug_string_buffer;
2664
191
    fmt::format_to(debug_string_buffer,
2665
191
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2666
191
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2667
191
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2668
668
    for (size_t j = 0; j < _tasks.size(); j++) {
2669
477
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2670
1.53k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2671
1.05k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2672
1.05k
                           _tasks[j][i].first->debug_string());
2673
1.05k
        }
2674
477
    }
2675
2676
191
    return fmt::to_string(debug_string_buffer);
2677
191
}
2678
2679
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2680
2.64k
PipelineFragmentContext::collect_realtime_profile() const {
2681
2.64k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2682
2683
    // we do not have mutex to protect pipeline_id_to_profile
2684
    // so we need to make sure this funciton is invoked after fragment context
2685
    // has already been prepared.
2686
2.64k
    if (!_prepared) {
2687
0
        std::string msg =
2688
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2689
0
        DCHECK(false) << msg;
2690
0
        LOG_ERROR(msg);
2691
0
        return res;
2692
0
    }
2693
2694
    // Make sure first profile is fragment level profile
2695
2.64k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2696
2.64k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2697
2.64k
    res.push_back(fragment_profile);
2698
2699
    // pipeline_id_to_profile is initialized in prepare stage
2700
5.28k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2701
5.28k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2702
5.28k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2703
5.28k
        res.push_back(profile_ptr);
2704
5.28k
    }
2705
2706
2.64k
    return res;
2707
2.64k
}
2708
2709
std::shared_ptr<TRuntimeProfileTree>
2710
2.64k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2711
    // we do not have mutex to protect pipeline_id_to_profile
2712
    // so we need to make sure this funciton is invoked after fragment context
2713
    // has already been prepared.
2714
2.64k
    if (!_prepared) {
2715
0
        std::string msg =
2716
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2717
0
        DCHECK(false) << msg;
2718
0
        LOG_ERROR(msg);
2719
0
        return nullptr;
2720
0
    }
2721
2722
7.90k
    for (const auto& tasks : _tasks) {
2723
17.0k
        for (const auto& task : tasks) {
2724
17.0k
            if (task.second->load_channel_profile() == nullptr) {
2725
0
                continue;
2726
0
            }
2727
2728
17.0k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2729
2730
17.0k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2731
17.0k
                                                           _runtime_state->profile_level());
2732
17.0k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2733
17.0k
        }
2734
7.90k
    }
2735
2736
2.64k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2737
2.64k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2738
2.64k
                                                      _runtime_state->profile_level());
2739
2.64k
    return load_channel_profile;
2740
2.64k
}
2741
2742
// Collect runtime filter IDs registered by all tasks in this PFC.
2743
// Used during recursive CTE stage transitions to know which filters to deregister
2744
// before creating the new PFC for the next recursion round.
2745
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2746
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2747
// the vector sizes do not change, and individual RuntimeState filter sets are
2748
// written only during open() which has completed by the time we reach rerun.
2749
3.50k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2750
3.50k
    std::set<int> result;
2751
5.71k
    for (const auto& _task : _tasks) {
2752
9.79k
        for (const auto& task : _task) {
2753
9.79k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2754
9.79k
            result.merge(set);
2755
9.79k
        }
2756
5.71k
    }
2757
3.50k
    if (_runtime_state) {
2758
3.50k
        auto set = _runtime_state->get_deregister_runtime_filter();
2759
3.50k
        result.merge(set);
2760
3.50k
    }
2761
3.50k
    return result;
2762
3.50k
}
2763
2764
395k
void PipelineFragmentContext::_release_resource() {
2765
395k
    std::lock_guard<std::mutex> l(_task_mutex);
2766
    // The memory released by the query end is recorded in the query mem tracker.
2767
395k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2768
395k
    auto st = _query_ctx->exec_status();
2769
1.08M
    for (auto& _task : _tasks) {
2770
1.08M
        if (!_task.empty()) {
2771
1.08M
            _call_back(_task.front().first->runtime_state(), &st);
2772
1.08M
        }
2773
1.08M
    }
2774
395k
    _tasks.clear();
2775
395k
    _dag.clear();
2776
395k
    _pip_id_to_pipeline.clear();
2777
395k
    _pipelines.clear();
2778
395k
    _sink.reset();
2779
395k
    _root_op.reset();
2780
395k
    _runtime_filter_mgr_map.clear();
2781
395k
    _op_id_to_shared_state.clear();
2782
395k
}
2783
2784
} // namespace doris