Coverage Report

Created: 2026-06-29 02:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
PipelineFragmentContext::PipelineFragmentContext(
140
        TUniqueId query_id, const TPipelineFragmentParams& request,
141
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
142
        const std::function<void(RuntimeState*, Status*)>& call_back)
143
61.6k
        : _query_id(std::move(query_id)),
144
61.6k
          _fragment_id(request.fragment_id),
145
61.6k
          _exec_env(exec_env),
146
61.6k
          _query_ctx(std::move(query_ctx)),
147
61.6k
          _call_back(call_back),
148
61.6k
          _is_report_on_cancel(true),
149
61.6k
          _params(request),
150
61.6k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
151
61.6k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
152
61.6k
                                                               : false) {
153
61.6k
    _fragment_watcher.start();
154
61.6k
}
155
156
61.6k
PipelineFragmentContext::~PipelineFragmentContext() {
157
61.6k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
158
61.6k
            .tag("query_id", print_id(_query_id))
159
61.6k
            .tag("fragment_id", _fragment_id);
160
61.6k
    _release_resource();
161
61.6k
    {
162
        // The memory released by the query end is recorded in the query mem tracker.
163
61.6k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
164
61.6k
        _runtime_state.reset();
165
61.6k
        _query_ctx.reset();
166
61.6k
    }
167
61.6k
}
168
169
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
0
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
0
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
502
bool PipelineFragmentContext::notify_close() {
181
502
    bool all_closed = false;
182
502
    bool need_remove = false;
183
502
    {
184
502
        std::lock_guard<std::mutex> l(_task_mutex);
185
502
        if (_closed_tasks >= _total_tasks) {
186
11
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
0
                need_remove = true;
193
0
            }
194
11
            all_closed = true;
195
11
        }
196
        // make fragment release by self after cancel
197
502
        _need_notify_close = false;
198
502
    }
199
502
    if (need_remove) {
200
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
0
    }
202
502
    return all_closed;
203
502
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
502
void PipelineFragmentContext::cancel(const Status reason) {
210
502
    LOG_INFO("PipelineFragmentContext::cancel")
211
502
            .tag("query_id", print_id(_query_id))
212
502
            .tag("fragment_id", _fragment_id)
213
502
            .tag("reason", reason.to_string());
214
502
    if (notify_close()) {
215
11
        return;
216
11
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
491
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
0
                                   debug_string());
221
0
        LOG_LONG_STRING(WARNING, dbg_str);
222
0
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
491
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
491
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
491
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
0
        _query_ctx->set_load_error_url(error_url);
236
0
    }
237
238
491
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
0
        _query_ctx->set_first_error_msg(first_error_msg);
240
0
    }
241
242
491
    _query_ctx->cancel(reason, _fragment_id);
243
491
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
100
        _is_report_on_cancel = false;
245
391
    } else {
246
2.06k
        for (auto& id : _fragment_instance_ids) {
247
2.06k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
2.06k
        }
249
391
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
491
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
491
    if (stream_load_ctx != nullptr) {
254
0
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
0
        stream_load_ctx->error_url = get_load_error_url();
259
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
0
    }
261
262
2.18k
    for (auto& tasks : _tasks) {
263
4.78k
        for (auto& task : tasks) {
264
4.78k
            task.first->unblock_all_dependencies();
265
4.78k
        }
266
2.18k
    }
267
491
}
268
269
95.7k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
270
95.7k
    PipelineId id = _next_pipeline_id++;
271
95.7k
    auto pipeline = std::make_shared<Pipeline>(
272
95.7k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
273
95.7k
            parent ? parent->num_tasks() : _num_instances);
274
95.7k
    if (idx >= 0) {
275
264
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
276
95.4k
    } else {
277
95.4k
        _pipelines.emplace_back(pipeline);
278
95.4k
    }
279
95.7k
    if (parent) {
280
31.0k
        parent->set_children(pipeline);
281
31.0k
    }
282
95.7k
    return pipeline;
283
95.7k
}
284
285
61.6k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
286
61.6k
    {
287
61.6k
        SCOPED_TIMER(_build_pipelines_timer);
288
        // 2. Build pipelines with operators in this fragment.
289
61.6k
        auto root_pipeline = add_pipeline();
290
61.6k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
291
61.6k
                                         &_root_op, root_pipeline));
292
293
        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
294
        // that inherited reduced num_tasks from a serial operator.
295
61.6k
        _propagate_local_exchange_num_tasks();
296
297
        // Create deferred local exchangers now that all pipelines have final num_tasks.
298
61.6k
        RETURN_IF_ERROR(_create_deferred_local_exchangers());
299
300
        // Raise num_tasks for pipelines whose serial non-scan operators (e.g.,
301
        // UNPARTITIONED Exchange) reduced num_tasks below _num_instances.
302
        // Without this, fragment instances 1+ have no task for these pipelines
303
        // and downstream operators fail with "must set shared state".
304
        //
305
        // This applies to ALL pipelines (not just deferred exchanger upstreams):
306
        // fragments with UNION/INTERSECT/EXCEPT + serial Exchange in child
307
        // pipelines also need the raise, even without FE-planned local exchange.
308
        //
309
        // Exception: serial scan sources (pooling scan) keep num_tasks=1 — the
310
        // PassthroughExchanger(1, N) handles the fan-out correctly.
311
        // NOTE: Do NOT raise pipelines whose source is a serial operator
312
        // (Exchange or scan) — they legitimately have 1 task, and raising
313
        // them causes crashes (e.g., 4 Exchange tasks but only 1 receives
314
        // data).  The correct fix for shared state injection across
315
        // instances is handled by the FE: it inserts local exchange nodes
316
        // between serial operators and their downstream consumers, creating
317
        // proper pipeline boundaries with _num_instances tasks.
318
319
        // 3. Create sink operator
320
61.6k
        if (!_params.fragment.__isset.output_sink) {
321
0
            return Status::InternalError("No output sink in this fragment!");
322
0
        }
323
61.6k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
324
61.6k
                                          _params.fragment.output_exprs, _params,
325
61.6k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
326
61.6k
                                          *_desc_tbl, root_pipeline->id()));
327
61.6k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
328
61.6k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
329
330
95.4k
        for (PipelinePtr& pipeline : _pipelines) {
331
95.4k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
332
95.4k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
333
95.4k
        }
334
61.6k
    }
335
    // 4. Build local exchanger
336
61.6k
    if (_runtime_state->plan_local_shuffle()) {
337
32.4k
        SCOPED_TIMER(_plan_local_exchanger_timer);
338
32.4k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
339
32.4k
                                             _params.bucket_seq_to_instance_idx,
340
32.4k
                                             _params.shuffle_idx_to_instance_idx));
341
32.4k
    }
342
343
    // 5. Initialize global states in pipelines.
344
95.7k
    for (PipelinePtr& pipeline : _pipelines) {
345
95.7k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
346
95.7k
        pipeline->children().clear();
347
95.7k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
348
95.7k
    }
349
350
61.6k
    {
351
61.6k
        SCOPED_TIMER(_build_tasks_timer);
352
        // 6. Build pipeline tasks and initialize local state.
353
61.6k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
354
61.6k
    }
355
356
61.6k
    return Status::OK();
357
61.6k
}
358
359
61.6k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
360
61.6k
    if (_prepared) {
361
0
        return Status::InternalError("Already prepared");
362
0
    }
363
61.6k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
364
61.6k
        _timeout = _params.query_options.execution_timeout;
365
61.6k
    }
366
367
61.6k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
368
61.6k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
369
61.6k
    SCOPED_TIMER(_prepare_timer);
370
61.6k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
371
61.6k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
372
61.6k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
373
61.6k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
374
61.6k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
375
61.6k
    {
376
61.6k
        SCOPED_TIMER(_init_context_timer);
377
61.6k
        cast_set(_num_instances, _params.local_params.size());
378
61.6k
        _total_instances =
379
61.6k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
380
381
61.6k
        auto* fragment_context = this;
382
383
61.6k
        if (_params.query_options.__isset.is_report_success) {
384
61.6k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
385
61.6k
        }
386
387
        // 1. Set up the global runtime state.
388
61.6k
        _runtime_state = RuntimeState::create_unique(
389
61.6k
                _params.query_id, _params.fragment_id, _params.query_options,
390
61.6k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
391
61.6k
        _runtime_state->set_task_execution_context(shared_from_this());
392
61.6k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
393
61.6k
        if (_params.__isset.backend_id) {
394
60.4k
            _runtime_state->set_backend_id(_params.backend_id);
395
60.4k
        }
396
61.6k
        if (_params.__isset.import_label) {
397
0
            _runtime_state->set_import_label(_params.import_label);
398
0
        }
399
61.6k
        if (_params.__isset.db_name) {
400
0
            _runtime_state->set_db_name(_params.db_name);
401
0
        }
402
61.6k
        if (_params.__isset.load_job_id) {
403
0
            _runtime_state->set_load_job_id(_params.load_job_id);
404
0
        }
405
406
61.6k
        if (_params.is_simplified_param) {
407
18.2k
            _desc_tbl = _query_ctx->desc_tbl;
408
43.3k
        } else {
409
43.3k
            DCHECK(_params.__isset.desc_tbl);
410
43.3k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
411
43.3k
                                                  &_desc_tbl));
412
43.3k
        }
413
61.6k
        _runtime_state->set_desc_tbl(_desc_tbl);
414
61.6k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
415
61.6k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
416
61.6k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
417
61.6k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
418
419
        // init fragment_instance_ids
420
61.6k
        const auto target_size = _params.local_params.size();
421
61.6k
        _fragment_instance_ids.resize(target_size);
422
196k
        for (size_t i = 0; i < _params.local_params.size(); i++) {
423
135k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
424
135k
            _fragment_instance_ids[i] = fragment_instance_id;
425
135k
        }
426
61.6k
    }
427
428
61.6k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
429
430
61.6k
    _init_next_report_time();
431
432
61.6k
    _prepared = true;
433
61.6k
    return Status::OK();
434
61.6k
}
435
436
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
437
        int instance_idx,
438
135k
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
439
135k
    const auto& local_params = _params.local_params[instance_idx];
440
135k
    auto fragment_instance_id = local_params.fragment_instance_id;
441
135k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
442
135k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
443
135k
    auto get_shared_state = [&](PipelinePtr pipeline)
444
135k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
445
241k
                                       std::vector<std::shared_ptr<Dependency>>>> {
446
241k
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
447
241k
                                std::vector<std::shared_ptr<Dependency>>>>
448
241k
                shared_state_map;
449
251k
        for (auto& op : pipeline->operators()) {
450
251k
            auto source_id = op->operator_id();
451
251k
            if (auto iter = _op_id_to_shared_state.find(source_id);
452
251k
                iter != _op_id_to_shared_state.end()) {
453
78.9k
                shared_state_map.insert({source_id, iter->second});
454
78.9k
            }
455
251k
        }
456
243k
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
457
243k
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
458
243k
                iter != _op_id_to_shared_state.end()) {
459
22.7k
                shared_state_map.insert({sink_to_source_id, iter->second});
460
22.7k
            }
461
243k
        }
462
241k
        return shared_state_map;
463
241k
    };
464
465
432k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
466
297k
        auto& pipeline = _pipelines[pip_idx];
467
297k
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
468
241k
            auto task_runtime_state = RuntimeState::create_unique(
469
241k
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
470
241k
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
471
241k
            {
472
                // Initialize runtime state for this task
473
241k
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
474
475
241k
                task_runtime_state->set_task_execution_context(shared_from_this());
476
241k
                task_runtime_state->set_be_number(local_params.backend_num);
477
478
241k
                if (_params.__isset.backend_id) {
479
239k
                    task_runtime_state->set_backend_id(_params.backend_id);
480
239k
                }
481
241k
                if (_params.__isset.import_label) {
482
0
                    task_runtime_state->set_import_label(_params.import_label);
483
0
                }
484
241k
                if (_params.__isset.db_name) {
485
0
                    task_runtime_state->set_db_name(_params.db_name);
486
0
                }
487
241k
                if (_params.__isset.load_job_id) {
488
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
489
0
                }
490
241k
                if (_params.__isset.wal_id) {
491
0
                    task_runtime_state->set_wal_id(_params.wal_id);
492
0
                }
493
241k
                if (_params.__isset.content_length) {
494
0
                    task_runtime_state->set_content_length(_params.content_length);
495
0
                }
496
497
241k
                task_runtime_state->set_desc_tbl(_desc_tbl);
498
241k
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
499
241k
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
500
241k
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
501
241k
                task_runtime_state->set_max_operator_id(max_operator_id());
502
241k
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
503
241k
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
504
241k
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
505
506
241k
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
507
241k
            }
508
241k
            auto cur_task_id = _total_tasks++;
509
241k
            task_runtime_state->set_task_id(cur_task_id);
510
241k
            task_runtime_state->set_task_num(pipeline->num_tasks());
511
241k
            auto task = std::make_shared<PipelineTask>(
512
241k
                    pipeline, cur_task_id, task_runtime_state.get(),
513
241k
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
514
241k
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
515
241k
                    instance_idx);
516
241k
            pipeline->incr_created_tasks(instance_idx, task.get());
517
241k
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
518
241k
            _tasks[instance_idx].emplace_back(
519
241k
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
520
241k
                            std::move(task), std::move(task_runtime_state)});
521
241k
        }
522
297k
    }
523
524
    /**
525
         * Build DAG for pipeline tasks.
526
         * For example, we have
527
         *
528
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
529
         *            \                      /
530
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
531
         *                 \                          /
532
         *               JoinProbeOperator2 (Pipeline1)
533
         *
534
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
535
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
536
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
537
         *
538
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
539
         * and JoinProbeOperator2.
540
         */
541
297k
    for (auto& _pipeline : _pipelines) {
542
297k
        if (pipeline_id_to_task.contains(_pipeline->id())) {
543
241k
            auto* task = pipeline_id_to_task[_pipeline->id()];
544
241k
            DCHECK(task != nullptr);
545
546
            // If this task has upstream dependency, then inject it into this task.
547
241k
            if (_dag.contains(_pipeline->id())) {
548
159k
                auto& deps = _dag[_pipeline->id()];
549
164k
                for (auto& dep : deps) {
550
164k
                    if (pipeline_id_to_task.contains(dep)) {
551
106k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
552
106k
                        if (ss) {
553
82.8k
                            task->inject_shared_state(ss);
554
82.8k
                        } else {
555
23.3k
                            pipeline_id_to_task[dep]->inject_shared_state(
556
23.3k
                                    task->get_source_shared_state());
557
23.3k
                        }
558
106k
                    }
559
164k
                }
560
159k
            }
561
241k
        }
562
297k
    }
563
432k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
564
297k
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
565
241k
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
566
241k
            DCHECK(pipeline_id_to_profile[pip_idx]);
567
241k
            std::vector<TScanRangeParams> scan_ranges;
568
241k
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
569
241k
            if (local_params.per_node_scan_ranges.contains(node_id)) {
570
54.6k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
571
54.6k
            }
572
241k
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
573
241k
                                                             _params.fragment.output_sink));
574
241k
        }
575
297k
    }
576
135k
    {
577
135k
        std::lock_guard<std::mutex> l(_state_map_lock);
578
135k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
579
135k
    }
580
135k
    return Status::OK();
581
135k
}
582
583
61.6k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
584
61.6k
    _total_tasks = 0;
585
61.6k
    _closed_tasks = 0;
586
61.6k
    const auto target_size = _params.local_params.size();
587
61.6k
    _tasks.resize(target_size);
588
61.6k
    _runtime_filter_mgr_map.resize(target_size);
589
157k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
590
95.7k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
591
95.7k
    }
592
61.6k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
593
594
61.6k
    if (target_size > 1 &&
595
61.6k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
596
10.5k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
597
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
598
2
        std::vector<Status> prepare_status(target_size);
599
2
        int submitted_tasks = 0;
600
2
        Status submit_status;
601
2
        CountDownLatch latch((int)target_size);
602
12
        for (int i = 0; i < target_size; i++) {
603
10
            submit_status = thread_pool->submit_func([&, i]() {
604
10
                SCOPED_ATTACH_TASK(_query_ctx.get());
605
10
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
606
10
                latch.count_down();
607
10
            });
608
10
            if (LIKELY(submit_status.ok())) {
609
10
                submitted_tasks++;
610
10
            } else {
611
0
                break;
612
0
            }
613
10
        }
614
2
        latch.arrive_and_wait(target_size - submitted_tasks);
615
2
        if (UNLIKELY(!submit_status.ok())) {
616
0
            return submit_status;
617
0
        }
618
12
        for (int i = 0; i < submitted_tasks; i++) {
619
10
            if (!prepare_status[i].ok()) {
620
0
                return prepare_status[i];
621
0
            }
622
10
        }
623
61.6k
    } else {
624
196k
        for (int i = 0; i < target_size; i++) {
625
135k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
626
135k
        }
627
61.6k
    }
628
61.6k
    _pipeline_parent_map.clear();
629
61.6k
    _op_id_to_shared_state.clear();
630
    // Record task cardinality once when this fragment context finishes task initialization.
631
61.6k
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
632
633
61.6k
    return Status::OK();
634
61.6k
}
635
636
61.6k
void PipelineFragmentContext::_init_next_report_time() {
637
61.6k
    auto interval_s = config::pipeline_status_report_interval;
638
61.6k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
5.75k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
5.75k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
5.75k
        _previous_report_time =
643
5.75k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
5.75k
        _disable_period_report = false;
645
5.75k
    }
646
61.6k
}
647
648
711
void PipelineFragmentContext::refresh_next_report_time() {
649
711
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
711
    DCHECK(disable == true);
651
711
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
711
    _disable_period_report.compare_exchange_strong(disable, false);
653
711
}
654
655
936k
void PipelineFragmentContext::trigger_report_if_necessary() {
656
936k
    if (!_is_report_success) {
657
866k
        return;
658
866k
    }
659
69.5k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
69.5k
    if (disable) {
661
1.91k
        return;
662
1.91k
    }
663
67.6k
    int32_t interval_s = config::pipeline_status_report_interval;
664
67.6k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
67.6k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
67.6k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
67.6k
    if (MonotonicNanos() > next_report_time) {
672
712
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
712
                                                            std::memory_order_acq_rel)) {
674
1
            return;
675
1
        }
676
711
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
711
        auto st = send_report(false);
693
711
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
711
    }
699
67.6k
}
700
701
// Builds the operator tree (and its pipelines) for this fragment from the
// pre-order list of thrift plan nodes in `_params.fragment.plan.nodes`.
//
// Throws Exception(INTERNAL_ERROR) when the plan has no nodes. Propagates errors
// from _create_tree_helper(). Returns InternalError when the reconstructed tree
// did not consume every thrift node.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // `node_idx` is advanced by the recursion and ends on the last consumed node.
    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root,
                                        cur_pipe, 0, false, false));

    const bool consumed_all_nodes = node_idx + 1 == plan_nodes.size();
    if (!consumed_all_nodes) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
718
719
61.6k
// Creates the exchanger of every FE-planned local exchange whose construction was
// deferred, then clears `_deferred_exchangers`.
//
// Returns InternalError (and DCHECK-fails in debug builds) for partition types the
// deferred path does not handle. On such an error `_deferred_exchangers` is left
// uncleared.
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
    for (auto& info : _deferred_exchangers) {
        // DANGER ZONE — do not "fix" sender_count without reading the history.
        //
        // sender_count seeds Exchanger::_running_sink_operators. The source side waits
        // for that counter to reach 0, and each sink LocalState close decrements it via
        // sub_running_sink_operators. The correct seed is this pipeline-instance's sink
        // task count: info.upstream_pipe->num_tasks(). There is one PipelineTask per
        // task and one close per task.
        //
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)`, mirroring the
        //   BE-planned path in _add_local_exchange_impl. THIS BREAKS the common
        //   FE-planned shape `serial scan → LE(PT) → ...`.
        //   - There upstream_pipe genuinely has num_tasks=1 and only 1 close arrives,
        //     but the seed would be _num_instances.
        //   - _running_sink_operators then never reaches 0, and downstream sources hang
        //     on SHUFFLE_DATA_DEPENDENCY. For example, MTMV refresh from
        //     mtmv_up_down_job_p0/load.groovy stayed at Status=RUNNING and regressed
        //     exactly this way.
        //   - BE-planned mode can use max() because its `cur_pipe` is the source-side
        //     pipeline, always raised to _num_instances by add_pipeline.
        //   - Our `upstream_pipe` is different: it is the sink-side pipeline and may
        //     legitimately stay at 1 for serial sources.
        //
        // Tempting wrong fix #2: multiplying by _num_instances on the theory that
        //   shared_state is shared across all instances. This causes the same hang.
        //   Each fragment-instance PipelineFragmentContext owns its own
        //   _op_id_to_shared_state map, so the exchanger is per-instance, not per-BE,
        //   and num_tasks() is already the right close-count for one instance.
        //
        // If _running_sink_operators gets stuck above 0 (hang) or drops below 0, the
        // bug is upstream. _propagate_local_exchange_num_tasks left num_tasks too high
        // or too low for this fragment shape. Fix THAT pass, not this seed value.
        const int sender_count = info.upstream_pipe->num_tasks();
        const auto partition_type = info.partition_type;
        const auto free_blocks_limit = info.free_blocks_limit;

        switch (partition_type) {
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
            info.shared_state->exchanger = ShuffleExchanger::create_unique(
                    sender_count, _num_instances, info.num_partitions, free_blocks_limit,
                    partition_type);
            break;
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
            info.shared_state->exchanger = BucketShuffleExchanger::create_unique(
                    sender_count, _num_instances, info.num_partitions, free_blocks_limit);
            break;
        case TLocalPartitionType::PASSTHROUGH:
            info.shared_state->exchanger = PassthroughExchanger::create_unique(
                    sender_count, _num_instances, free_blocks_limit);
            break;
        case TLocalPartitionType::BROADCAST:
            info.shared_state->exchanger = BroadcastExchanger::create_unique(
                    sender_count, _num_instances, free_blocks_limit);
            break;
        case TLocalPartitionType::PASS_TO_ONE:
            // PASS_TO_ONE is honored only when the broadcast-join hash table is shared
            // (built by one task); otherwise fall back to a plain broadcast.
            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
                info.shared_state->exchanger = PassToOneExchanger::create_unique(
                        sender_count, _num_instances, free_blocks_limit);
            } else {
                info.shared_state->exchanger = BroadcastExchanger::create_unique(
                        sender_count, _num_instances, free_blocks_limit);
            }
            break;
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
            info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                    sender_count, _num_instances, free_blocks_limit);
            break;
        case TLocalPartitionType::NOOP:
        case TLocalPartitionType::LOCAL_MERGE_SORT:
            // The FE-planned LocalExchangeNode never routes NOOP or LOCAL_MERGE_SORT
            // through the deferred path:
            // - NOOP means "no exchange needed" and is filtered out earlier.
            // - LOCAL_MERGE_SORT is planned only by the legacy BE path.
            // If that ever changes, crash in debug and return an error in release
            // rather than silently corrupting execution.
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
                          << static_cast<int>(partition_type);
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
                                         std::to_string(static_cast<int>(partition_type)));
        default:
            // A TLocalPartitionType was added on the FE side without a BE handler here.
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
                          << static_cast<int>(partition_type);
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
                                         std::to_string(static_cast<int>(partition_type)));
        }
    }
    _deferred_exchangers.clear();
    return Status::OK();
}
805
806
61.6k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
61.6k
    if (_deferred_exchangers.empty()) {
815
53.1k
        return;
816
53.1k
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
8.51k
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
8.51k
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
8.51k
    std::map<PipelineId, int> in_degree;
832
25.2k
    for (auto& p : _pipelines) {
833
25.2k
        id_to_pipe[p->id()] = p;
834
25.2k
        in_degree.try_emplace(p->id(), 0);
835
25.2k
    }
836
16.3k
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
16.7k
        for (auto upstream_id : upstream_ids) {
838
16.7k
            downstreams_of[upstream_id].push_back(downstream_id);
839
16.7k
            in_degree[downstream_id]++;
840
16.7k
        }
841
16.3k
    }
842
8.51k
    std::vector<PipelineId> ready;
843
25.2k
    for (const auto& [id, deg] : in_degree) {
844
25.2k
        if (deg == 0) {
845
8.91k
            ready.push_back(id);
846
8.91k
        }
847
25.2k
    }
848
8.51k
    size_t visited = 0;
849
33.8k
    while (!ready.empty()) {
850
25.2k
        const auto id = ready.back();
851
25.2k
        ready.pop_back();
852
25.2k
        visited++;
853
25.2k
        auto pit = id_to_pipe.find(id);
854
25.2k
        if (pit != id_to_pipe.end()) {
855
25.2k
            auto& pipe = pit->second;
856
25.2k
            const auto& ops = pipe->operators();
857
25.2k
            const bool le_source =
858
25.2k
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
25.2k
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
25.2k
            if (le_source) {
861
9.24k
                pipe->set_num_tasks(_num_instances);
862
16.0k
            } else if (!serial_source) {
863
8.29k
                int target = pipe->num_tasks();
864
8.29k
                const auto up_it = _dag.find(id);
865
8.29k
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
7.14k
                    for (auto upstream_id : up_it->second) {
868
7.14k
                        auto uit = id_to_pipe.find(upstream_id);
869
7.14k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
7.14k
                            target = _num_instances;
871
7.14k
                            break;
872
7.14k
                        }
873
7.14k
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
7.17k
                    for (auto upstream_id : up_it->second) {
876
7.17k
                        auto uit = id_to_pipe.find(upstream_id);
877
7.17k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
7.17k
                            !uit->second->operators().empty() &&
879
7.17k
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
7.17k
                    }
883
7.14k
                }
884
8.29k
                pipe->set_num_tasks(target);
885
8.29k
            }
886
25.2k
        }
887
25.2k
        for (auto down : downstreams_of[id]) {
888
16.7k
            if (--in_degree[down] == 0) {
889
16.3k
                ready.push_back(down);
890
16.3k
            }
891
16.7k
        }
892
25.2k
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
8.51k
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
8.51k
}
900
901
// Recursively rebuilds the operator tree from the pre-order list of thrift plan nodes.
// Starts at `tnodes[*node_idx]` and leaves `*node_idx` on the last node it consumed.
//
// The created operator is attached to `parent`, using `cache_op` instead when
// _create_operator produced one, or stored into `*root` when there is no parent.
// `followed_by_shuffled_operator` and `require_bucket_distribution` carry downstream
// distribution requirements. Each level recomputes them from the data distribution it
// requires before passing them to its children.
//
// Returns InternalError when the node list runs out before the tree is complete, and
// propagates errors from operator creation, init and set_child.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // Malformed plan: report it instead of indexing past the end.
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;
    const auto parent_node_id = parent == nullptr ? -1 : parent->node_id();

    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    bool current_require_bucket_distribution = require_bucket_distribution;
    RETURN_IF_ERROR(_create_operator(pool, tnode, descs, op, cur_pipe, parent_node_id, child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must happen here. For example, agg's group-by expressions are used
    // to decide whether a local shuffle should be planned.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    if (parent == nullptr) {
        *root = op;
    } else {
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    }

    /**
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is
     * followed by a shuffled operator (shuffled hash join, or union followed by co-located
     * operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * the data distribution of `LocalExchange(id=0)` must match Exchange(id=3).
     *
     * If an operator is followed by a local exchange without shuffle (e.g. passthrough),
     * a shuffled local exchanger is used before the join, so the operator is not
     * considered followed by a shuffle join.
     */
    // The requirement comes from the pipeline sink while `cur_pipe` has no operators,
    // otherwise from `op`.
    const bool pipe_has_no_operators = cur_pipe->operators().empty();
    auto required_data_distribution =
            pipe_has_no_operators
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    const bool hash_required =
            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
    const bool noop_required =
            required_data_distribution.distribution_type == TLocalPartitionType::NOOP;
    // Evaluated lazily to keep the short-circuiting of the boolean expressions below.
    auto self_is_shuffled = [&]() {
        return pipe_has_no_operators ? cur_pipe->sink()->is_shuffled_operator()
                                     : op->is_shuffled_operator();
    };
    auto self_is_colocated = [&]() {
        return pipe_has_no_operators ? cur_pipe->sink()->is_colocated_operator()
                                     : op->is_colocated_operator();
    };
    const bool current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator || self_is_shuffled()) && hash_required) ||
            (followed_by_shuffled_operator && noop_required);
    current_require_bucket_distribution =
            ((require_bucket_distribution || self_is_colocated()) && hash_required) ||
            (require_bucket_distribution && noop_required);

    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // `tnodes` is in pre-order: each child subtree starts right after the last node
    // consumed so far.
    for (int child = 0; child < num_children; ++child) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe,
                                            child, current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));
        // A child was expected but every node is used up: the tree is malformed.
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }
    return Status::OK();
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
264
        PipelinePtr pipe_with_sink) {
990
264
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
264
    pipe_with_source->set_num_tasks(_num_instances);
992
264
    pipe_with_source->set_data_distribution(data_distribution);
993
264
}
994
995
// Splits `cur_pipe` at operator index `idx` by inserting a BE-planned local exchange.
//
// - Operators [0, idx) of `cur_pipe` move into `new_pip`, which gets a
//   LocalExchangeSinkOperatorX as its sink.
// - `cur_pipe` keeps the remaining operators behind a new LocalExchangeSourceOperatorX.
// - The exchanger for `data_distribution` lives in a new LocalExchangeSharedState,
//   registered in `_op_id_to_shared_state` under the local exchange operator id.
// - The children and `_dag` edges of both pipelines are rewired.
// - num_tasks and data distribution are inherited via _inherit_pipeline_properties().
//
// `data_distribution` is taken by value and may be downgraded here:
// - BUCKET_HASH_SHUFFLE without a bucket mapping becomes an execution hash shuffle;
// - GLOBAL_EXECUTION_HASH_SHUFFLE becomes LOCAL_EXECUTION_HASH_SHUFFLE unless a global
//   shuffle is used.
// `do_local_exchange` is neither read nor written here.
// Returns InternalError for an unsupported distribution type.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > static_cast<size_t>(idx)
                    ? operators[idx]->followed_by_shuffled_operator()
                    : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type =
                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    if (!use_global_hash_shuffle &&
        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    // Free-blocks limit from the query options (0 when unset). It is evaluated lazily,
    // so unsupported types still reach the error below without calling cast_set.
    auto free_blocks_limit = [this]() -> int {
        const auto& options = _runtime_state->query_options();
        return options.__isset.local_exchange_free_blocks_limit
                       ? cast_set<int>(options.local_exchange_free_blocks_limit)
                       : 0;
    };
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_blocks_limit(),
                data_distribution.distribution_type);
        break;
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit());
        break;
    case TLocalPartitionType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        }
        break;
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit());
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. A child pipeline whose sink feeds
    // one of the moved operators now belongs to `new_pip`; the rest stay with `cur_pipe`,
    // which additionally depends on `new_pip`.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool found = false;
        for (const auto& op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
1166
1167
// Inserts a local exchange in front of operator `idx` of `cur_pipe` when it is needed.
//
// No-op (returns OK without touching anything) when the fragment has a single instance, when
// the parent of `cur_pipe` runs a single task, or when `cur_pipe` reports that operator `idx`
// does not need a local exchange for `data_distribution`.
//
// Otherwise it sets `*do_local_exchange`, creates a new pipeline (requested at position
// `pip_idx + 1`) and lets `_add_local_exchange_impl` split `cur_pipe` at `idx` between the two
// pipelines. If the split leaves a single-task sink side feeding a multi-task `cur_pipe` and the
// exchange type is heavy on the sink, a PASSTHROUGH local exchange is added in front of it to
// raise the sink-side concurrency:
//   op -> local sink(1) -> local source(n)
//   op -> local passthrough(1) -> local passthrough(n) -> local sink(n) -> local source(n)
//
// NOTE(review): `node_id` is not read by this function body.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    const bool single_task_parent = _num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1;
    if (single_task_parent || !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    const auto op_count_before_split = cur_pipe->operators().size();
    auto child_pipe = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, child_pipe, data_distribution,
                                             do_local_exchange, num_buckets,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));

    // The split only moves operators between the two pipelines and adds exactly one local
    // exchange source, so the total operator count must grow by one.
    CHECK(op_count_before_split + 1 ==
          cur_pipe->operators().size() + child_pipe->operators().size())
            << "total_op_num: " << op_count_before_split
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << child_pipe->operators().size();

    // A single sink task feeding many source tasks becomes a bottleneck when the exchange
    // does heavy work on the sink side.
    const bool sink_is_bottleneck =
            cur_pipe->num_tasks() > 1 && child_pipe->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (!sink_is_bottleneck) {
        return Status::OK();
    }

    const int split_idx = cast_set<int>(child_pipe->operators().size());
    auto passthrough_pipe = add_pipeline(child_pipe, pip_idx + 2);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            split_idx, pool, child_pipe, passthrough_pipe,
            DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    return Status::OK();
}
1208
1209
// Plans local exchanges for every pipeline of the fragment.
//
// Pipelines are visited from the back of `_pipelines` to the front. `_add_local_exchange`
// requests the pipelines it creates at positions `pip_idx + 1` / `pip_idx + 2`, presumably
// insert positions after the one currently being planned, so the descending walk does not
// revisit them.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    const int last_idx = cast_set<int>(_pipelines.size()) - 1;
    for (int pip_idx = last_idx; pip_idx >= 0; --pip_idx) {
        PipelinePtr pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());

        // Inherit the distribution of the child pipeline whose sink feeds this pipeline's first
        // operator. Children that feed some other operator (e.g. a join's other input) are
        // ignored.
        for (auto& child : pipeline->children()) {
            const bool feeds_first_operator =
                    child->sink()->node_id() == pipeline->operators().front()->node_id();
            if (feeds_first_operator) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // `num_buckets == 0` means the fragment is colocated by an exchange node rather than a
        // scan node. `num_buckets` is forwarded unchanged here; any substitution with
        // `_num_instances` (to avoid dividing by zero) must happen further down.
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1233
1234
// Plans local exchanges inside a single pipeline `pip` (located at `pip_idx` in `_pipelines`).
//
// Operators are scanned from index 1 (index 0 is the pipeline's source operator). When an
// operator requires a distribution that `_add_local_exchange` actually materializes, the
// pipeline is split by a local exchange sink/source pair. `pip` then holds
// [local exchange source, current operator, remaining operators...], so scanning resumes at
// index 2. After all operators are processed, the pipeline's sink is checked the same way.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    do {
        auto& ops = pip->operators();
        do_local_exchange = false;
        // `idx` is always >= 1 here, so the unsigned comparison is safe.
        for (; static_cast<size_t>(idx) < ops.size(); ++idx) {
            auto le_req = ops[idx]->required_data_distribution(_runtime_state.get());
            if (le_req.need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, le_req,
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // The pipeline was split: index 0 is now the local exchange source and index 1
                // is the operator just processed, so continue planning from index 2.
                idx = 2;
                break;
            }
        }
    } while (do_local_exchange);

    // Evaluate the sink's requirement once and reuse it for both the check and the split.
    auto sink_req = pip->sink()->required_data_distribution(_runtime_state.get());
    if (sink_req.need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(pip_idx, idx, pip->sink()->node_id(),
                                            _runtime_state->obj_pool(), pip, sink_req,
                                            &do_local_exchange, num_buckets,
                                            bucket_seq_to_instance_idx,
                                            shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1271
1272
// Builds the fragment's root sink operator (`_sink`) from `thrift_sink`.
//
// Most sink types map to a single sink operator. RESULT_SINK and OLAP/GROUP_COMMIT_OLAP table
// sinks derive their node id from the last operator of pipeline `cur_pipeline_id` (+1).
// MULTI_CAST_DATA_STREAM_SINK additionally creates one new pipeline per consumer
// (a MultiCastDataStreamerSourceOperatorX feeding an ExchangeSinkOperatorX) and appends
// `cur_pipeline_id` to `_dag[new_pipeline_id]` for each of them.
//
// Returns InternalError when the thrift payload for the sink type is missing or inconsistent,
// when JDBC sinks are requested while `config::enable_java_support` is off, or when the sink
// type is not supported.
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                                  const std::vector<TExpr>& output_exprs,
                                                  const TPipelineFragmentParams& params,
                                                  const RowDescriptor& row_desc,
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
                                                  PipelineId cur_pipeline_id) {
    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        if (!thrift_sink.__isset.stream_sink) {
            return Status::InternalError("Missing data stream sink.");
        }
        _sink = std::make_shared<ExchangeSinkOperatorX>(
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
                params.destinations, _fragment_instance_ids);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        if (!thrift_sink.__isset.result_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
                                                      row_desc, output_exprs,
                                                      thrift_sink.result_sink);
        break;
    }
    case TDataSinkType::DICTIONARY_SINK: {
        if (!thrift_sink.__isset.dictionary_sink) {
            return Status::InternalError("Missing dict sink.");
        }

        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
                                                    thrift_sink.dictionary_sink);
        break;
    }
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
    case TDataSinkType::OLAP_TABLE_SINK: {
        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        if (state->query_options().enable_memtable_on_sink_node &&
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        } else {
            _sink = std::make_shared<OlapTableSinkOperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
        DCHECK(thrift_sink.__isset.olap_table_sink);
        DCHECK(state->get_query_ctx() != nullptr);
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                                output_exprs);
        break;
    }
    case TDataSinkType::HIVE_TABLE_SINK: {
        if (!thrift_sink.__isset.hive_table_sink) {
            return Status::InternalError("Missing hive table sink.");
        }
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                         output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_TABLE_SINK: {
        if (!thrift_sink.__isset.iceberg_table_sink) {
            return Status::InternalError("Missing iceberg table sink.");
        }
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                     row_desc, output_exprs);
        } else {
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::ICEBERG_DELETE_SINK: {
        if (!thrift_sink.__isset.iceberg_delete_sink) {
            return Status::InternalError("Missing iceberg delete sink.");
        }
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
                                                             row_desc, output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_MERGE_SINK: {
        if (!thrift_sink.__isset.iceberg_merge_sink) {
            return Status::InternalError("Missing iceberg merge sink.");
        }
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                            output_exprs);
        break;
    }
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
        if (!thrift_sink.__isset.max_compute_table_sink) {
            return Status::InternalError("Missing max compute table sink.");
        }
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                       output_exprs);
        break;
    }
    case TDataSinkType::JDBC_TABLE_SINK: {
        if (!thrift_sink.__isset.jdbc_table_sink) {
            return Status::InternalError("Missing data jdbc sink.");
        }
        if (config::enable_java_support) {
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        } else {
            return Status::InternalError(
                    "Jdbc table sink is not enabled, you can change be config "
                    "enable_java_support to true and restart be.");
        }
        break;
    }
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        if (!thrift_sink.__isset.memory_scratch_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        if (!thrift_sink.__isset.result_file_sink) {
            return Status::InternalError("Missing result file sink.");
        }

        // Result file sink is not the top sink
        if (params.__isset.destinations && !params.destinations.empty()) {
            _sink = std::make_shared<ResultFileSinkOperatorX>(
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
                    params.destinations, output_exprs, desc_tbl);
        } else {
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                              output_exprs);
        }
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
        const auto& multi_cast_sink = thrift_sink.multi_cast_stream_sink;
        auto sender_size = multi_cast_sink.sinks.size();
        // Validate before any operator id is allocated, `_sink` is assigned or a pipeline is
        // created, so a malformed plan fails without leaving a half-built sink behind.
        if (sender_size == 0) {
            return Status::InternalError("size of sources must be greater than 0");
        }
        // `destinations[i]` is read for every `sinks[i]` below; guard against out-of-bounds.
        if (multi_cast_sink.destinations.size() < sender_size) {
            return Status::InternalError(
                    "Multi cast data stream sink has {} sinks but only {} destination lists.",
                    sender_size, multi_cast_sink.destinations.size());
        }

        auto sink_id = next_sink_operator_id();
        const int multi_cast_node_id = sink_id;
        // one sink has multiple sources.
        std::vector<int> sources;
        sources.reserve(sender_size);
        for (int i = 0; i < sender_size; ++i) {
            auto source_id = next_operator_id();
            sources.push_back(source_id);
        }

        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
                sink_id, multi_cast_node_id, sources, pool, multi_cast_sink);
        for (int i = 0; i < sender_size; ++i) {
            auto new_pipeline = add_pipeline();
            // Row layout for this consumer's exchange sink: the consumer's own output tuple when
            // it has output exprs, otherwise the incoming `row_desc`.
            RowDescriptor* exchange_row_desc = nullptr;
            {
                const auto& tmp_row_desc =
                        !multi_cast_sink.sinks[i].output_exprs.empty()
                                ? RowDescriptor(state->desc_tbl(),
                                                {multi_cast_sink.sinks[i].output_tuple_id})
                                : row_desc;
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
            }
            auto source_id = sources[i];
            OperatorPtr source_op;
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
                    /*node_id*/ source_id, /*consumer_id*/ i, pool, multi_cast_sink.sinks[i],
                    row_desc,
                    /*operator_id=*/source_id);
            RETURN_IF_ERROR(new_pipeline->add_operator(
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
            // 2. create and set sink operator of data stream sender for new pipeline

            DataSinkOperatorPtr sink_op;
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
                    state, *exchange_row_desc, next_sink_operator_id(), multi_cast_sink.sinks[i],
                    multi_cast_sink.destinations[i], _fragment_instance_ids);

            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
            {
                TDataSink* t = pool->add(new TDataSink());
                t->stream_sink = multi_cast_sink.sinks[i];
                RETURN_IF_ERROR(sink_op->init(*t));
            }

            // 3. set dependency dag
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
        }
        break;
    }
    case TDataSinkType::BLACKHOLE_SINK: {
        if (!thrift_sink.__isset.blackhole_sink) {
            return Status::InternalError("Missing blackhole sink.");
        }

        // Use make_shared like every other sink type (single allocation, no naked new).
        _sink = std::make_shared<BlackholeSinkOperatorX>(next_sink_operator_id());
        break;
    }
    case TDataSinkType::TVF_TABLE_SINK: {
        if (!thrift_sink.__isset.tvf_table_sink) {
            return Status::InternalError("Missing TVF table sink.");
        }
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                        output_exprs);
        break;
    }
    default:
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
    }
    return Status::OK();
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
96.8k
                                                 OperatorPtr& cache_op) {
1507
96.8k
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
96.8k
    Defer defer = Defer([&]() {
1509
96.8k
        if (op) {
1510
96.8k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
96.8k
        }
1512
96.8k
        for (auto& s : sink_ops) {
1513
30.7k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
30.7k
        }
1515
96.8k
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
96.8k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
96.8k
    std::stringstream error_msg;
1520
96.8k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
96.8k
    bool fe_with_old_version = false;
1523
96.8k
    switch (tnode.node_type) {
1524
28.7k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
28.7k
        op = std::make_shared<OlapScanOperatorX>(
1526
28.7k
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
28.7k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
28.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
28.7k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
28.7k
        break;
1531
28.7k
    }
1532
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
0
        DCHECK(_query_ctx != nullptr);
1534
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
0
                                                    _num_instances);
1537
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
0
        break;
1540
0
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
11.6k
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
11.6k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
11.6k
                                                 _num_instances);
1557
11.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
11.6k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
11.6k
        break;
1560
11.6k
    }
1561
20.3k
    case TPlanNodeType::EXCHANGE_NODE: {
1562
20.3k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
20.3k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
20.3k
                                  : 0;
1565
20.3k
        DCHECK_GT(num_senders, 0);
1566
20.3k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1567
20.3k
                                                       num_senders);
1568
20.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1569
20.3k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1570
20.3k
        break;
1571
20.3k
    }
1572
12.7k
    case TPlanNodeType::AGGREGATION_NODE: {
1573
12.7k
        if (tnode.agg_node.grouping_exprs.empty() &&
1574
12.7k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1575
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1576
0
                                         ": group by and output is empty");
1577
0
        }
1578
12.7k
        bool need_create_cache_op =
1579
12.7k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1580
12.7k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1581
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1582
0
            auto cache_source_id = next_operator_id();
1583
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1584
0
                                                        _params.fragment.query_cache_param);
1585
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1586
1587
0
            const auto downstream_pipeline_id = cur_pipe->id();
1588
0
            if (!_dag.contains(downstream_pipeline_id)) {
1589
0
                _dag.insert({downstream_pipeline_id, {}});
1590
0
            }
1591
0
            new_pipe = add_pipeline(cur_pipe);
1592
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1593
1594
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1595
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1596
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1597
0
            return Status::OK();
1598
0
        };
1599
12.7k
        const bool group_by_limit_opt =
1600
12.7k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1601
1602
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1603
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1604
12.7k
        const bool enable_spill = _runtime_state->enable_spill() &&
1605
12.7k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1606
12.7k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1607
12.7k
                                      tnode.agg_node.use_streaming_preaggregation &&
1608
12.7k
                                      !tnode.agg_node.grouping_exprs.empty();
1609
        // TODO: distinct streaming agg does not support spill.
1610
12.7k
        const bool can_use_distinct_streaming_agg =
1611
12.7k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1612
12.7k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1613
12.7k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1614
12.7k
                _params.query_options.enable_distinct_streaming_aggregation;
1615
1616
12.7k
        if (can_use_distinct_streaming_agg) {
1617
26
            if (need_create_cache_op) {
1618
0
                PipelinePtr new_pipe;
1619
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1620
1621
0
                cache_op = op;
1622
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1623
0
                                                                     tnode, descs);
1624
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1625
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1626
0
                cur_pipe = new_pipe;
1627
26
            } else {
1628
26
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
26
                                                                     tnode, descs);
1630
26
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1631
26
            }
1632
12.7k
        } else if (is_streaming_agg) {
1633
341
            if (need_create_cache_op) {
1634
0
                PipelinePtr new_pipe;
1635
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1636
0
                cache_op = op;
1637
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1638
0
                                                             descs);
1639
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1640
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1641
0
                cur_pipe = new_pipe;
1642
341
            } else {
1643
341
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
341
                                                             descs);
1645
341
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1646
341
            }
1647
12.3k
        } else {
1648
            // create new pipeline to add query cache operator
1649
12.3k
            PipelinePtr new_pipe;
1650
12.3k
            if (need_create_cache_op) {
1651
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1652
0
                cache_op = op;
1653
0
            }
1654
1655
12.3k
            if (enable_spill) {
1656
32
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1657
32
                                                                     next_operator_id(), descs);
1658
12.3k
            } else {
1659
12.3k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1660
12.3k
            }
1661
12.3k
            if (need_create_cache_op) {
1662
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1663
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1664
0
                cur_pipe = new_pipe;
1665
12.3k
            } else {
1666
12.3k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1667
12.3k
            }
1668
1669
12.3k
            const auto downstream_pipeline_id = cur_pipe->id();
1670
12.3k
            if (!_dag.contains(downstream_pipeline_id)) {
1671
11.3k
                _dag.insert({downstream_pipeline_id, {}});
1672
11.3k
            }
1673
12.3k
            cur_pipe = add_pipeline(cur_pipe);
1674
12.3k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1675
1676
12.3k
            if (enable_spill) {
1677
32
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1678
32
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1679
12.3k
            } else {
1680
12.3k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1681
12.3k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1682
12.3k
            }
1683
12.3k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1684
12.3k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1685
12.3k
        }
1686
12.7k
        break;
1687
12.7k
    }
1688
12.7k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1689
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1690
0
            return Status::InternalError(
1691
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1692
0
                    tnode.node_id);
1693
0
        }
1694
1695
        // Create source operator (goes on the current / downstream pipeline).
1696
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1697
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1698
1699
        // Create a new pipeline for the sink side.
1700
0
        const auto downstream_pipeline_id = cur_pipe->id();
1701
0
        if (!_dag.contains(downstream_pipeline_id)) {
1702
0
            _dag.insert({downstream_pipeline_id, {}});
1703
0
        }
1704
0
        cur_pipe = add_pipeline(cur_pipe);
1705
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1706
1707
        // Create sink operator.
1708
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1709
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1710
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1711
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1712
1713
        // Pre-register a single shared state for ALL instances so that every
1714
        // sink instance writes its per-instance hash table into the same
1715
        // BucketedAggSharedState and every source instance can merge across
1716
        // all of them.
1717
0
        {
1718
0
            auto shared_state = BucketedAggSharedState::create_shared();
1719
0
            shared_state->id = op->operator_id();
1720
0
            shared_state->related_op_ids.insert(op->operator_id());
1721
1722
0
            for (int i = 0; i < _num_instances; i++) {
1723
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1724
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1725
0
                sink_dep->set_shared_state(shared_state.get());
1726
0
                shared_state->sink_deps.push_back(sink_dep);
1727
0
            }
1728
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1729
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1730
0
            _op_id_to_shared_state.insert(
1731
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1732
0
        }
1733
0
        break;
1734
0
    }
1735
416
    case TPlanNodeType::HASH_JOIN_NODE: {
1736
416
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1737
416
                                       tnode.hash_join_node.is_broadcast_join;
1738
416
        const auto enable_spill = _runtime_state->enable_spill();
1739
416
        if (enable_spill && !is_broadcast_join) {
1740
0
            auto tnode_ = tnode;
1741
0
            tnode_.runtime_filters.clear();
1742
0
            auto inner_probe_operator =
1743
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1744
1745
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1746
            // So here use `tnode_` which has no runtime filters.
1747
0
            auto probe_side_inner_sink_operator =
1748
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1749
1750
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1751
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1752
1753
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1754
0
                    pool, tnode_, next_operator_id(), descs);
1755
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1756
0
                                                inner_probe_operator);
1757
0
            op = std::move(probe_operator);
1758
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1759
1760
0
            const auto downstream_pipeline_id = cur_pipe->id();
1761
0
            if (!_dag.contains(downstream_pipeline_id)) {
1762
0
                _dag.insert({downstream_pipeline_id, {}});
1763
0
            }
1764
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1765
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1766
1767
0
            auto inner_sink_operator =
1768
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1769
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1770
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1771
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1772
1773
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1774
0
            sink_ops.push_back(std::move(sink_operator));
1775
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1776
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1777
1778
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1779
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1780
416
        } else {
1781
416
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1782
416
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1783
1784
416
            const auto downstream_pipeline_id = cur_pipe->id();
1785
416
            if (!_dag.contains(downstream_pipeline_id)) {
1786
410
                _dag.insert({downstream_pipeline_id, {}});
1787
410
            }
1788
416
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1789
416
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1790
1791
416
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1792
416
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1793
416
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1794
416
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1795
1796
416
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1797
416
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1798
416
        }
1799
416
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1800
378
            std::shared_ptr<HashJoinSharedState> shared_state =
1801
378
                    HashJoinSharedState::create_shared(_num_instances);
1802
3.30k
            for (int i = 0; i < _num_instances; i++) {
1803
2.92k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1804
2.92k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1805
2.92k
                sink_dep->set_shared_state(shared_state.get());
1806
2.92k
                shared_state->sink_deps.push_back(sink_dep);
1807
2.92k
            }
1808
378
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1809
378
                                                     op->node_id(), "HASH_JOIN_PROBE");
1810
378
            _op_id_to_shared_state.insert(
1811
378
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1812
378
        }
1813
416
        break;
1814
416
    }
1815
2.07k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1816
2.07k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1817
2.07k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1818
1819
2.07k
        const auto downstream_pipeline_id = cur_pipe->id();
1820
2.07k
        if (!_dag.contains(downstream_pipeline_id)) {
1821
2.07k
            _dag.insert({downstream_pipeline_id, {}});
1822
2.07k
        }
1823
2.07k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1824
2.07k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1825
1826
2.07k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1827
2.07k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1828
2.07k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1829
2.07k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1830
2.07k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1831
2.07k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1832
2.07k
        break;
1833
2.07k
    }
1834
2.20k
    case TPlanNodeType::UNION_NODE: {
1835
2.20k
        int child_count = tnode.num_children;
1836
2.20k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1837
2.20k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1838
1839
2.20k
        const auto downstream_pipeline_id = cur_pipe->id();
1840
2.20k
        if (!_dag.contains(downstream_pipeline_id)) {
1841
2.18k
            _dag.insert({downstream_pipeline_id, {}});
1842
2.18k
        }
1843
2.40k
        for (int i = 0; i < child_count; i++) {
1844
204
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1845
204
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1846
204
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1847
204
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1848
204
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1849
204
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1850
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1851
204
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1852
204
        }
1853
2.20k
        break;
1854
2.20k
    }
1855
6.41k
    case TPlanNodeType::SORT_NODE: {
1856
6.41k
        const auto should_spill = _runtime_state->enable_spill() &&
1857
6.41k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1858
6.41k
        const bool use_local_merge =
1859
6.41k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1860
6.41k
        if (should_spill) {
1861
4
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1862
6.41k
        } else if (use_local_merge) {
1863
6.17k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1864
6.17k
                                                                 descs);
1865
6.17k
        } else {
1866
237
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1867
237
        }
1868
6.41k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1869
1870
6.41k
        const auto downstream_pipeline_id = cur_pipe->id();
1871
6.41k
        if (!_dag.contains(downstream_pipeline_id)) {
1872
6.41k
            _dag.insert({downstream_pipeline_id, {}});
1873
6.41k
        }
1874
6.41k
        cur_pipe = add_pipeline(cur_pipe);
1875
6.41k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1876
1877
6.41k
        if (should_spill) {
1878
4
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1879
4
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1880
6.41k
        } else {
1881
6.41k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1882
6.41k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1883
6.41k
        }
1884
6.41k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1885
6.41k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1886
6.41k
        break;
1887
6.41k
    }
1888
6.41k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1889
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1890
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1891
1892
0
        const auto downstream_pipeline_id = cur_pipe->id();
1893
0
        if (!_dag.contains(downstream_pipeline_id)) {
1894
0
            _dag.insert({downstream_pipeline_id, {}});
1895
0
        }
1896
0
        cur_pipe = add_pipeline(cur_pipe);
1897
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1898
1899
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1900
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1901
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1902
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1903
0
        break;
1904
0
    }
1905
22
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1906
22
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1907
22
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1908
1909
22
        const auto downstream_pipeline_id = cur_pipe->id();
1910
22
        if (!_dag.contains(downstream_pipeline_id)) {
1911
22
            _dag.insert({downstream_pipeline_id, {}});
1912
22
        }
1913
22
        cur_pipe = add_pipeline(cur_pipe);
1914
22
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1915
1916
22
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1917
22
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1918
22
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1919
22
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1920
22
        break;
1921
22
    }
1922
467
    case TPlanNodeType::MATERIALIZATION_NODE: {
1923
467
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1924
467
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1925
467
        break;
1926
467
    }
1927
467
    case TPlanNodeType::INTERSECT_NODE: {
1928
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1929
0
                                                                      cur_pipe, sink_ops));
1930
0
        break;
1931
0
    }
1932
0
    case TPlanNodeType::EXCEPT_NODE: {
1933
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1934
0
                                                                       cur_pipe, sink_ops));
1935
0
        break;
1936
0
    }
1937
0
    case TPlanNodeType::REPEAT_NODE: {
1938
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1939
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1940
0
        break;
1941
0
    }
1942
2
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1943
2
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1944
2
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1945
2
        break;
1946
2
    }
1947
100
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1948
100
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1949
100
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1950
100
        break;
1951
100
    }
1952
103
    case TPlanNodeType::EMPTY_SET_NODE: {
1953
103
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1954
103
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1955
103
        break;
1956
103
    }
1957
103
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1958
99
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1959
99
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1960
99
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1961
99
        break;
1962
99
    }
1963
270
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1964
270
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1965
270
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
270
        break;
1967
270
    }
1968
943
    case TPlanNodeType::META_SCAN_NODE: {
1969
943
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1970
943
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1971
943
        break;
1972
943
    }
1973
1.02k
    case TPlanNodeType::SELECT_NODE: {
1974
1.02k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1975
1.02k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1976
1.02k
        break;
1977
1.02k
    }
1978
1.02k
    case TPlanNodeType::REC_CTE_NODE: {
1979
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1980
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1981
1982
0
        const auto downstream_pipeline_id = cur_pipe->id();
1983
0
        if (!_dag.contains(downstream_pipeline_id)) {
1984
0
            _dag.insert({downstream_pipeline_id, {}});
1985
0
        }
1986
1987
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1988
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1989
1990
0
        DataSinkOperatorPtr anchor_sink;
1991
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1992
0
                                                                  op->operator_id(), tnode, descs);
1993
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1994
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1995
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1996
1997
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1998
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1999
2000
0
        DataSinkOperatorPtr rec_sink;
2001
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2002
0
                                                         tnode, descs);
2003
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2004
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2005
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2006
2007
0
        break;
2008
0
    }
2009
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2010
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2011
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2012
0
        break;
2013
0
    }
2014
9.24k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2015
9.24k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2016
        // The downstream pipeline (containing LocalExchangeSource) must have
2017
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2018
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2019
        // Without this, when the parent pipeline was reduced by a serial operator
2020
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2021
        // Exchange), the downstream inherits the reduced num_tasks via
2022
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2023
        // channels but only fewer source tasks initialize mem_counters — the
2024
        // sink round-robins to all channels and crashes on uninitialized ones.
2025
9.24k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2026
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2027
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2028
9.24k
        cur_pipe->set_num_tasks(_num_instances);
2029
2030
9.24k
        const auto downstream_pipeline_id = cur_pipe->id();
2031
9.24k
        if (!_dag.contains(downstream_pipeline_id)) {
2032
8.87k
            _dag.insert({downstream_pipeline_id, {}});
2033
8.87k
        }
2034
9.24k
        cur_pipe = add_pipeline(cur_pipe);
2035
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2036
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2037
        // We set this now so the exchanger is created with the correct sender count.
2038
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2039
        // consistent with this.
2040
9.24k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2041
0
            cur_pipe->set_num_tasks(_parallel_instances);
2042
0
        }
2043
9.24k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2044
9.24k
        int num_partitions = 0;
2045
9.24k
        std::map<int, int> shuffle_id_to_instance_idx;
2046
9.24k
        auto partition_type = tnode.local_exchange_node.partition_type;
2047
9.24k
        switch (partition_type) {
2048
4
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2049
4
            num_partitions = _params.num_buckets;
2050
4
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2051
4
            break;
2052
340
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2053
3.06k
            for (int i = 0; i < _num_instances; i++) {
2054
2.72k
                shuffle_id_to_instance_idx[i] = i;
2055
2.72k
            }
2056
340
            num_partitions = _num_instances;
2057
340
            break;
2058
0
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2059
0
            num_partitions = _total_instances;
2060
0
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2061
0
            break;
2062
8.89k
        default:
2063
8.89k
            break;
2064
9.24k
        }
2065
9.23k
        auto local_exchange_id = op->operator_id();
2066
9.23k
        auto sink_id = next_sink_operator_id();
2067
9.23k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2068
9.23k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2069
9.23k
        sink_ops.push_back(sink);
2070
9.23k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2071
9.23k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2072
2073
        // For FE-planned local exchange, we need to:
2074
        // 1. Initialize the partitioner for hash shuffle types
2075
        // 2. Defer exchanger creation until after the full plan tree is built
2076
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2077
        // 3. Register shared state so pipeline tasks can find it
2078
9.23k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2079
9.23k
                                ->init_partitioner(_runtime_state.get()));
2080
2081
9.23k
        int free_blocks_limit =
2082
9.23k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2083
9.23k
                        ? cast_set<int>(
2084
9.23k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2085
9.23k
                        : 0;
2086
9.23k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2087
9.23k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2088
9.23k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2089
9.23k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2090
9.23k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2091
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2092
9.23k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2093
9.23k
                                        free_blocks_limit, local_exchange_id, sink_id});
2094
9.23k
        break;
2095
9.23k
    }
2096
0
    default:
2097
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2098
0
                                     print_plan_node_type(tnode.node_type));
2099
96.8k
    }
2100
96.8k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2101
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2102
0
        op->set_serial_operator();
2103
0
    }
2104
2105
96.8k
    return Status::OK();
2106
96.8k
}
2107
// NOLINTEND(readability-function-cognitive-complexity)
2108
// NOLINTEND(readability-function-size)
2109
2110
template <bool is_intersect>
2111
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2112
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2113
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
0
    const auto downstream_pipeline_id = cur_pipe->id();
2118
0
    if (!_dag.contains(downstream_pipeline_id)) {
2119
0
        _dag.insert({downstream_pipeline_id, {}});
2120
0
    }
2121
2122
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
0
        if (child_id == 0) {
2127
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
0
        } else {
2130
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
0
        }
2133
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
0
    }
2138
2139
0
    return Status::OK();
2140
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
2141
2142
61.6k
// Submits every pipeline task of this fragment to the query's pipeline scheduler.
//
// May be called only once; a second call returns InternalError("submitted").
// On the first submission failure the fragment is cancelled, `_total_tasks` is shrunk to
// the number of tasks actually submitted, and no further tasks are submitted. If every
// submitted task has already closed by then, the fragment instance is closed here and the
// pipeline context is removed from the fragment manager (outside `_task_mutex`).
//
// @return OK if all tasks were submitted, otherwise an InternalError describing the failure.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // The `break` above only leaves the inner loop, so stop the outer loop as well.
        // Continuing would let a later successful submit overwrite `st` and hide the
        // failure. It would also start tasks that are not counted in the shrunken
        // `_total_tasks`, so the fragment could be closed while they are still running.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
2183
2184
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2185
0
    if (_runtime_state->enable_profile()) {
2186
0
        std::stringstream ss;
2187
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2188
0
            runtime_profile_ptr->pretty_print(&ss);
2189
0
        }
2190
2191
0
        if (_runtime_state->load_channel_profile()) {
2192
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2193
0
        }
2194
2195
0
        auto profile_str =
2196
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2197
0
                            this->_fragment_id, extra_info, ss.str());
2198
0
        LOG_LONG_STRING(INFO, profile_str);
2199
0
    }
2200
0
}
2201
// If all pipeline tasks binded to the fragment instance are finished, then we could
2202
// close the fragment instance.
2203
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2204
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2205
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2206
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2207
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2208
// (via debug_string()).
2209
61.6k
bool PipelineFragmentContext::_close_fragment_instance() {
2210
61.6k
    if (_is_fragment_instance_closed) {
2211
0
        return false;
2212
0
    }
2213
61.6k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2214
61.6k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2215
61.6k
    if (!_need_notify_close) {
2216
61.6k
        auto st = send_report(true);
2217
61.6k
        if (!st) {
2218
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2219
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2220
0
        }
2221
61.6k
    }
2222
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2223
    // Since stream load does not have someting like coordinator on FE, so
2224
    // backend can not report profile to FE, ant its profile can not be shown
2225
    // in the same way with other query. So we print the profile content to info log.
2226
2227
61.6k
    if (_runtime_state->enable_profile() &&
2228
61.6k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2229
228
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2230
228
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2231
0
        std::stringstream ss;
2232
        // Compute the _local_time_percent before pretty_print the runtime_profile
2233
        // Before add this operation, the print out like that:
2234
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2235
        // After add the operation, the print out like that:
2236
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2237
        // We can easily know the exec node execute time without child time consumed.
2238
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2239
0
            runtime_profile_ptr->pretty_print(&ss);
2240
0
        }
2241
2242
0
        if (_runtime_state->load_channel_profile()) {
2243
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2244
0
        }
2245
2246
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2247
0
    }
2248
2249
61.6k
    if (_query_ctx->enable_profile()) {
2250
228
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2251
228
                                         collect_realtime_load_channel_profile());
2252
228
    }
2253
2254
    // Return whether the caller needs to remove from the pipeline map.
2255
    // The caller must do this after releasing _task_mutex.
2256
61.6k
    return !_need_notify_close;
2257
61.6k
}
2258
2259
240k
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // Invoked once for every task of `pipeline_id` that finishes.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));

    // When close_task() reports that the pipeline has no open tasks left, the upstream
    // tasks recorded for it in `_dag` are never needed again, so make them runnable now.
    const bool pipeline_fully_closed = _pip_id_to_pipeline[pipeline_id]->close_task();
    if (pipeline_fully_closed && _dag.contains(pipeline_id)) {
        for (auto upstream_id : _dag[pipeline_id]) {
            _pip_id_to_pipeline[upstream_id]->make_all_runnable(pipeline_id);
        }
    }

    // Task accounting is done under _task_mutex; whichever caller observes the last
    // closed task closes the fragment instance.
    bool should_unregister = false;
    {
        std::lock_guard<std::mutex> task_lock(_task_mutex);
        ++_closed_tasks;
        // Keep the query-level finished-task progress up to date.
        _query_ctx->inc_finished_task_num();
        should_unregister = (_closed_tasks >= _total_tasks) && _close_fragment_instance();
    }

    // remove_pipeline_context() must be called only after _task_mutex has been released,
    // otherwise the two locks could be taken in opposite orders (ABBA deadlock).
    if (should_unregister) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
2284
2285
7.02k
std::string PipelineFragmentContext::get_load_error_url() {
2286
7.02k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2287
0
        return to_load_error_http_path(str);
2288
0
    }
2289
23.3k
    for (auto& tasks : _tasks) {
2290
29.3k
        for (auto& task : tasks) {
2291
29.3k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2292
6
                return to_load_error_http_path(str);
2293
6
            }
2294
29.3k
        }
2295
23.3k
    }
2296
7.01k
    return "";
2297
7.02k
}
2298
2299
7.02k
std::string PipelineFragmentContext::get_first_error_msg() {
2300
7.02k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2301
0
        return str;
2302
0
    }
2303
23.3k
    for (auto& tasks : _tasks) {
2304
29.3k
        for (auto& task : tasks) {
2305
29.3k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2306
6
                return str;
2307
6
            }
2308
29.3k
        }
2309
23.3k
    }
2310
7.01k
    return "";
2311
7.02k
}
2312
2313
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2314
0
    std::stringstream url;
2315
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2316
0
        << "/api/_download_load?"
2317
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2318
0
    return url.str();
2319
0
}
2320
2321
6.52k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2322
6.52k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2323
6.52k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2324
6.52k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2325
6.52k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2326
6.52k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2327
6.52k
    });
2328
2329
6.52k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2330
6.52k
    if (req.coord_addr.hostname == "external") {
2331
        // External query (flink/spark read tablets) not need to report to FE.
2332
0
        return;
2333
0
    }
2334
6.52k
    int callback_retries = 10;
2335
6.52k
    const int sleep_ms = 1000;
2336
6.52k
    Status exec_status = req.status;
2337
6.52k
    Status coord_status;
2338
6.52k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2339
6.52k
    do {
2340
6.52k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2341
6.52k
                                                            req.coord_addr, &coord_status);
2342
6.52k
        if (!coord_status.ok()) {
2343
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2344
0
        }
2345
6.52k
    } while (!coord_status.ok() && callback_retries-- > 0);
2346
2347
6.52k
    if (!coord_status.ok()) {
2348
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2349
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2350
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2351
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2352
0
        return;
2353
0
    }
2354
2355
6.52k
    TReportExecStatusParams params;
2356
6.52k
    params.protocol_version = FrontendServiceVersion::V1;
2357
6.52k
    params.__set_query_id(req.query_id);
2358
6.52k
    params.__set_backend_num(req.backend_num);
2359
6.52k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2360
6.52k
    params.__set_fragment_id(req.fragment_id);
2361
6.52k
    params.__set_status(exec_status.to_thrift());
2362
6.52k
    params.__set_done(req.done);
2363
6.52k
    params.__set_query_type(req.runtime_state->query_type());
2364
6.52k
    params.__isset.profile = false;
2365
2366
6.52k
    DCHECK(req.runtime_state != nullptr);
2367
2368
6.52k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2369
6.00k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2370
6.00k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2371
6.00k
    } else {
2372
527
        DCHECK(!req.runtime_states.empty());
2373
527
        if (!req.runtime_state->output_files().empty()) {
2374
0
            params.__isset.delta_urls = true;
2375
0
            for (auto& it : req.runtime_state->output_files()) {
2376
0
                params.delta_urls.push_back(_to_http_path(it));
2377
0
            }
2378
0
        }
2379
527
        if (!params.delta_urls.empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
        }
2382
527
    }
2383
2384
6.52k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2385
6.52k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2386
6.52k
    static std::string s_unselected_rows = "unselected.rows";
2387
6.52k
    int64_t num_rows_load_success = 0;
2388
6.52k
    int64_t num_rows_load_filtered = 0;
2389
6.52k
    int64_t num_rows_load_unselected = 0;
2390
6.52k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2391
6.52k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2392
6.52k
        req.runtime_state->num_finished_range() > 0) {
2393
0
        params.__isset.load_counters = true;
2394
2395
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2396
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2397
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2398
0
        params.__isset.fragment_instance_reports = true;
2399
0
        TFragmentInstanceReport t;
2400
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2401
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2402
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2403
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2404
0
        params.fragment_instance_reports.push_back(t);
2405
6.52k
    } else if (!req.runtime_states.empty()) {
2406
24.6k
        for (auto* rs : req.runtime_states) {
2407
24.6k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2408
24.6k
                rs->num_finished_range() > 0) {
2409
3.59k
                params.__isset.load_counters = true;
2410
3.59k
                num_rows_load_success += rs->num_rows_load_success();
2411
3.59k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2412
3.59k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2413
3.59k
                params.__isset.fragment_instance_reports = true;
2414
3.59k
                TFragmentInstanceReport t;
2415
3.59k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2416
3.59k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2417
3.59k
                t.__set_loaded_rows(rs->num_rows_load_total());
2418
3.59k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2419
3.59k
                params.fragment_instance_reports.push_back(t);
2420
3.59k
            }
2421
24.6k
        }
2422
6.52k
    }
2423
6.52k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2424
6.52k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2425
6.52k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2426
2427
6.52k
    if (!req.load_error_url.empty()) {
2428
6
        params.__set_tracking_url(req.load_error_url);
2429
6
    }
2430
6.52k
    if (!req.first_error_msg.empty()) {
2431
6
        params.__set_first_error_msg(req.first_error_msg);
2432
6
    }
2433
24.6k
    for (auto* rs : req.runtime_states) {
2434
24.6k
        if (rs->wal_id() > 0) {
2435
0
            params.__set_txn_id(rs->wal_id());
2436
0
            params.__set_label(rs->import_label());
2437
0
        }
2438
24.6k
    }
2439
6.52k
    if (!req.runtime_state->export_output_files().empty()) {
2440
0
        params.__isset.export_files = true;
2441
0
        params.export_files = req.runtime_state->export_output_files();
2442
6.52k
    } else if (!req.runtime_states.empty()) {
2443
24.6k
        for (auto* rs : req.runtime_states) {
2444
24.6k
            if (!rs->export_output_files().empty()) {
2445
0
                params.__isset.export_files = true;
2446
0
                params.export_files.insert(params.export_files.end(),
2447
0
                                           rs->export_output_files().begin(),
2448
0
                                           rs->export_output_files().end());
2449
0
            }
2450
24.6k
        }
2451
6.52k
    }
2452
6.52k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2453
0
        params.__isset.commitInfos = true;
2454
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2455
6.52k
    } else if (!req.runtime_states.empty()) {
2456
24.6k
        for (auto* rs : req.runtime_states) {
2457
24.6k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2458
1.51k
                params.__isset.commitInfos = true;
2459
1.51k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2460
1.51k
            }
2461
24.6k
        }
2462
6.52k
    }
2463
6.52k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2464
0
        params.__isset.errorTabletInfos = true;
2465
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2466
6.52k
    } else if (!req.runtime_states.empty()) {
2467
24.6k
        for (auto* rs : req.runtime_states) {
2468
24.6k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2469
0
                params.__isset.errorTabletInfos = true;
2470
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2471
0
                                               rs_eti.end());
2472
0
            }
2473
24.6k
        }
2474
6.52k
    }
2475
6.52k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2476
0
        params.__isset.hive_partition_updates = true;
2477
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2478
0
                                             hpu.end());
2479
6.52k
    } else if (!req.runtime_states.empty()) {
2480
24.6k
        for (auto* rs : req.runtime_states) {
2481
24.6k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2482
1.09k
                params.__isset.hive_partition_updates = true;
2483
1.09k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2484
1.09k
                                                     rs_hpu.begin(), rs_hpu.end());
2485
1.09k
            }
2486
24.6k
        }
2487
6.52k
    }
2488
6.52k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2489
0
        params.__isset.iceberg_commit_datas = true;
2490
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2491
0
                                           icd.end());
2492
6.52k
    } else if (!req.runtime_states.empty()) {
2493
24.6k
        for (auto* rs : req.runtime_states) {
2494
24.6k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2495
1.04k
                params.__isset.iceberg_commit_datas = true;
2496
1.04k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2497
1.04k
                                                   rs_icd.begin(), rs_icd.end());
2498
1.04k
            }
2499
24.6k
        }
2500
6.52k
    }
2501
2502
6.52k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2503
0
        params.__isset.mc_commit_datas = true;
2504
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2505
6.52k
    } else if (!req.runtime_states.empty()) {
2506
24.6k
        for (auto* rs : req.runtime_states) {
2507
24.6k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2508
0
                params.__isset.mc_commit_datas = true;
2509
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2510
0
                                              rs_mcd.end());
2511
0
            }
2512
24.6k
        }
2513
6.52k
    }
2514
2515
6.52k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2516
6.52k
    params.__isset.error_log = (!params.error_log.empty());
2517
2518
6.52k
    if (_exec_env->cluster_info()->backend_id != 0) {
2519
6.52k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2520
6.52k
    }
2521
2522
6.52k
    TReportExecStatusResult res;
2523
6.52k
    Status rpc_status;
2524
2525
6.52k
    VLOG_DEBUG << "reportExecStatus params is "
2526
0
               << apache::thrift::ThriftDebugString(params).c_str();
2527
6.52k
    if (!exec_status.ok()) {
2528
81
        LOG(WARNING) << "report error status: " << exec_status.msg()
2529
81
                     << " to coordinator: " << req.coord_addr
2530
81
                     << ", query id: " << print_id(req.query_id);
2531
81
    }
2532
6.52k
    try {
2533
6.52k
        try {
2534
6.52k
            (*coord)->reportExecStatus(res, params);
2535
6.52k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2536
#ifndef ADDRESS_SANITIZER
2537
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2538
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2539
                         << req.coord_addr << ", err: " << e.what();
2540
#endif
2541
0
            rpc_status = coord->reopen();
2542
2543
0
            if (!rpc_status.ok()) {
2544
0
                req.cancel_fn(rpc_status);
2545
0
                return;
2546
0
            }
2547
0
            (*coord)->reportExecStatus(res, params);
2548
0
        }
2549
2550
6.52k
        rpc_status = Status::create<false>(res.status);
2551
6.52k
    } catch (apache::thrift::TException& e) {
2552
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2553
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2554
0
    }
2555
2556
6.52k
    if (!rpc_status.ok()) {
2557
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2558
0
                 print_id(req.query_id), rpc_status.to_string());
2559
0
        req.cancel_fn(rpc_status);
2560
0
    }
2561
6.52k
}
2562
2563
62.3k
// Builds a ReportStatusRequest for this fragment and submits it to the fragment
// manager's thread pool, where _coordinator_callback() delivers it to the FE.
//
// Returns:
//   - Status::OK() without sending when no report is wanted for this state;
//   - Status::NeedSendAgain("") when neither _is_report_success nor
//     _is_report_on_cancel is set and `done` is false;
//   - otherwise the status of submitting the report task to the thread pool.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // an internal cancellation is being processed.
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // `done` with a non-OK status (the successful case returned above). Reporting
            // on cancel is disabled, so the fragment instance can be closed without
            // sending anything to FE.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;
    for (auto& instance_tasks : _tasks) {
        for (auto& task : instance_tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Query-level values take precedence over the ones collected from this fragment.
    // Each query-level getter is read exactly once so the emptiness check and the value
    // that is used cannot disagree.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = std::move(runtime_states),
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = std::move(load_error_url),
                             .first_error_msg = std::move(first_error_msg),
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    // `ctx` keeps this fragment context alive until the report task has run.
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func(
            [this, req = std::move(req), ctx = std::move(ctx)]() {
                SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
                _coordinator_callback(req);
                if (!req.done) {
                    ctx->refresh_next_report_time();
                }
            });
}
2623
2624
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2625
0
    size_t res = 0;
2626
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2627
    // here to traverse the vector.
2628
0
    for (const auto& task_instances : _tasks) {
2629
0
        for (const auto& task : task_instances) {
2630
0
            if (task.first->is_running()) {
2631
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2632
0
                                      << " is running, task: " << (void*)task.first.get()
2633
0
                                      << ", is_running: " << task.first->is_running();
2634
0
                *has_running_task = true;
2635
0
                return 0;
2636
0
            }
2637
2638
0
            size_t revocable_size = task.first->get_revocable_size();
2639
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2640
0
                res += revocable_size;
2641
0
            }
2642
0
        }
2643
0
    }
2644
0
    return res;
2645
0
}
2646
2647
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2648
0
    std::vector<PipelineTask*> revocable_tasks;
2649
0
    for (const auto& task_instances : _tasks) {
2650
0
        for (const auto& task : task_instances) {
2651
0
            size_t revocable_size_ = task.first->get_revocable_size();
2652
2653
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2654
0
                revocable_tasks.emplace_back(task.first.get());
2655
0
            }
2656
0
        }
2657
0
    }
2658
0
    return revocable_tasks;
2659
0
}
2660
2661
0
std::string PipelineFragmentContext::debug_string() {
2662
0
    std::lock_guard<std::mutex> l(_task_mutex);
2663
0
    fmt::memory_buffer debug_string_buffer;
2664
0
    fmt::format_to(debug_string_buffer,
2665
0
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2666
0
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2667
0
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2668
0
    for (size_t j = 0; j < _tasks.size(); j++) {
2669
0
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2670
0
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2671
0
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2672
0
                           _tasks[j][i].first->debug_string());
2673
0
        }
2674
0
    }
2675
2676
0
    return fmt::to_string(debug_string_buffer);
2677
0
}
2678
2679
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2680
228
PipelineFragmentContext::collect_realtime_profile() const {
2681
228
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2682
2683
    // we do not have mutex to protect pipeline_id_to_profile
2684
    // so we need to make sure this funciton is invoked after fragment context
2685
    // has already been prepared.
2686
228
    if (!_prepared) {
2687
0
        std::string msg =
2688
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2689
0
        DCHECK(false) << msg;
2690
0
        LOG_ERROR(msg);
2691
0
        return res;
2692
0
    }
2693
2694
    // Make sure first profile is fragment level profile
2695
228
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2696
228
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2697
228
    res.push_back(fragment_profile);
2698
2699
    // pipeline_id_to_profile is initialized in prepare stage
2700
332
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2701
332
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2702
332
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2703
332
        res.push_back(profile_ptr);
2704
332
    }
2705
2706
228
    return res;
2707
228
}
2708
2709
std::shared_ptr<TRuntimeProfileTree>
2710
228
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2711
    // we do not have mutex to protect pipeline_id_to_profile
2712
    // so we need to make sure this funciton is invoked after fragment context
2713
    // has already been prepared.
2714
228
    if (!_prepared) {
2715
0
        std::string msg =
2716
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2717
0
        DCHECK(false) << msg;
2718
0
        LOG_ERROR(msg);
2719
0
        return nullptr;
2720
0
    }
2721
2722
312
    for (const auto& tasks : _tasks) {
2723
542
        for (const auto& task : tasks) {
2724
542
            if (task.second->load_channel_profile() == nullptr) {
2725
0
                continue;
2726
0
            }
2727
2728
542
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2729
2730
542
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2731
542
                                                           _runtime_state->profile_level());
2732
542
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2733
542
        }
2734
312
    }
2735
2736
228
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2737
228
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2738
228
                                                      _runtime_state->profile_level());
2739
228
    return load_channel_profile;
2740
228
}
2741
2742
// Collect runtime filter IDs registered by all tasks in this PFC.
2743
// Used during recursive CTE stage transitions to know which filters to deregister
2744
// before creating the new PFC for the next recursion round.
2745
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2746
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2747
// the vector sizes do not change, and individual RuntimeState filter sets are
2748
// written only during open() which has completed by the time we reach rerun.
2749
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2750
0
    std::set<int> result;
2751
0
    for (const auto& _task : _tasks) {
2752
0
        for (const auto& task : _task) {
2753
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2754
0
            result.merge(set);
2755
0
        }
2756
0
    }
2757
0
    if (_runtime_state) {
2758
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2759
0
        result.merge(set);
2760
0
    }
2761
0
    return result;
2762
0
}
2763
2764
61.6k
void PipelineFragmentContext::_release_resource() {
2765
61.6k
    std::lock_guard<std::mutex> l(_task_mutex);
2766
    // The memory released by the query end is recorded in the query mem tracker.
2767
61.6k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2768
61.6k
    auto st = _query_ctx->exec_status();
2769
135k
    for (auto& _task : _tasks) {
2770
135k
        if (!_task.empty()) {
2771
135k
            _call_back(_task.front().first->runtime_state(), &st);
2772
135k
        }
2773
135k
    }
2774
61.6k
    _tasks.clear();
2775
61.6k
    _dag.clear();
2776
61.6k
    _pip_id_to_pipeline.clear();
2777
61.6k
    _pipelines.clear();
2778
61.6k
    _sink.reset();
2779
61.6k
    _root_op.reset();
2780
61.6k
    _runtime_filter_mgr_map.clear();
2781
61.6k
    _op_id_to_shared_state.clear();
2782
61.6k
}
2783
2784
} // namespace doris