Coverage Report

Created: 2026-06-27 16:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
PipelineFragmentContext::PipelineFragmentContext(
140
        TUniqueId query_id, const TPipelineFragmentParams& request,
141
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
142
        const std::function<void(RuntimeState*, Status*)>& call_back)
143
330k
        : _query_id(std::move(query_id)),
144
330k
          _fragment_id(request.fragment_id),
145
330k
          _exec_env(exec_env),
146
330k
          _query_ctx(std::move(query_ctx)),
147
330k
          _call_back(call_back),
148
330k
          _is_report_on_cancel(true),
149
330k
          _params(request),
150
330k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
151
330k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
152
330k
                                                               : false) {
153
330k
    _fragment_watcher.start();
154
330k
}
155
156
330k
PipelineFragmentContext::~PipelineFragmentContext() {
157
330k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
158
330k
            .tag("query_id", print_id(_query_id))
159
330k
            .tag("fragment_id", _fragment_id);
160
330k
    _release_resource();
161
330k
    {
162
        // The memory released by the query end is recorded in the query mem tracker.
163
330k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
164
330k
        _runtime_state.reset();
165
330k
        _query_ctx.reset();
166
330k
    }
167
330k
}
168
169
37
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
37
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
37
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
37
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
9.04k
bool PipelineFragmentContext::notify_close() {
181
9.04k
    bool all_closed = false;
182
9.04k
    bool need_remove = false;
183
9.04k
    {
184
9.04k
        std::lock_guard<std::mutex> l(_task_mutex);
185
9.04k
        if (_closed_tasks >= _total_tasks) {
186
3.71k
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
3.67k
                need_remove = true;
193
3.67k
            }
194
3.71k
            all_closed = true;
195
3.71k
        }
196
        // make fragment release by self after cancel
197
9.04k
        _need_notify_close = false;
198
9.04k
    }
199
9.04k
    if (need_remove) {
200
3.67k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
3.67k
    }
202
9.04k
    return all_closed;
203
9.04k
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
5.33k
void PipelineFragmentContext::cancel(const Status reason) {
210
5.33k
    LOG_INFO("PipelineFragmentContext::cancel")
211
5.33k
            .tag("query_id", print_id(_query_id))
212
5.33k
            .tag("fragment_id", _fragment_id)
213
5.33k
            .tag("reason", reason.to_string());
214
5.33k
    if (notify_close()) {
215
60
        return;
216
60
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
5.27k
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
1
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
1
                                   debug_string());
221
1
        LOG_LONG_STRING(WARNING, dbg_str);
222
1
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
5.27k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
5.27k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
5.27k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
22
        _query_ctx->set_load_error_url(error_url);
236
22
    }
237
238
5.27k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
22
        _query_ctx->set_first_error_msg(first_error_msg);
240
22
    }
241
242
5.27k
    _query_ctx->cancel(reason, _fragment_id);
243
5.27k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
130
        _is_report_on_cancel = false;
245
5.14k
    } else {
246
22.2k
        for (auto& id : _fragment_instance_ids) {
247
22.2k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
22.2k
        }
249
5.14k
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
5.27k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
5.27k
    if (stream_load_ctx != nullptr) {
254
30
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
30
        stream_load_ctx->error_url = get_load_error_url();
259
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
30
    }
261
262
22.9k
    for (auto& tasks : _tasks) {
263
49.4k
        for (auto& task : tasks) {
264
49.4k
            task.first->unblock_all_dependencies();
265
49.4k
        }
266
22.9k
    }
267
5.27k
}
268
269
513k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
270
513k
    PipelineId id = _next_pipeline_id++;
271
513k
    auto pipeline = std::make_shared<Pipeline>(
272
513k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
273
513k
            parent ? parent->num_tasks() : _num_instances);
274
513k
    if (idx >= 0) {
275
601
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
276
512k
    } else {
277
512k
        _pipelines.emplace_back(pipeline);
278
512k
    }
279
513k
    if (parent) {
280
181k
        parent->set_children(pipeline);
281
181k
    }
282
513k
    return pipeline;
283
513k
}
284
285
330k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
286
330k
    {
287
330k
        SCOPED_TIMER(_build_pipelines_timer);
288
        // 2. Build pipelines with operators in this fragment.
289
330k
        auto root_pipeline = add_pipeline();
290
330k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
291
330k
                                         &_root_op, root_pipeline));
292
293
        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
294
        // that inherited reduced num_tasks from a serial operator.
295
330k
        _propagate_local_exchange_num_tasks();
296
297
        // Create deferred local exchangers now that all pipelines have final num_tasks.
298
330k
        RETURN_IF_ERROR(_create_deferred_local_exchangers());
299
300
        // Raise num_tasks for pipelines whose serial non-scan operators (e.g.,
301
        // UNPARTITIONED Exchange) reduced num_tasks below _num_instances.
302
        // Without this, fragment instances 1+ have no task for these pipelines
303
        // and downstream operators fail with "must set shared state".
304
        //
305
        // This applies to ALL pipelines (not just deferred exchanger upstreams):
306
        // fragments with UNION/INTERSECT/EXCEPT + serial Exchange in child
307
        // pipelines also need the raise, even without FE-planned local exchange.
308
        //
309
        // Exception: serial scan sources (pooling scan) keep num_tasks=1 — the
310
        // PassthroughExchanger(1, N) handles the fan-out correctly.
311
        // NOTE: Do NOT raise pipelines whose source is a serial operator
312
        // (Exchange or scan) — they legitimately have 1 task, and raising
313
        // them causes crashes (e.g., 4 Exchange tasks but only 1 receives
314
        // data).  The correct fix for shared state injection across
315
        // instances is handled by the FE: it inserts local exchange nodes
316
        // between serial operators and their downstream consumers, creating
317
        // proper pipeline boundaries with _num_instances tasks.
318
319
        // 3. Create sink operator
320
330k
        if (!_params.fragment.__isset.output_sink) {
321
0
            return Status::InternalError("No output sink in this fragment!");
322
0
        }
323
330k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
324
330k
                                          _params.fragment.output_exprs, _params,
325
330k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
326
330k
                                          *_desc_tbl, root_pipeline->id()));
327
330k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
328
330k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
329
330
512k
        for (PipelinePtr& pipeline : _pipelines) {
331
512k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
332
512k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
333
512k
        }
334
330k
    }
335
    // 4. Build local exchanger
336
330k
    if (_runtime_state->plan_local_shuffle()) {
337
84.6k
        SCOPED_TIMER(_plan_local_exchanger_timer);
338
84.6k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
339
84.6k
                                             _params.bucket_seq_to_instance_idx,
340
84.6k
                                             _params.shuffle_idx_to_instance_idx));
341
84.6k
    }
342
343
    // 5. Initialize global states in pipelines.
344
513k
    for (PipelinePtr& pipeline : _pipelines) {
345
513k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
346
513k
        pipeline->children().clear();
347
513k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
348
513k
    }
349
350
329k
    {
351
329k
        SCOPED_TIMER(_build_tasks_timer);
352
        // 6. Build pipeline tasks and initialize local state.
353
329k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
354
329k
    }
355
356
329k
    return Status::OK();
357
329k
}
358
359
330k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
360
330k
    if (_prepared) {
361
0
        return Status::InternalError("Already prepared");
362
0
    }
363
330k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
364
330k
        _timeout = _params.query_options.execution_timeout;
365
330k
    }
366
367
330k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
368
330k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
369
330k
    SCOPED_TIMER(_prepare_timer);
370
330k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
371
330k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
372
330k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
373
330k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
374
330k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
375
330k
    {
376
330k
        SCOPED_TIMER(_init_context_timer);
377
330k
        cast_set(_num_instances, _params.local_params.size());
378
330k
        _total_instances =
379
330k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
380
381
330k
        auto* fragment_context = this;
382
383
330k
        if (_params.query_options.__isset.is_report_success) {
384
328k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
385
328k
        }
386
387
        // 1. Set up the global runtime state.
388
330k
        _runtime_state = RuntimeState::create_unique(
389
330k
                _params.query_id, _params.fragment_id, _params.query_options,
390
330k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
391
330k
        _runtime_state->set_task_execution_context(shared_from_this());
392
330k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
393
330k
        if (_params.__isset.backend_id) {
394
328k
            _runtime_state->set_backend_id(_params.backend_id);
395
328k
        }
396
330k
        if (_params.__isset.import_label) {
397
241
            _runtime_state->set_import_label(_params.import_label);
398
241
        }
399
330k
        if (_params.__isset.db_name) {
400
192
            _runtime_state->set_db_name(_params.db_name);
401
192
        }
402
330k
        if (_params.__isset.load_job_id) {
403
0
            _runtime_state->set_load_job_id(_params.load_job_id);
404
0
        }
405
406
330k
        if (_params.is_simplified_param) {
407
116k
            _desc_tbl = _query_ctx->desc_tbl;
408
213k
        } else {
409
213k
            DCHECK(_params.__isset.desc_tbl);
410
213k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
411
213k
                                                  &_desc_tbl));
412
213k
        }
413
330k
        _runtime_state->set_desc_tbl(_desc_tbl);
414
330k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
415
330k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
416
330k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
417
330k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
418
419
        // init fragment_instance_ids
420
330k
        const auto target_size = _params.local_params.size();
421
330k
        _fragment_instance_ids.resize(target_size);
422
1.29M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
423
967k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
424
967k
            _fragment_instance_ids[i] = fragment_instance_id;
425
967k
        }
426
330k
    }
427
428
330k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
429
430
329k
    _init_next_report_time();
431
432
329k
    _prepared = true;
433
329k
    return Status::OK();
434
330k
}
435
436
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
437
        int instance_idx,
438
968k
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
439
968k
    const auto& local_params = _params.local_params[instance_idx];
440
968k
    auto fragment_instance_id = local_params.fragment_instance_id;
441
968k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
442
968k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
443
968k
    auto get_shared_state = [&](PipelinePtr pipeline)
444
968k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
445
1.50M
                                       std::vector<std::shared_ptr<Dependency>>>> {
446
1.50M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
447
1.50M
                                std::vector<std::shared_ptr<Dependency>>>>
448
1.50M
                shared_state_map;
449
2.06M
        for (auto& op : pipeline->operators()) {
450
2.06M
            auto source_id = op->operator_id();
451
2.06M
            if (auto iter = _op_id_to_shared_state.find(source_id);
452
2.06M
                iter != _op_id_to_shared_state.end()) {
453
600k
                shared_state_map.insert({source_id, iter->second});
454
600k
            }
455
2.06M
        }
456
1.50M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
457
1.50M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
458
1.50M
                iter != _op_id_to_shared_state.end()) {
459
238k
                shared_state_map.insert({sink_to_source_id, iter->second});
460
238k
            }
461
1.50M
        }
462
1.50M
        return shared_state_map;
463
1.50M
    };
464
465
2.82M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
466
1.86M
        auto& pipeline = _pipelines[pip_idx];
467
1.86M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
468
1.50M
            auto task_runtime_state = RuntimeState::create_unique(
469
1.50M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
470
1.50M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
471
1.50M
            {
472
                // Initialize runtime state for this task
473
1.50M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
474
475
1.50M
                task_runtime_state->set_task_execution_context(shared_from_this());
476
1.50M
                task_runtime_state->set_be_number(local_params.backend_num);
477
478
1.50M
                if (_params.__isset.backend_id) {
479
1.50M
                    task_runtime_state->set_backend_id(_params.backend_id);
480
1.50M
                }
481
1.50M
                if (_params.__isset.import_label) {
482
242
                    task_runtime_state->set_import_label(_params.import_label);
483
242
                }
484
1.50M
                if (_params.__isset.db_name) {
485
193
                    task_runtime_state->set_db_name(_params.db_name);
486
193
                }
487
1.50M
                if (_params.__isset.load_job_id) {
488
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
489
0
                }
490
1.50M
                if (_params.__isset.wal_id) {
491
115
                    task_runtime_state->set_wal_id(_params.wal_id);
492
115
                }
493
1.50M
                if (_params.__isset.content_length) {
494
34
                    task_runtime_state->set_content_length(_params.content_length);
495
34
                }
496
497
1.50M
                task_runtime_state->set_desc_tbl(_desc_tbl);
498
1.50M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
499
1.50M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
500
1.50M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
501
1.50M
                task_runtime_state->set_max_operator_id(max_operator_id());
502
1.50M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
503
1.50M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
504
1.50M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
505
506
1.50M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
507
1.50M
            }
508
1.50M
            auto cur_task_id = _total_tasks++;
509
1.50M
            task_runtime_state->set_task_id(cur_task_id);
510
1.50M
            task_runtime_state->set_task_num(pipeline->num_tasks());
511
1.50M
            auto task = std::make_shared<PipelineTask>(
512
1.50M
                    pipeline, cur_task_id, task_runtime_state.get(),
513
1.50M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
514
1.50M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
515
1.50M
                    instance_idx);
516
1.50M
            pipeline->incr_created_tasks(instance_idx, task.get());
517
1.50M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
518
1.50M
            _tasks[instance_idx].emplace_back(
519
1.50M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
520
1.50M
                            std::move(task), std::move(task_runtime_state)});
521
1.50M
        }
522
1.86M
    }
523
524
    /**
525
         * Build DAG for pipeline tasks.
526
         * For example, we have
527
         *
528
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
529
         *            \                      /
530
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
531
         *                 \                          /
532
         *               JoinProbeOperator2 (Pipeline1)
533
         *
534
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
535
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
536
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
537
         *
538
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
539
         * and JoinProbeOperator2.
540
         */
541
1.86M
    for (auto& _pipeline : _pipelines) {
542
1.86M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
543
1.49M
            auto* task = pipeline_id_to_task[_pipeline->id()];
544
1.49M
            DCHECK(task != nullptr);
545
546
            // If this task has upstream dependency, then inject it into this task.
547
1.49M
            if (_dag.contains(_pipeline->id())) {
548
905k
                auto& deps = _dag[_pipeline->id()];
549
905k
                for (auto& dep : deps) {
550
898k
                    if (pipeline_id_to_task.contains(dep)) {
551
537k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
552
537k
                        if (ss) {
553
287k
                            task->inject_shared_state(ss);
554
287k
                        } else {
555
249k
                            pipeline_id_to_task[dep]->inject_shared_state(
556
249k
                                    task->get_source_shared_state());
557
249k
                        }
558
537k
                    }
559
898k
                }
560
905k
            }
561
1.49M
        }
562
1.86M
    }
563
2.83M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
564
1.86M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
565
1.49M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
566
1.49M
            DCHECK(pipeline_id_to_profile[pip_idx]);
567
1.49M
            std::vector<TScanRangeParams> scan_ranges;
568
1.49M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
569
1.49M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
570
235k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
571
235k
            }
572
1.49M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
573
1.49M
                                                             _params.fragment.output_sink));
574
1.49M
        }
575
1.86M
    }
576
971k
    {
577
971k
        std::lock_guard<std::mutex> l(_state_map_lock);
578
971k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
579
971k
    }
580
971k
    return Status::OK();
581
968k
}
582
583
329k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
584
329k
    _total_tasks = 0;
585
329k
    _closed_tasks = 0;
586
329k
    const auto target_size = _params.local_params.size();
587
329k
    _tasks.resize(target_size);
588
329k
    _runtime_filter_mgr_map.resize(target_size);
589
842k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
590
513k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
591
513k
    }
592
329k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
593
594
329k
    if (target_size > 1 &&
595
329k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
596
125k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
597
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
598
22.6k
        std::vector<Status> prepare_status(target_size);
599
22.6k
        int submitted_tasks = 0;
600
22.6k
        Status submit_status;
601
22.6k
        CountDownLatch latch((int)target_size);
602
272k
        for (int i = 0; i < target_size; i++) {
603
250k
            submit_status = thread_pool->submit_func([&, i]() {
604
250k
                SCOPED_ATTACH_TASK(_query_ctx.get());
605
250k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
606
250k
                latch.count_down();
607
250k
            });
608
250k
            if (LIKELY(submit_status.ok())) {
609
250k
                submitted_tasks++;
610
18.4E
            } else {
611
18.4E
                break;
612
18.4E
            }
613
250k
        }
614
22.6k
        latch.arrive_and_wait(target_size - submitted_tasks);
615
22.6k
        if (UNLIKELY(!submit_status.ok())) {
616
0
            return submit_status;
617
0
        }
618
272k
        for (int i = 0; i < submitted_tasks; i++) {
619
250k
            if (!prepare_status[i].ok()) {
620
0
                return prepare_status[i];
621
0
            }
622
250k
        }
623
307k
    } else {
624
1.02M
        for (int i = 0; i < target_size; i++) {
625
719k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
626
719k
        }
627
307k
    }
628
329k
    _pipeline_parent_map.clear();
629
329k
    _op_id_to_shared_state.clear();
630
    // Record task cardinality once when this fragment context finishes task initialization.
631
329k
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
632
633
329k
    return Status::OK();
634
329k
}
635
636
329k
void PipelineFragmentContext::_init_next_report_time() {
637
329k
    auto interval_s = config::pipeline_status_report_interval;
638
329k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
32.1k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
32.1k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
32.1k
        _previous_report_time =
643
32.1k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
32.1k
        _disable_period_report = false;
645
32.1k
    }
646
329k
}
647
648
3.62k
void PipelineFragmentContext::refresh_next_report_time() {
649
3.62k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
3.62k
    DCHECK(disable == true);
651
3.62k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
3.62k
    _disable_period_report.compare_exchange_strong(disable, false);
653
3.62k
}
654
655
5.45M
void PipelineFragmentContext::trigger_report_if_necessary() {
656
5.45M
    if (!_is_report_success) {
657
5.11M
        return;
658
5.11M
    }
659
347k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
347k
    if (disable) {
661
5.58k
        return;
662
5.58k
    }
663
341k
    int32_t interval_s = config::pipeline_status_report_interval;
664
341k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
341k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
341k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
341k
    if (MonotonicNanos() > next_report_time) {
672
3.62k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
3.62k
                                                            std::memory_order_acq_rel)) {
674
5
            return;
675
5
        }
676
3.62k
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
3.62k
        auto st = send_report(false);
693
3.62k
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
3.62k
    }
699
341k
}
700
701
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    // Rebuilds the whole operator tree of this fragment from the pre-order thrift plan nodes,
    // storing the top operator in `*root`. Fails if the plan is empty (throws) or if the
    // recursive reconstruction did not consume every plan node.
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Cursor into `plan_nodes`; _create_tree_helper advances it and leaves it on the index of
    // the last node it consumed.
    int last_consumed_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &last_consumed_idx, root,
                                        cur_pipe, 0, false, false));

    const bool consumed_every_node = last_consumed_idx + 1 == plan_nodes.size();
    if (!consumed_every_node) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
718
719
329k
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
    // Instantiates the exchanger of every FE-planned local exchange whose construction was
    // deferred (recorded in `_deferred_exchangers`), then clears the deferred list. Returns an
    // InternalError (and DCHECK-fails in debug) for partition types this path does not support.
    for (auto& info : _deferred_exchangers) {
        // DANGER ZONE — do not "fix" `sender_count` without reading the history.
        //
        // sender_count seeds Exchanger::_running_sink_operators, which the source side
        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
        // The correct value is THIS pipeline-instance's sink task count, which is exactly
        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
        //
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
        //   BE-planned path (the std::max(cur_pipe->num_tasks(), _num_instances) calls in
        //   _add_local_exchange_impl).  THIS BREAKS the common FE-planned shape of
        //   `serial scan → LE(PT) → ...`: upstream_pipe genuinely has num_tasks=1, only 1 close
        //   arrives, but the seed becomes _num_instances so _running_sink_operators never
        //   reaches 0 — downstream sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh
        //   from mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING and regressed exactly
        //   this way).  BE-planned mode uses max() because its `cur_pipe` is the source-side
        //   pipeline (always raised to _num_instances by add_pipeline) — not analogous to our
        //   `upstream_pipe` here, which is the sink-side pipeline that may legitimately stay at
        //   1 for serial sources.
        //
        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
        //   is shared across all instances.  Same hang — each fragment-instance
        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
        //   exchanger is per-instance, not per-BE.  num_tasks() is already the right
        //   close-count for one instance.
        //
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
        // this fragment shape.  Fix THAT pass, not this seed value.
        const int sender_count = info.upstream_pipe->num_tasks();
        const auto partition_type = info.partition_type;
        const auto free_blocks_limit = info.free_blocks_limit;
        auto& exchanger = info.shared_state->exchanger;

        switch (partition_type) {
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
            exchanger = ShuffleExchanger::create_unique(sender_count, _num_instances,
                                                        info.num_partitions, free_blocks_limit,
                                                        partition_type);
            break;
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
            exchanger = BucketShuffleExchanger::create_unique(
                    sender_count, _num_instances, info.num_partitions, free_blocks_limit);
            break;
        case TLocalPartitionType::PASSTHROUGH:
            exchanger = PassthroughExchanger::create_unique(sender_count, _num_instances,
                                                            free_blocks_limit);
            break;
        case TLocalPartitionType::BROADCAST:
            exchanger = BroadcastExchanger::create_unique(sender_count, _num_instances,
                                                          free_blocks_limit);
            break;
        case TLocalPartitionType::PASS_TO_ONE:
            // With a shared broadcast-join hash table only one task builds the table, so rows
            // go to a single consumer; otherwise every consumer receives every block.
            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
                exchanger = PassToOneExchanger::create_unique(sender_count, _num_instances,
                                                              free_blocks_limit);
            } else {
                exchanger = BroadcastExchanger::create_unique(sender_count, _num_instances,
                                                              free_blocks_limit);
            }
            break;
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
            exchanger = AdaptivePassthroughExchanger::create_unique(sender_count, _num_instances,
                                                                    free_blocks_limit);
            break;
        case TLocalPartitionType::NOOP:
        case TLocalPartitionType::LOCAL_MERGE_SORT:
            // FE-planned LocalExchangeNode is not expected to emit NOOP or LOCAL_MERGE_SORT
            // through the deferred-exchanger path: NOOP means "no exchange needed" and should be
            // filtered out earlier; LOCAL_MERGE_SORT is planned by the legacy BE path only.
            // Crash in debug to surface the protocol violation; return an error in release
            // rather than silently corrupting execution.
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
                          << static_cast<int>(partition_type);
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
                                         std::to_string(static_cast<int>(partition_type)));
        default:
            // A TLocalPartitionType added on the FE side without a BE handler here.
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
                          << static_cast<int>(partition_type);
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
                                         std::to_string(static_cast<int>(partition_type)));
        }
    }
    _deferred_exchangers.clear();
    return Status::OK();
}
805
806
329k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
329k
    if (_deferred_exchangers.empty()) {
815
254k
        return;
816
254k
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
74.9k
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
74.9k
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
74.9k
    std::map<PipelineId, int> in_degree;
832
216k
    for (auto& p : _pipelines) {
833
216k
        id_to_pipe[p->id()] = p;
834
216k
        in_degree.try_emplace(p->id(), 0);
835
216k
    }
836
136k
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
141k
        for (auto upstream_id : upstream_ids) {
838
141k
            downstreams_of[upstream_id].push_back(downstream_id);
839
141k
            in_degree[downstream_id]++;
840
141k
        }
841
136k
    }
842
74.9k
    std::vector<PipelineId> ready;
843
216k
    for (const auto& [id, deg] : in_degree) {
844
216k
        if (deg == 0) {
845
80.1k
            ready.push_back(id);
846
80.1k
        }
847
216k
    }
848
74.9k
    size_t visited = 0;
849
291k
    while (!ready.empty()) {
850
216k
        const auto id = ready.back();
851
216k
        ready.pop_back();
852
216k
        visited++;
853
216k
        auto pit = id_to_pipe.find(id);
854
216k
        if (pit != id_to_pipe.end()) {
855
216k
            auto& pipe = pit->second;
856
216k
            const auto& ops = pipe->operators();
857
216k
            const bool le_source =
858
216k
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
216k
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
216k
            if (le_source) {
861
101k
                pipe->set_num_tasks(_num_instances);
862
115k
            } else if (!serial_source) {
863
52.9k
                int target = pipe->num_tasks();
864
52.9k
                const auto up_it = _dag.find(id);
865
52.9k
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
35.5k
                    for (auto upstream_id : up_it->second) {
868
35.5k
                        auto uit = id_to_pipe.find(upstream_id);
869
35.5k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
35.5k
                            target = _num_instances;
871
35.5k
                            break;
872
35.5k
                        }
873
35.5k
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
36.1k
                    for (auto upstream_id : up_it->second) {
876
36.1k
                        auto uit = id_to_pipe.find(upstream_id);
877
36.1k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
36.1k
                            !uit->second->operators().empty() &&
879
36.1k
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
36.1k
                    }
883
35.5k
                }
884
52.9k
                pipe->set_num_tasks(target);
885
52.9k
            }
886
216k
        }
887
216k
        for (auto down : downstreams_of[id]) {
888
141k
            if (--in_degree[down] == 0) {
889
136k
                ready.push_back(down);
890
136k
            }
891
141k
        }
892
216k
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
74.9k
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
74.9k
}
900
901
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // Recursively rebuilds the operator tree from `tnodes`, which holds the plan in pre-order.
    // `*node_idx` is the cursor: on entry it indexes the node built here, and it is advanced by
    // one before each child recursion. The new operator is attached to `parent`, or stored in
    // `*root` for the top-level call. The two flags carry what the ancestors require; they are
    // refined for this node's children below. `cur_pipe` may be replaced by _create_operator.
    if (*node_idx >= tnodes.size()) {
        // propagate error case
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;

    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnode, descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator, require_bucket_distribution,
                                     cache_op));
    // Initialization must be done here. For example, group by expressions in agg are used to
    // decide whether a local shuffle should be planned, so they must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    if (parent == nullptr) {
        *root = op;
    } else {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    }

    /**
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // While the current pipeline has no operators yet its sink dictates the required
    // distribution; otherwise the operator just created does.
    const bool sink_decides = cur_pipe->operators().empty();
    const auto required_data_distribution =
            sink_decides ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                         : op->required_data_distribution(_runtime_state.get());
    const auto required_type = required_data_distribution.distribution_type;
    const bool requires_noop = required_type == TLocalPartitionType::NOOP;

    const bool children_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (sink_decides ? cur_pipe->sink()->is_shuffled_operator()
                            : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_type)) ||
            (followed_by_shuffled_operator && requires_noop);

    const bool children_require_bucket_distribution =
            ((require_bucket_distribution ||
              (sink_decides ? cur_pipe->sink()->is_colocated_operator()
                            : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_type)) ||
            (require_bucket_distribution && requires_noop);

    if (num_children == 0) {
        // Leaf operator: remember whether it is serial (read later when planning local
        // exchanges in _add_local_exchange_impl).
        _use_serial_source = op->is_serial_operator();
    }

    // rely on that tnodes is preorder of the plan
    for (int child = 0; child < num_children; ++child) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe,
                                            child, children_followed_by_shuffled_operator,
                                            children_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                    *node_idx, tnodes.size());
        }
    }
    return Status::OK();
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
601
        PipelinePtr pipe_with_sink) {
990
601
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
601
    pipe_with_source->set_num_tasks(_num_instances);
992
601
    pipe_with_source->set_data_distribution(data_distribution);
993
601
}
994
995
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Splits `cur_pipe` at operator index `idx` by inserting a local exchange:
    //   - operators [0, idx) move into `new_pip`, which gets a LocalExchangeSink;
    //   - `cur_pipe` is re-headed with the matching LocalExchangeSource.
    // Also registers the shared state, rewires pipeline children and _dag edges, and finally
    // lets `new_pip` inherit the original parallelism.
    // `do_local_exchange` is neither read nor written here (the caller sets it).
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    const auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    const auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type =
                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    if (!use_global_hash_shuffle &&
        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    // Same limit for every exchanger type; 0 when the query option is not set.
    const auto& query_options = _runtime_state->query_options();
    const int free_blocks_limit =
            query_options.__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(query_options.local_exchange_free_blocks_limit)
                    : 0;
    switch (data_distribution.distribution_type) {
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_blocks_limit,
                data_distribution.distribution_type);
        break;
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit);
        break;
    case TLocalPartitionType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        break;
    case TLocalPartitionType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        break;
    case TLocalPartitionType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit);
        }
        break;
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. A child pipeline whose sink feeds one of
    // the operators that moved into `new_pip` becomes a child of `new_pip`; all others stay
    // with `cur_pipe`, which additionally gains `new_pip` as a child.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool found = false;
        for (const auto& op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        edges_with_sink.reserve(new_pip->children().size());
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), std::move(edges_with_sink)});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = std::move(edges_with_source);
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
1166
1167
// Inserts a local exchange into `cur_pipe` at operator position `idx` when the fragment runs
// with more than one instance, the parent reports more than one task, and
// `cur_pipe->need_to_local_exchange(data_distribution, idx)` says the requirement is not met.
//
// On insertion it:
// - sets `*do_local_exchange` to true;
// - creates a pipeline via `add_pipeline(cur_pipe, pip_idx + 1)`;
// - delegates the split to `_add_local_exchange_impl`;
// - checks that exactly one operator (the local exchange source) was added overall.
//
// If the split leaves the exchange sink with a single task while `cur_pipe` has several and the
// distribution type does heavy work on the sink, an extra PASSTHROUGH exchange is added in
// front of it to raise the sink's concurrency.
// NOTE(review): `node_id` is not read in this function body.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // With a single instance (or a single parent task) there is nothing to redistribute.
    const bool effectively_serial =
            _num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1;
    if (effectively_serial || !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    // Remember the operator count so the split can be verified afterwards.
    const auto op_count_before = cur_pipe->operators().size();
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, new_pip, data_distribution,
                                             do_local_exchange, num_buckets,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));

    CHECK(op_count_before + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << op_count_before
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // Some local shuffles do relatively heavy work in the sink. With a sink concurrency of 1
    // feeding n sources, the sink becomes the bottleneck, so a passthrough exchange is put in
    // front of it to fan the input out first:
    //   op -> local sink(1) -> local source(n)
    // becomes
    //   op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source(n)
    const bool sink_would_bottleneck =
            cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (!sink_would_bottleneck) {
        return Status::OK();
    }

    // Split after the last operator currently in `new_pip`.
    const int passthrough_split_idx = cast_set<int>(new_pip->operators().size());
    auto passthrough_pip = add_pipeline(new_pip, pip_idx + 2);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            passthrough_split_idx, pool, new_pip, passthrough_pip,
            DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    return Status::OK();
}
1208
1209
// Plans local exchanges for every pipeline of the fragment.
//
// Pipelines are visited from the last index to the first. For each one:
// - its data distribution is initialized;
// - it then inherits the distribution of any child pipeline whose sink feeds its first operator
//   (the last such child wins);
// - finally the per-pipeline overload plans local exchanges.
//
// Pipelines created while planning are requested at positions after `pip_idx`
// (`pip_idx + 1` / `pip_idx + 2`), so presumably walking backwards never revisits or skips
// an index. TODO confirm against `add_pipeline`.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; --pip_idx) {
        // Hold our own reference: planning below may insert into `_pipelines`.
        const auto pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());

        // Take the distribution of the child that directly feeds this pipeline's head operator,
        // as opposed to e.g. the build-side child of a join.
        for (auto& child : pipeline->children()) {
            const bool feeds_head =
                    child->sink()->node_id() == pipeline->operators().front()->node_id();
            if (feeds_head) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // NOTE(review): an older comment here said that when `num_buckets == 0` (fragment
        // colocated by an exchange node rather than a scan node) `_num_instances` is used
        // instead to avoid dividing by zero. No such substitution happens in this function;
        // confirm where it is handled.
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1233
1234
// Plans local exchanges inside a single pipeline `pip`, located at `_pipelines[pip_idx]`.
//
// Operators are examined from index 1; index 0 is not examined. Whenever an operator's
// required distribution triggers a local exchange, `pip` is split, and planning restarts at
// index 2: index 0 is now the local exchange source and index 1 the operator just handled.
// Once all operators are done, the sink's required distribution is checked the same way,
// using the final operator index as the split position.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int op_idx = 1;
    bool split_happened = false;
    do {
        split_happened = false;
        auto& pipe_ops = pip->operators();
        while (op_idx < pipe_ops.size()) {
            auto requirement = pipe_ops[op_idx]->required_data_distribution(_runtime_state.get());
            if (requirement.need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, op_idx, pipe_ops[op_idx]->node_id(), _runtime_state->obj_pool(),
                        pip, requirement, &split_happened, num_buckets,
                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
            }
            if (split_happened) {
                // The pipeline was split: slot 0 is the new local exchange source and slot 1 is
                // the operator just handled, so continue with the remaining operators at 2.
                op_idx = 2;
                break;
            }
            ++op_idx;
        }
    } while (split_happened);

    if (!pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        return Status::OK();
    }
    RETURN_IF_ERROR(_add_local_exchange(
            pip_idx, op_idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
            pip->sink()->required_data_distribution(_runtime_state.get()), &split_happened,
            num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    return Status::OK();
}
1271
1272
// Builds the fragment's root sink operator from `thrift_sink` and stores it in `_sink`.
//
// For RESULT_SINK and the OLAP / GROUP_COMMIT_OLAP table sinks, the sink's node id is the node
// id of the last operator of `_pipelines[cur_pipeline_id]` plus one.
//
// For MULTI_CAST_DATA_STREAM_SINK, one new pipeline is additionally created per consumer. Each
// holds a MultiCastDataStreamerSourceOperatorX as its source and an ExchangeSinkOperatorX as its
// sink, and `cur_pipeline_id` is appended to `_dag` under that new pipeline's id.
//
// Returns InternalError when:
// - the thrift payload for the requested sink type is missing or inconsistent;
// - a JDBC sink is requested while `config::enable_java_support` is off;
// - the sink type is not supported.
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                                  const std::vector<TExpr>& output_exprs,
                                                  const TPipelineFragmentParams& params,
                                                  const RowDescriptor& row_desc,
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
                                                  PipelineId cur_pipeline_id) {
    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        if (!thrift_sink.__isset.stream_sink) {
            return Status::InternalError("Missing data stream sink.");
        }
        _sink = std::make_shared<ExchangeSinkOperatorX>(
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
                params.destinations, _fragment_instance_ids);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        if (!thrift_sink.__isset.result_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
                                                      row_desc, output_exprs,
                                                      thrift_sink.result_sink);
        break;
    }
    case TDataSinkType::DICTIONARY_SINK: {
        if (!thrift_sink.__isset.dictionary_sink) {
            return Status::InternalError("Missing dict sink.");
        }

        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
                                                    thrift_sink.dictionary_sink);
        break;
    }
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
    case TDataSinkType::OLAP_TABLE_SINK: {
        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        // The V2 (memtable-on-sink-node) sink is only used when the query enables it, the table
        // has no inverted index v1 / partial update and no row binlog, and we are not in cloud
        // mode.
        if (state->query_options().enable_memtable_on_sink_node &&
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        } else {
            _sink = std::make_shared<OlapTableSinkOperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
        DCHECK(thrift_sink.__isset.olap_table_sink);
        DCHECK(state->get_query_ctx() != nullptr);
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                                output_exprs);
        break;
    }
    case TDataSinkType::HIVE_TABLE_SINK: {
        if (!thrift_sink.__isset.hive_table_sink) {
            return Status::InternalError("Missing hive table sink.");
        }
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                         output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_TABLE_SINK: {
        if (!thrift_sink.__isset.iceberg_table_sink) {
            return Status::InternalError("Missing iceberg table sink.");
        }
        // A sort_info on the iceberg sink selects the spillable variant.
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                     row_desc, output_exprs);
        } else {
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::ICEBERG_DELETE_SINK: {
        if (!thrift_sink.__isset.iceberg_delete_sink) {
            return Status::InternalError("Missing iceberg delete sink.");
        }
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
                                                             row_desc, output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_MERGE_SINK: {
        if (!thrift_sink.__isset.iceberg_merge_sink) {
            return Status::InternalError("Missing iceberg merge sink.");
        }
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                            output_exprs);
        break;
    }
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
        if (!thrift_sink.__isset.max_compute_table_sink) {
            return Status::InternalError("Missing max compute table sink.");
        }
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                       output_exprs);
        break;
    }
    case TDataSinkType::JDBC_TABLE_SINK: {
        if (!thrift_sink.__isset.jdbc_table_sink) {
            return Status::InternalError("Missing data jdbc sink.");
        }
        if (config::enable_java_support) {
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        } else {
            return Status::InternalError(
                    "Jdbc table sink is not enabled, you can change be config "
                    "enable_java_support to true and restart be.");
        }
        break;
    }
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        if (!thrift_sink.__isset.memory_scratch_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        if (!thrift_sink.__isset.result_file_sink) {
            return Status::InternalError("Missing result file sink.");
        }

        // When destinations are present, the result file sink is not the fragment's top sink and
        // is built with those destinations and the descriptor table.
        if (params.__isset.destinations && !params.destinations.empty()) {
            _sink = std::make_shared<ResultFileSinkOperatorX>(
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
                    params.destinations, output_exprs, desc_tbl);
        } else {
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                              output_exprs);
        }
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
        const auto& multi_cast_sink = thrift_sink.multi_cast_stream_sink;
        const size_t sender_size = multi_cast_sink.sinks.size();
        // Validate before building anything. DCHECKs vanish in release builds; without these
        // checks an empty `sinks` list would yield a multi-cast sink with no consumers, and a
        // `destinations` list shorter than `sinks` would be indexed out of bounds below.
        if (sender_size == 0) {
            return Status::InternalError("size of sources must be greater than 0");
        }
        if (multi_cast_sink.destinations.size() < sender_size) {
            return Status::InternalError(
                    "Multi cast sink has {} sinks but only {} destination lists", sender_size,
                    multi_cast_sink.destinations.size());
        }

        auto sink_id = next_sink_operator_id();
        const int multi_cast_node_id = sink_id;
        // One sink has multiple sources: allocate one source operator id per consumer.
        std::vector<int> sources;
        sources.reserve(sender_size);
        for (size_t i = 0; i < sender_size; ++i) {
            sources.push_back(next_operator_id());
        }

        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
                sink_id, multi_cast_node_id, sources, pool, multi_cast_sink);
        for (size_t i = 0; i < sender_size; ++i) {
            const auto& sender = multi_cast_sink.sinks[i];
            auto new_pipeline = add_pipeline();

            // Row layout used by this consumer's exchange sink. Consumers with their own output
            // exprs emit rows of `output_tuple_id`; the others reuse `row_desc`. The copy is
            // handed to `pool`, presumably so it outlives this call.
            RowDescriptor* exchange_row_desc = nullptr;
            {
                const auto& tmp_row_desc =
                        !sender.output_exprs.empty()
                                ? RowDescriptor(state->desc_tbl(), {sender.output_tuple_id})
                                : row_desc;
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
            }

            // 1. Create and set the multi-cast source operator of the new pipeline.
            const auto source_id = sources[i];
            OperatorPtr source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
                    /*node_id*/ source_id, /*consumer_id*/ static_cast<int>(i), pool, sender,
                    row_desc,
                    /*operator_id=*/source_id);
            RETURN_IF_ERROR(new_pipeline->add_operator(
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));

            // 2. Create and set the data stream (exchange) sink operator of the new pipeline.
            DataSinkOperatorPtr sink_op = std::make_shared<ExchangeSinkOperatorX>(
                    state, *exchange_row_desc, next_sink_operator_id(), sender,
                    multi_cast_sink.destinations[i], _fragment_instance_ids);
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
            {
                TDataSink* t = pool->add(new TDataSink());
                t->stream_sink = sender;
                RETURN_IF_ERROR(sink_op->init(*t));
            }

            // 3. Record the dependency edge in the DAG.
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
        }
        break;
    }
    case TDataSinkType::BLACKHOLE_SINK: {
        if (!thrift_sink.__isset.blackhole_sink) {
            return Status::InternalError("Missing blackhole sink.");
        }

        _sink = std::make_shared<BlackholeSinkOperatorX>(next_sink_operator_id());
        break;
    }
    case TDataSinkType::TVF_TABLE_SINK: {
        if (!thrift_sink.__isset.tvf_table_sink) {
            return Status::InternalError("Missing TVF table sink.");
        }
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                        output_exprs);
        break;
    }
    default:
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
    }
    return Status::OK();
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
613k
                                                 OperatorPtr& cache_op) {
1507
613k
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
613k
    Defer defer = Defer([&]() {
1509
612k
        if (op) {
1510
612k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
612k
        }
1512
611k
        for (auto& s : sink_ops) {
1513
180k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
180k
        }
1515
611k
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
613k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
613k
    std::stringstream error_msg;
1520
613k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
613k
    bool fe_with_old_version = false;
1523
613k
    switch (tnode.node_type) {
1524
164k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
164k
        op = std::make_shared<OlapScanOperatorX>(
1526
164k
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
164k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
164k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
164k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
164k
        break;
1531
164k
    }
1532
78
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
78
        DCHECK(_query_ctx != nullptr);
1534
78
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
78
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
78
                                                    _num_instances);
1537
78
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
78
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
78
        break;
1540
78
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
2.69k
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
2.69k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
2.69k
                                                 _num_instances);
1557
2.69k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
2.69k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
2.69k
        break;
1560
2.69k
    }
1561
115k
    case TPlanNodeType::EXCHANGE_NODE: {
1562
115k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
115k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
18.4E
                                  : 0;
1565
115k
        DCHECK_GT(num_senders, 0);
1566
115k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1567
115k
                                                       num_senders);
1568
115k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1569
115k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1570
115k
        break;
1571
115k
    }
1572
119k
    case TPlanNodeType::AGGREGATION_NODE: {
1573
119k
        if (tnode.agg_node.grouping_exprs.empty() &&
1574
119k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1575
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1576
0
                                         ": group by and output is empty");
1577
0
        }
1578
119k
        bool need_create_cache_op =
1579
119k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1580
119k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1581
10
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1582
10
            auto cache_source_id = next_operator_id();
1583
10
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1584
10
                                                        _params.fragment.query_cache_param);
1585
10
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1586
1587
10
            const auto downstream_pipeline_id = cur_pipe->id();
1588
10
            if (!_dag.contains(downstream_pipeline_id)) {
1589
10
                _dag.insert({downstream_pipeline_id, {}});
1590
10
            }
1591
10
            new_pipe = add_pipeline(cur_pipe);
1592
10
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1593
1594
10
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1595
10
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1596
10
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1597
10
            return Status::OK();
1598
10
        };
1599
119k
        const bool group_by_limit_opt =
1600
119k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1601
1602
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1603
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1604
119k
        const bool enable_spill = _runtime_state->enable_spill() &&
1605
119k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1606
119k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1607
119k
                                      tnode.agg_node.use_streaming_preaggregation &&
1608
119k
                                      !tnode.agg_node.grouping_exprs.empty();
1609
        // TODO: distinct streaming agg does not support spill.
1610
119k
        const bool can_use_distinct_streaming_agg =
1611
119k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1612
119k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1613
119k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1614
119k
                _params.query_options.enable_distinct_streaming_aggregation;
1615
1616
119k
        if (can_use_distinct_streaming_agg) {
1617
86.8k
            if (need_create_cache_op) {
1618
8
                PipelinePtr new_pipe;
1619
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1620
1621
8
                cache_op = op;
1622
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1623
8
                                                                     tnode, descs);
1624
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1625
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1626
8
                cur_pipe = new_pipe;
1627
86.8k
            } else {
1628
86.8k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
86.8k
                                                                     tnode, descs);
1630
86.8k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1631
86.8k
            }
1632
86.8k
        } else if (is_streaming_agg) {
1633
1.28k
            if (need_create_cache_op) {
1634
0
                PipelinePtr new_pipe;
1635
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1636
0
                cache_op = op;
1637
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1638
0
                                                             descs);
1639
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1640
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1641
0
                cur_pipe = new_pipe;
1642
1.28k
            } else {
1643
1.28k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
1.28k
                                                             descs);
1645
1.28k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1646
1.28k
            }
1647
31.4k
        } else {
1648
            // create new pipeline to add query cache operator
1649
31.4k
            PipelinePtr new_pipe;
1650
31.4k
            if (need_create_cache_op) {
1651
2
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1652
2
                cache_op = op;
1653
2
            }
1654
1655
31.4k
            if (enable_spill) {
1656
120
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1657
120
                                                                     next_operator_id(), descs);
1658
31.2k
            } else {
1659
31.2k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1660
31.2k
            }
1661
31.4k
            if (need_create_cache_op) {
1662
2
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1663
2
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1664
2
                cur_pipe = new_pipe;
1665
31.4k
            } else {
1666
31.4k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1667
31.4k
            }
1668
1669
31.4k
            const auto downstream_pipeline_id = cur_pipe->id();
1670
31.4k
            if (!_dag.contains(downstream_pipeline_id)) {
1671
30.3k
                _dag.insert({downstream_pipeline_id, {}});
1672
30.3k
            }
1673
31.4k
            cur_pipe = add_pipeline(cur_pipe);
1674
31.4k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1675
1676
31.4k
            if (enable_spill) {
1677
120
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1678
120
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1679
31.2k
            } else {
1680
31.2k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1681
31.2k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1682
31.2k
            }
1683
31.4k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1684
31.4k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1685
31.4k
        }
1686
119k
        break;
1687
119k
    }
1688
119k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1689
92
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1690
0
            return Status::InternalError(
1691
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1692
0
                    tnode.node_id);
1693
0
        }
1694
1695
        // Create source operator (goes on the current / downstream pipeline).
1696
92
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1697
92
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1698
1699
        // Create a new pipeline for the sink side.
1700
92
        const auto downstream_pipeline_id = cur_pipe->id();
1701
92
        if (!_dag.contains(downstream_pipeline_id)) {
1702
92
            _dag.insert({downstream_pipeline_id, {}});
1703
92
        }
1704
92
        cur_pipe = add_pipeline(cur_pipe);
1705
92
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1706
1707
        // Create sink operator.
1708
92
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1709
92
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1710
92
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1711
92
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1712
1713
        // Pre-register a single shared state for ALL instances so that every
1714
        // sink instance writes its per-instance hash table into the same
1715
        // BucketedAggSharedState and every source instance can merge across
1716
        // all of them.
1717
92
        {
1718
92
            auto shared_state = BucketedAggSharedState::create_shared();
1719
92
            shared_state->id = op->operator_id();
1720
92
            shared_state->related_op_ids.insert(op->operator_id());
1721
1722
641
            for (int i = 0; i < _num_instances; i++) {
1723
549
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1724
549
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1725
549
                sink_dep->set_shared_state(shared_state.get());
1726
549
                shared_state->sink_deps.push_back(sink_dep);
1727
549
            }
1728
92
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1729
92
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1730
92
            _op_id_to_shared_state.insert(
1731
92
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1732
92
        }
1733
92
        break;
1734
92
    }
1735
9.62k
    case TPlanNodeType::HASH_JOIN_NODE: {
1736
9.62k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1737
9.62k
                                       tnode.hash_join_node.is_broadcast_join;
1738
9.62k
        const auto enable_spill = _runtime_state->enable_spill();
1739
9.62k
        if (enable_spill && !is_broadcast_join) {
1740
0
            auto tnode_ = tnode;
1741
0
            tnode_.runtime_filters.clear();
1742
0
            auto inner_probe_operator =
1743
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1744
1745
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1746
            // So here use `tnode_` which has no runtime filters.
1747
0
            auto probe_side_inner_sink_operator =
1748
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1749
1750
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1751
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1752
1753
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1754
0
                    pool, tnode_, next_operator_id(), descs);
1755
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1756
0
                                                inner_probe_operator);
1757
0
            op = std::move(probe_operator);
1758
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1759
1760
0
            const auto downstream_pipeline_id = cur_pipe->id();
1761
0
            if (!_dag.contains(downstream_pipeline_id)) {
1762
0
                _dag.insert({downstream_pipeline_id, {}});
1763
0
            }
1764
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1765
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1766
1767
0
            auto inner_sink_operator =
1768
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1769
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1770
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1771
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1772
1773
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1774
0
            sink_ops.push_back(std::move(sink_operator));
1775
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1776
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1777
1778
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1779
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1780
9.62k
        } else {
1781
9.62k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1782
9.62k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1783
1784
9.62k
            const auto downstream_pipeline_id = cur_pipe->id();
1785
9.62k
            if (!_dag.contains(downstream_pipeline_id)) {
1786
7.96k
                _dag.insert({downstream_pipeline_id, {}});
1787
7.96k
            }
1788
9.62k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1789
9.62k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1790
1791
9.62k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1792
9.62k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1793
9.62k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1794
9.62k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1795
1796
9.62k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1797
9.62k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1798
9.62k
        }
1799
9.62k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1800
4.29k
            std::shared_ptr<HashJoinSharedState> shared_state =
1801
4.29k
                    HashJoinSharedState::create_shared(_num_instances);
1802
20.3k
            for (int i = 0; i < _num_instances; i++) {
1803
16.0k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1804
16.0k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1805
16.0k
                sink_dep->set_shared_state(shared_state.get());
1806
16.0k
                shared_state->sink_deps.push_back(sink_dep);
1807
16.0k
            }
1808
4.29k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1809
4.29k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1810
4.29k
            _op_id_to_shared_state.insert(
1811
4.29k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1812
4.29k
        }
1813
9.62k
        break;
1814
9.62k
    }
1815
2.18k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1816
2.18k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1817
2.18k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1818
1819
2.18k
        const auto downstream_pipeline_id = cur_pipe->id();
1820
2.18k
        if (!_dag.contains(downstream_pipeline_id)) {
1821
1.94k
            _dag.insert({downstream_pipeline_id, {}});
1822
1.94k
        }
1823
2.18k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1824
2.18k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1825
1826
2.18k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1827
2.18k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1828
2.18k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1829
2.18k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1830
2.18k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1831
2.18k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1832
2.18k
        break;
1833
2.18k
    }
1834
50.3k
    case TPlanNodeType::UNION_NODE: {
1835
50.3k
        int child_count = tnode.num_children;
1836
50.3k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1837
50.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1838
1839
50.3k
        const auto downstream_pipeline_id = cur_pipe->id();
1840
50.3k
        if (!_dag.contains(downstream_pipeline_id)) {
1841
49.9k
            _dag.insert({downstream_pipeline_id, {}});
1842
49.9k
        }
1843
51.4k
        for (int i = 0; i < child_count; i++) {
1844
1.08k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1845
1.08k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1846
1.08k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1847
1.08k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1848
1.08k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1849
1.08k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1850
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1851
1.08k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1852
1.08k
        }
1853
50.3k
        break;
1854
50.3k
    }
1855
50.3k
    case TPlanNodeType::SORT_NODE: {
1856
32.9k
        const auto should_spill = _runtime_state->enable_spill() &&
1857
32.9k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1858
32.9k
        const bool use_local_merge =
1859
32.9k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1860
32.9k
        if (should_spill) {
1861
7
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1862
32.9k
        } else if (use_local_merge) {
1863
30.9k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1864
30.9k
                                                                 descs);
1865
30.9k
        } else {
1866
2.02k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1867
2.02k
        }
1868
32.9k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1869
1870
32.9k
        const auto downstream_pipeline_id = cur_pipe->id();
1871
32.9k
        if (!_dag.contains(downstream_pipeline_id)) {
1872
32.9k
            _dag.insert({downstream_pipeline_id, {}});
1873
32.9k
        }
1874
32.9k
        cur_pipe = add_pipeline(cur_pipe);
1875
32.9k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1876
1877
32.9k
        if (should_spill) {
1878
7
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1879
7
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1880
32.9k
        } else {
1881
32.9k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1882
32.9k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1883
32.9k
        }
1884
32.9k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1885
32.9k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1886
32.9k
        break;
1887
32.9k
    }
1888
32.9k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1889
70
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1890
70
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1891
1892
70
        const auto downstream_pipeline_id = cur_pipe->id();
1893
70
        if (!_dag.contains(downstream_pipeline_id)) {
1894
70
            _dag.insert({downstream_pipeline_id, {}});
1895
70
        }
1896
70
        cur_pipe = add_pipeline(cur_pipe);
1897
70
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1898
1899
70
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1900
70
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1901
70
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1902
70
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1903
70
        break;
1904
70
    }
1905
1.79k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1906
1.79k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1907
1.79k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1908
1909
1.79k
        const auto downstream_pipeline_id = cur_pipe->id();
1910
1.79k
        if (!_dag.contains(downstream_pipeline_id)) {
1911
1.77k
            _dag.insert({downstream_pipeline_id, {}});
1912
1.77k
        }
1913
1.79k
        cur_pipe = add_pipeline(cur_pipe);
1914
1.79k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1915
1916
1.79k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1917
1.79k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1918
1.79k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1919
1.79k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1920
1.79k
        break;
1921
1.79k
    }
1922
1.79k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1923
794
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1924
794
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1925
794
        break;
1926
794
    }
1927
794
    case TPlanNodeType::INTERSECT_NODE: {
1928
168
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1929
168
                                                                      cur_pipe, sink_ops));
1930
168
        break;
1931
168
    }
1932
168
    case TPlanNodeType::EXCEPT_NODE: {
1933
159
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1934
159
                                                                       cur_pipe, sink_ops));
1935
159
        break;
1936
159
    }
1937
327
    case TPlanNodeType::REPEAT_NODE: {
1938
327
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1939
327
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1940
327
        break;
1941
327
    }
1942
916
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1943
916
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1944
916
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1945
916
        break;
1946
916
    }
1947
916
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1948
18
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1949
18
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1950
18
        break;
1951
18
    }
1952
1.45k
    case TPlanNodeType::EMPTY_SET_NODE: {
1953
1.45k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1954
1.45k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1955
1.45k
        break;
1956
1.45k
    }
1957
1.45k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1958
288
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1959
288
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1960
288
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1961
288
        break;
1962
288
    }
1963
1.54k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1964
1.54k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1965
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
1.54k
        break;
1967
1.54k
    }
1968
4.58k
    case TPlanNodeType::META_SCAN_NODE: {
1969
4.58k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1970
4.58k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1971
4.58k
        break;
1972
4.58k
    }
1973
4.58k
    case TPlanNodeType::SELECT_NODE: {
1974
599
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1975
599
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1976
599
        break;
1977
599
    }
1978
599
    case TPlanNodeType::REC_CTE_NODE: {
1979
163
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1980
163
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1981
1982
163
        const auto downstream_pipeline_id = cur_pipe->id();
1983
163
        if (!_dag.contains(downstream_pipeline_id)) {
1984
159
            _dag.insert({downstream_pipeline_id, {}});
1985
159
        }
1986
1987
163
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1988
163
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1989
1990
163
        DataSinkOperatorPtr anchor_sink;
1991
163
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1992
163
                                                                  op->operator_id(), tnode, descs);
1993
163
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1994
163
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1995
163
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1996
1997
163
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1998
163
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1999
2000
163
        DataSinkOperatorPtr rec_sink;
2001
163
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2002
163
                                                         tnode, descs);
2003
163
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2004
163
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2005
163
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2006
2007
163
        break;
2008
163
    }
2009
2.18k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2010
2.18k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2011
2.18k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2012
2.18k
        break;
2013
2.18k
    }
2014
101k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2015
101k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2016
        // The downstream pipeline (containing LocalExchangeSource) must have
2017
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2018
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2019
        // Without this, when the parent pipeline was reduced by a serial operator
2020
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2021
        // Exchange), the downstream inherits the reduced num_tasks via
2022
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2023
        // channels but only fewer source tasks initialize mem_counters — the
2024
        // sink round-robins to all channels and crashes on uninitialized ones.
2025
101k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2026
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2027
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2028
101k
        cur_pipe->set_num_tasks(_num_instances);
2029
2030
101k
        const auto downstream_pipeline_id = cur_pipe->id();
2031
101k
        if (!_dag.contains(downstream_pipeline_id)) {
2032
97.1k
            _dag.insert({downstream_pipeline_id, {}});
2033
97.1k
        }
2034
101k
        cur_pipe = add_pipeline(cur_pipe);
2035
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2036
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2037
        // We set this now so the exchanger is created with the correct sender count.
2038
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2039
        // consistent with this.
2040
101k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2041
0
            cur_pipe->set_num_tasks(_parallel_instances);
2042
0
        }
2043
101k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2044
101k
        int num_partitions = 0;
2045
101k
        std::map<int, int> shuffle_id_to_instance_idx;
2046
101k
        auto partition_type = tnode.local_exchange_node.partition_type;
2047
101k
        switch (partition_type) {
2048
386
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2049
386
            num_partitions = _params.num_buckets;
2050
386
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2051
386
            break;
2052
21.7k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2053
107k
            for (int i = 0; i < _num_instances; i++) {
2054
86.2k
                shuffle_id_to_instance_idx[i] = i;
2055
86.2k
            }
2056
21.7k
            num_partitions = _num_instances;
2057
21.7k
            break;
2058
5
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2059
5
            num_partitions = _total_instances;
2060
5
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2061
5
            break;
2062
78.9k
        default:
2063
78.9k
            break;
2064
101k
        }
2065
101k
        auto local_exchange_id = op->operator_id();
2066
101k
        auto sink_id = next_sink_operator_id();
2067
101k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2068
101k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2069
101k
        sink_ops.push_back(sink);
2070
101k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2071
101k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2072
2073
        // For FE-planned local exchange, we need to:
2074
        // 1. Initialize the partitioner for hash shuffle types
2075
        // 2. Defer exchanger creation until after the full plan tree is built
2076
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2077
        // 3. Register shared state so pipeline tasks can find it
2078
101k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2079
101k
                                ->init_partitioner(_runtime_state.get()));
2080
2081
101k
        int free_blocks_limit =
2082
101k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2083
101k
                        ? cast_set<int>(
2084
101k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2085
101k
                        : 0;
2086
101k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2087
101k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2088
101k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2089
101k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2090
101k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2091
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2092
101k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2093
101k
                                        free_blocks_limit, local_exchange_id, sink_id});
2094
101k
        break;
2095
101k
    }
2096
0
    default:
2097
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2098
0
                                     print_plan_node_type(tnode.node_type));
2099
613k
    }
2100
611k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2101
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2102
0
        op->set_serial_operator();
2103
0
    }
2104
2105
611k
    return Status::OK();
2106
613k
}
2107
// NOLINTEND(readability-function-cognitive-complexity)
2108
// NOLINTEND(readability-function-size)
2109
2110
template <bool is_intersect>
2111
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2112
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2113
327
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
327
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
327
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
327
    const auto downstream_pipeline_id = cur_pipe->id();
2118
327
    if (!_dag.contains(downstream_pipeline_id)) {
2119
297
        _dag.insert({downstream_pipeline_id, {}});
2120
297
    }
2121
2122
1.07k
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
750
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
750
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
750
        if (child_id == 0) {
2127
327
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
327
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
423
        } else {
2130
423
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
423
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
423
        }
2133
750
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
750
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
750
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
750
    }
2138
2139
327
    return Status::OK();
2140
327
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2113
168
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
168
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
168
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
168
    const auto downstream_pipeline_id = cur_pipe->id();
2118
168
    if (!_dag.contains(downstream_pipeline_id)) {
2119
154
        _dag.insert({downstream_pipeline_id, {}});
2120
154
    }
2121
2122
585
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
417
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
417
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
417
        if (child_id == 0) {
2127
168
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
168
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
249
        } else {
2130
249
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
249
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
249
        }
2133
417
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
417
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
417
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
417
    }
2138
2139
168
    return Status::OK();
2140
168
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2113
159
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
159
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
159
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
159
    const auto downstream_pipeline_id = cur_pipe->id();
2118
159
    if (!_dag.contains(downstream_pipeline_id)) {
2119
143
        _dag.insert({downstream_pipeline_id, {}});
2120
143
    }
2121
2122
492
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
333
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
333
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
333
        if (child_id == 0) {
2127
159
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
159
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
174
        } else {
2130
174
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
174
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
174
        }
2133
333
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
333
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
333
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
333
    }
2138
2139
159
    return Status::OK();
2140
159
}
2141
2142
327k
// Submits every pipeline task of this fragment to the query's pipeline execution
// scheduler. May only be called once; a second call returns InternalError("submitted").
//
// On the first failed submission:
//   - the fragment is cancelled,
//   - `_total_tasks` is reset to the number of tasks that were successfully submitted,
//   - submission stops, so no further task from this or any later task group is
//     handed to the scheduler.
// If all submitted tasks have already closed by then, the fragment instance is closed
// here and its pipeline context is removed from the fragment manager. An
// InternalError describing the failure is then returned.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // The `break` above only leaves the inner loop, so the outer loop must stop too.
        // Otherwise three things go wrong:
        //   - tasks of later groups would still be submitted after the cancel,
        //   - `_total_tasks` would undercount the tasks actually submitted,
        //   - a later successful submit would overwrite `st` and hide the failure.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
2183
2184
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2185
0
    if (_runtime_state->enable_profile()) {
2186
0
        std::stringstream ss;
2187
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2188
0
            runtime_profile_ptr->pretty_print(&ss);
2189
0
        }
2190
2191
0
        if (_runtime_state->load_channel_profile()) {
2192
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2193
0
        }
2194
2195
0
        auto profile_str =
2196
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2197
0
                            this->_fragment_id, extra_info, ss.str());
2198
0
        LOG_LONG_STRING(INFO, profile_str);
2199
0
    }
2200
0
}
2201
// If all pipeline tasks binded to the fragment instance are finished, then we could
2202
// close the fragment instance.
2203
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2204
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2205
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2206
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2207
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2208
// (via debug_string()).
2209
329k
bool PipelineFragmentContext::_close_fragment_instance() {
2210
329k
    if (_is_fragment_instance_closed) {
2211
0
        return false;
2212
0
    }
2213
329k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2214
329k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2215
329k
    if (!_need_notify_close) {
2216
326k
        auto st = send_report(true);
2217
326k
        if (!st) {
2218
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2219
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2220
0
        }
2221
326k
    }
2222
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2223
    // Since stream load does not have someting like coordinator on FE, so
2224
    // backend can not report profile to FE, ant its profile can not be shown
2225
    // in the same way with other query. So we print the profile content to info log.
2226
2227
329k
    if (_runtime_state->enable_profile() &&
2228
329k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2229
2.41k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2230
2.41k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2231
0
        std::stringstream ss;
2232
        // Compute the _local_time_percent before pretty_print the runtime_profile
2233
        // Before add this operation, the print out like that:
2234
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2235
        // After add the operation, the print out like that:
2236
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2237
        // We can easily know the exec node execute time without child time consumed.
2238
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2239
0
            runtime_profile_ptr->pretty_print(&ss);
2240
0
        }
2241
2242
0
        if (_runtime_state->load_channel_profile()) {
2243
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2244
0
        }
2245
2246
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2247
0
    }
2248
2249
329k
    if (_query_ctx->enable_profile()) {
2250
2.41k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2251
2.41k
                                         collect_realtime_load_channel_profile());
2252
2.41k
    }
2253
2254
    // Return whether the caller needs to remove from the pipeline map.
2255
    // The caller must do this after releasing _task_mutex.
2256
329k
    return !_need_notify_close;
2257
329k
}
2258
2259
1.49M
// Called once for every pipeline task of this fragment that has been closed.
// When the last task of `pipeline_id` closes, every pipeline listed for it in `_dag`
// is made runnable. The closed-task counter is then bumped under `_task_mutex`; once it
// reaches `_total_tasks`, the fragment instance is closed and, if requested, removed
// from the fragment manager (outside the lock).
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // This path runs concurrently from many task threads without holding a lock, so the
    // maps are only read through find(). For associative containers, operator[] is not
    // treated as a const (race-free) access, and it would insert a default (null) entry
    // for an unknown id. find() does neither and avoids the contains()+[] double lookup.
    auto pipeline_it = _pip_id_to_pipeline.find(pipeline_id);
    DCHECK(pipeline_it != _pip_id_to_pipeline.end());
    // If all tasks of this pipeline have been closed, the upstream tasks are never
    // needed again, so we just make them runnable here.
    if (pipeline_it != _pip_id_to_pipeline.end() && pipeline_it->second->close_task()) {
        if (auto dag_it = _dag.find(pipeline_id); dag_it != _dag.end()) {
            for (auto dep : dag_it->second) {
                auto dep_it = _pip_id_to_pipeline.find(dep);
                DCHECK(dep_it != _pip_id_to_pipeline.end());
                if (dep_it != _pip_id_to_pipeline.end()) {
                    dep_it->second->make_all_runnable(pipeline_id);
                }
            }
        }
    }
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        // Update query-level finished task progress in real time.
        _query_ctx->inc_finished_task_num();
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
2284
2285
42.4k
std::string PipelineFragmentContext::get_load_error_url() {
2286
42.4k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2287
0
        return to_load_error_http_path(str);
2288
0
    }
2289
94.8k
    for (auto& tasks : _tasks) {
2290
158k
        for (auto& task : tasks) {
2291
158k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2292
182
                return to_load_error_http_path(str);
2293
182
            }
2294
158k
        }
2295
94.8k
    }
2296
42.2k
    return "";
2297
42.4k
}
2298
2299
42.4k
std::string PipelineFragmentContext::get_first_error_msg() {
2300
42.4k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2301
0
        return str;
2302
0
    }
2303
94.8k
    for (auto& tasks : _tasks) {
2304
158k
        for (auto& task : tasks) {
2305
158k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2306
182
                return str;
2307
182
            }
2308
158k
        }
2309
94.8k
    }
2310
42.2k
    return "";
2311
42.4k
}
2312
2313
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2314
0
    std::stringstream url;
2315
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2316
0
        << "/api/_download_load?"
2317
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2318
0
    return url.str();
2319
0
}
2320
2321
37.1k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2322
37.1k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2323
37.1k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2324
37.1k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2325
37.1k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2326
37.1k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2327
37.1k
    });
2328
2329
37.1k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2330
37.1k
    if (req.coord_addr.hostname == "external") {
2331
        // External query (flink/spark read tablets) not need to report to FE.
2332
0
        return;
2333
0
    }
2334
37.1k
    int callback_retries = 10;
2335
37.1k
    const int sleep_ms = 1000;
2336
37.1k
    Status exec_status = req.status;
2337
37.1k
    Status coord_status;
2338
37.1k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2339
37.1k
    do {
2340
37.1k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2341
37.1k
                                                            req.coord_addr, &coord_status);
2342
37.1k
        if (!coord_status.ok()) {
2343
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2344
0
        }
2345
37.1k
    } while (!coord_status.ok() && callback_retries-- > 0);
2346
2347
37.1k
    if (!coord_status.ok()) {
2348
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2349
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2350
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2351
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2352
0
        return;
2353
0
    }
2354
2355
37.1k
    TReportExecStatusParams params;
2356
37.1k
    params.protocol_version = FrontendServiceVersion::V1;
2357
37.1k
    params.__set_query_id(req.query_id);
2358
37.1k
    params.__set_backend_num(req.backend_num);
2359
37.1k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2360
37.1k
    params.__set_fragment_id(req.fragment_id);
2361
37.1k
    params.__set_status(exec_status.to_thrift());
2362
37.1k
    params.__set_done(req.done);
2363
37.1k
    params.__set_query_type(req.runtime_state->query_type());
2364
37.1k
    params.__isset.profile = false;
2365
2366
37.1k
    DCHECK(req.runtime_state != nullptr);
2367
2368
37.1k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2369
33.1k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2370
33.1k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2371
33.1k
    } else {
2372
4.00k
        DCHECK(!req.runtime_states.empty());
2373
4.00k
        if (!req.runtime_state->output_files().empty()) {
2374
0
            params.__isset.delta_urls = true;
2375
0
            for (auto& it : req.runtime_state->output_files()) {
2376
0
                params.delta_urls.push_back(_to_http_path(it));
2377
0
            }
2378
0
        }
2379
4.00k
        if (!params.delta_urls.empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
        }
2382
4.00k
    }
2383
2384
37.1k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2385
37.1k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2386
37.1k
    static std::string s_unselected_rows = "unselected.rows";
2387
37.1k
    int64_t num_rows_load_success = 0;
2388
37.1k
    int64_t num_rows_load_filtered = 0;
2389
37.1k
    int64_t num_rows_load_unselected = 0;
2390
37.1k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2391
37.1k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2392
37.1k
        req.runtime_state->num_finished_range() > 0) {
2393
0
        params.__isset.load_counters = true;
2394
2395
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2396
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2397
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2398
0
        params.__isset.fragment_instance_reports = true;
2399
0
        TFragmentInstanceReport t;
2400
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2401
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2402
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2403
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2404
0
        params.fragment_instance_reports.push_back(t);
2405
37.1k
    } else if (!req.runtime_states.empty()) {
2406
109k
        for (auto* rs : req.runtime_states) {
2407
109k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2408
109k
                rs->num_finished_range() > 0) {
2409
30.9k
                params.__isset.load_counters = true;
2410
30.9k
                num_rows_load_success += rs->num_rows_load_success();
2411
30.9k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2412
30.9k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2413
30.9k
                params.__isset.fragment_instance_reports = true;
2414
30.9k
                TFragmentInstanceReport t;
2415
30.9k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2416
30.9k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2417
30.9k
                t.__set_loaded_rows(rs->num_rows_load_total());
2418
30.9k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2419
30.9k
                params.fragment_instance_reports.push_back(t);
2420
30.9k
            }
2421
109k
        }
2422
37.1k
    }
2423
37.1k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2424
37.1k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2425
37.1k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2426
2427
37.1k
    if (!req.load_error_url.empty()) {
2428
169
        params.__set_tracking_url(req.load_error_url);
2429
169
    }
2430
37.1k
    if (!req.first_error_msg.empty()) {
2431
169
        params.__set_first_error_msg(req.first_error_msg);
2432
169
    }
2433
109k
    for (auto* rs : req.runtime_states) {
2434
109k
        if (rs->wal_id() > 0) {
2435
106
            params.__set_txn_id(rs->wal_id());
2436
106
            params.__set_label(rs->import_label());
2437
106
        }
2438
109k
    }
2439
37.1k
    if (!req.runtime_state->export_output_files().empty()) {
2440
0
        params.__isset.export_files = true;
2441
0
        params.export_files = req.runtime_state->export_output_files();
2442
37.1k
    } else if (!req.runtime_states.empty()) {
2443
109k
        for (auto* rs : req.runtime_states) {
2444
109k
            if (!rs->export_output_files().empty()) {
2445
0
                params.__isset.export_files = true;
2446
0
                params.export_files.insert(params.export_files.end(),
2447
0
                                           rs->export_output_files().begin(),
2448
0
                                           rs->export_output_files().end());
2449
0
            }
2450
109k
        }
2451
37.1k
    }
2452
37.1k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2453
0
        params.__isset.commitInfos = true;
2454
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2455
37.1k
    } else if (!req.runtime_states.empty()) {
2456
109k
        for (auto* rs : req.runtime_states) {
2457
109k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2458
25.3k
                params.__isset.commitInfos = true;
2459
25.3k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2460
25.3k
            }
2461
109k
        }
2462
37.1k
    }
2463
37.1k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2464
0
        params.__isset.errorTabletInfos = true;
2465
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2466
37.1k
    } else if (!req.runtime_states.empty()) {
2467
109k
        for (auto* rs : req.runtime_states) {
2468
109k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2469
0
                params.__isset.errorTabletInfos = true;
2470
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2471
0
                                               rs_eti.end());
2472
0
            }
2473
109k
        }
2474
37.1k
    }
2475
37.1k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2476
0
        params.__isset.hive_partition_updates = true;
2477
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2478
0
                                             hpu.end());
2479
37.1k
    } else if (!req.runtime_states.empty()) {
2480
109k
        for (auto* rs : req.runtime_states) {
2481
109k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2482
0
                params.__isset.hive_partition_updates = true;
2483
0
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2484
0
                                                     rs_hpu.begin(), rs_hpu.end());
2485
0
            }
2486
109k
        }
2487
37.1k
    }
2488
37.1k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2489
0
        params.__isset.iceberg_commit_datas = true;
2490
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2491
0
                                           icd.end());
2492
37.1k
    } else if (!req.runtime_states.empty()) {
2493
109k
        for (auto* rs : req.runtime_states) {
2494
109k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2495
0
                params.__isset.iceberg_commit_datas = true;
2496
0
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2497
0
                                                   rs_icd.begin(), rs_icd.end());
2498
0
            }
2499
109k
        }
2500
37.1k
    }
2501
2502
37.1k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2503
0
        params.__isset.mc_commit_datas = true;
2504
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2505
37.1k
    } else if (!req.runtime_states.empty()) {
2506
109k
        for (auto* rs : req.runtime_states) {
2507
109k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2508
0
                params.__isset.mc_commit_datas = true;
2509
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2510
0
                                              rs_mcd.end());
2511
0
            }
2512
109k
        }
2513
37.1k
    }
2514
2515
37.1k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2516
37.1k
    params.__isset.error_log = (!params.error_log.empty());
2517
2518
37.1k
    if (_exec_env->cluster_info()->backend_id != 0) {
2519
37.0k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2520
37.0k
    }
2521
2522
37.1k
    TReportExecStatusResult res;
2523
37.1k
    Status rpc_status;
2524
2525
37.1k
    VLOG_DEBUG << "reportExecStatus params is "
2526
60
               << apache::thrift::ThriftDebugString(params).c_str();
2527
37.1k
    if (!exec_status.ok()) {
2528
1.52k
        LOG(WARNING) << "report error status: " << exec_status.msg()
2529
1.52k
                     << " to coordinator: " << req.coord_addr
2530
1.52k
                     << ", query id: " << print_id(req.query_id);
2531
1.52k
    }
2532
37.1k
    try {
2533
37.1k
        try {
2534
37.1k
            (*coord)->reportExecStatus(res, params);
2535
37.1k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2536
#ifndef ADDRESS_SANITIZER
2537
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2538
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2539
                         << req.coord_addr << ", err: " << e.what();
2540
#endif
2541
0
            rpc_status = coord->reopen();
2542
2543
0
            if (!rpc_status.ok()) {
2544
0
                req.cancel_fn(rpc_status);
2545
0
                return;
2546
0
            }
2547
0
            (*coord)->reportExecStatus(res, params);
2548
0
        }
2549
2550
37.1k
        rpc_status = Status::create<false>(res.status);
2551
37.1k
    } catch (apache::thrift::TException& e) {
2552
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2553
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2554
0
    }
2555
2556
37.1k
    if (!rpc_status.ok()) {
2557
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2558
0
                 print_id(req.query_id), rpc_status.to_string());
2559
0
        req.cancel_fn(rpc_status);
2560
0
    }
2561
37.1k
}
2562
2563
329k
// Builds a ReportStatusRequest for this fragment and submits it to the fragment
// manager's thread pool, where _coordinator_callback() delivers it to the coordinator.
// `done` marks the final report; a report with a non-OK query status is always sent as
// done. Returns OK without reporting when no report is wanted, NeedSendAgain when a
// non-final report is suppressed, and otherwise the status of submitting the job.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // The successful "done" case already returned above, so here exec_status is
            // not OK. Reporting on cancel is disabled, so the fragment instance can be
            // closed without sending a report to FE.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;
    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Query-level error information wins. Otherwise fall back to what this fragment's
    // runtime states recorded. Each query-level getter is called only once.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = std::move(runtime_states),
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = std::move(load_error_url),
                             .first_error_msg = std::move(first_error_msg),
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    // `ctx` keeps this fragment context alive until the asynchronous report (which uses
    // `this` and the raw RuntimeState pointers in `req`) has completed.
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func(
            [this, req = std::move(req), ctx = std::move(ctx)]() {
                SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
                _coordinator_callback(req);
                if (!req.done) {
                    ctx->refresh_next_report_time();
                }
            });
}
2623
2624
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2625
0
    size_t res = 0;
2626
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2627
    // here to traverse the vector.
2628
0
    for (const auto& task_instances : _tasks) {
2629
0
        for (const auto& task : task_instances) {
2630
0
            if (task.first->is_running()) {
2631
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2632
0
                                      << " is running, task: " << (void*)task.first.get()
2633
0
                                      << ", is_running: " << task.first->is_running();
2634
0
                *has_running_task = true;
2635
0
                return 0;
2636
0
            }
2637
2638
0
            size_t revocable_size = task.first->get_revocable_size();
2639
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2640
0
                res += revocable_size;
2641
0
            }
2642
0
        }
2643
0
    }
2644
0
    return res;
2645
0
}
2646
2647
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2648
0
    std::vector<PipelineTask*> revocable_tasks;
2649
0
    for (const auto& task_instances : _tasks) {
2650
0
        for (const auto& task : task_instances) {
2651
0
            size_t revocable_size_ = task.first->get_revocable_size();
2652
2653
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2654
0
                revocable_tasks.emplace_back(task.first.get());
2655
0
            }
2656
0
        }
2657
0
    }
2658
0
    return revocable_tasks;
2659
0
}
2660
2661
38
std::string PipelineFragmentContext::debug_string() {
2662
38
    std::lock_guard<std::mutex> l(_task_mutex);
2663
38
    fmt::memory_buffer debug_string_buffer;
2664
38
    fmt::format_to(debug_string_buffer,
2665
38
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2666
38
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2667
38
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2668
134
    for (size_t j = 0; j < _tasks.size(); j++) {
2669
96
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2670
392
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2671
296
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2672
296
                           _tasks[j][i].first->debug_string());
2673
296
        }
2674
96
    }
2675
2676
38
    return fmt::to_string(debug_string_buffer);
2677
38
}
2678
2679
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2680
2.41k
PipelineFragmentContext::collect_realtime_profile() const {
2681
2.41k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2682
2683
    // we do not have mutex to protect pipeline_id_to_profile
2684
    // so we need to make sure this funciton is invoked after fragment context
2685
    // has already been prepared.
2686
2.41k
    if (!_prepared) {
2687
0
        std::string msg =
2688
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2689
0
        DCHECK(false) << msg;
2690
0
        LOG_ERROR(msg);
2691
0
        return res;
2692
0
    }
2693
2694
    // Make sure first profile is fragment level profile
2695
2.41k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2696
2.41k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2697
2.41k
    res.push_back(fragment_profile);
2698
2699
    // pipeline_id_to_profile is initialized in prepare stage
2700
4.67k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2701
4.67k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2702
4.67k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2703
4.67k
        res.push_back(profile_ptr);
2704
4.67k
    }
2705
2706
2.41k
    return res;
2707
2.41k
}
2708
2709
std::shared_ptr<TRuntimeProfileTree>
2710
2.41k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2711
    // we do not have mutex to protect pipeline_id_to_profile
2712
    // so we need to make sure this funciton is invoked after fragment context
2713
    // has already been prepared.
2714
2.41k
    if (!_prepared) {
2715
0
        std::string msg =
2716
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2717
0
        DCHECK(false) << msg;
2718
0
        LOG_ERROR(msg);
2719
0
        return nullptr;
2720
0
    }
2721
2722
7.59k
    for (const auto& tasks : _tasks) {
2723
16.3k
        for (const auto& task : tasks) {
2724
16.3k
            if (task.second->load_channel_profile() == nullptr) {
2725
0
                continue;
2726
0
            }
2727
2728
16.3k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2729
2730
16.3k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2731
16.3k
                                                           _runtime_state->profile_level());
2732
16.3k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2733
16.3k
        }
2734
7.59k
    }
2735
2736
2.41k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2737
2.41k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2738
2.41k
                                                      _runtime_state->profile_level());
2739
2.41k
    return load_channel_profile;
2740
2.41k
}
2741
2742
// Collect runtime filter IDs registered by all tasks in this PFC.
2743
// Used during recursive CTE stage transitions to know which filters to deregister
2744
// before creating the new PFC for the next recursion round.
2745
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2746
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2747
// the vector sizes do not change, and individual RuntimeState filter sets are
2748
// written only during open() which has completed by the time we reach rerun.
2749
3.50k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2750
3.50k
    std::set<int> result;
2751
9.76k
    for (const auto& _task : _tasks) {
2752
16.6k
        for (const auto& task : _task) {
2753
16.6k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2754
16.6k
            result.merge(set);
2755
16.6k
        }
2756
9.76k
    }
2757
3.50k
    if (_runtime_state) {
2758
3.50k
        auto set = _runtime_state->get_deregister_runtime_filter();
2759
3.50k
        result.merge(set);
2760
3.50k
    }
2761
3.50k
    return result;
2762
3.50k
}
2763
2764
331k
void PipelineFragmentContext::_release_resource() {
2765
331k
    std::lock_guard<std::mutex> l(_task_mutex);
2766
    // The memory released by the query end is recorded in the query mem tracker.
2767
331k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2768
331k
    auto st = _query_ctx->exec_status();
2769
970k
    for (auto& _task : _tasks) {
2770
970k
        if (!_task.empty()) {
2771
970k
            _call_back(_task.front().first->runtime_state(), &st);
2772
970k
        }
2773
970k
    }
2774
331k
    _tasks.clear();
2775
331k
    _dag.clear();
2776
331k
    _pip_id_to_pipeline.clear();
2777
331k
    _pipelines.clear();
2778
331k
    _sink.reset();
2779
331k
    _root_op.reset();
2780
331k
    _runtime_filter_mgr_map.clear();
2781
331k
    _op_id_to_shared_state.clear();
2782
331k
}
2783
2784
} // namespace doris