Coverage Report

Created: 2026-05-20 14:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
PipelineFragmentContext::PipelineFragmentContext(
140
        TUniqueId query_id, const TPipelineFragmentParams& request,
141
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
142
        const std::function<void(RuntimeState*, Status*)>& call_back)
143
124k
        : _query_id(std::move(query_id)),
144
124k
          _fragment_id(request.fragment_id),
145
124k
          _exec_env(exec_env),
146
124k
          _query_ctx(std::move(query_ctx)),
147
124k
          _call_back(call_back),
148
124k
          _is_report_on_cancel(true),
149
124k
          _params(request),
150
124k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
151
124k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
152
124k
                                                               : false) {
153
124k
    _fragment_watcher.start();
154
124k
}
155
156
124k
PipelineFragmentContext::~PipelineFragmentContext() {
157
124k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
158
124k
            .tag("query_id", print_id(_query_id))
159
124k
            .tag("fragment_id", _fragment_id);
160
124k
    _release_resource();
161
124k
    {
162
        // The memory released by the query end is recorded in the query mem tracker.
163
124k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
164
124k
        _runtime_state.reset();
165
124k
        _query_ctx.reset();
166
124k
    }
167
124k
}
168
169
0
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
0
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
0
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
923
bool PipelineFragmentContext::notify_close() {
181
923
    bool all_closed = false;
182
923
    bool need_remove = false;
183
923
    {
184
923
        std::lock_guard<std::mutex> l(_task_mutex);
185
923
        if (_closed_tasks >= _total_tasks) {
186
28
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
0
                need_remove = true;
193
0
            }
194
28
            all_closed = true;
195
28
        }
196
        // make fragment release by self after cancel
197
923
        _need_notify_close = false;
198
923
    }
199
923
    if (need_remove) {
200
0
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
0
    }
202
923
    return all_closed;
203
923
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
921
void PipelineFragmentContext::cancel(const Status reason) {
210
921
    LOG_INFO("PipelineFragmentContext::cancel")
211
921
            .tag("query_id", print_id(_query_id))
212
921
            .tag("fragment_id", _fragment_id)
213
921
            .tag("reason", reason.to_string());
214
921
    if (notify_close()) {
215
28
        return;
216
28
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
893
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
0
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
0
                                   debug_string());
221
0
        LOG_LONG_STRING(WARNING, dbg_str);
222
0
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
893
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
895
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
893
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
0
        _query_ctx->set_load_error_url(error_url);
236
0
    }
237
238
893
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
0
        _query_ctx->set_first_error_msg(first_error_msg);
240
0
    }
241
242
893
    _query_ctx->cancel(reason, _fragment_id);
243
893
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
218
        _is_report_on_cancel = false;
245
675
    } else {
246
3.10k
        for (auto& id : _fragment_instance_ids) {
247
3.10k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
3.10k
        }
249
675
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
893
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
893
    if (stream_load_ctx != nullptr) {
254
0
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
0
        stream_load_ctx->error_url = get_load_error_url();
259
0
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
0
    }
261
262
3.40k
    for (auto& tasks : _tasks) {
263
8.74k
        for (auto& task : tasks) {
264
8.74k
            task.first->unblock_all_dependencies();
265
8.74k
        }
266
3.40k
    }
267
893
}
268
269
200k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
270
200k
    PipelineId id = _next_pipeline_id++;
271
200k
    auto pipeline = std::make_shared<Pipeline>(
272
200k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
273
200k
            parent ? parent->num_tasks() : _num_instances);
274
200k
    if (idx >= 0) {
275
324
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
276
200k
    } else {
277
200k
        _pipelines.emplace_back(pipeline);
278
200k
    }
279
200k
    if (parent) {
280
67.5k
        parent->set_children(pipeline);
281
67.5k
    }
282
200k
    return pipeline;
283
200k
}
284
285
124k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
286
124k
    {
287
124k
        SCOPED_TIMER(_build_pipelines_timer);
288
        // 2. Build pipelines with operators in this fragment.
289
124k
        auto root_pipeline = add_pipeline();
290
124k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
291
124k
                                         &_root_op, root_pipeline));
292
293
        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
294
        // that inherited reduced num_tasks from a serial operator.
295
124k
        _propagate_local_exchange_num_tasks();
296
297
        // Create deferred local exchangers now that all pipelines have final num_tasks.
298
124k
        RETURN_IF_ERROR(_create_deferred_local_exchangers());
299
300
        // Raise num_tasks for pipelines whose serial non-scan operators (e.g.,
301
        // UNPARTITIONED Exchange) reduced num_tasks below _num_instances.
302
        // Without this, fragment instances 1+ have no task for these pipelines
303
        // and downstream operators fail with "must set shared state".
304
        //
305
        // This applies to ALL pipelines (not just deferred exchanger upstreams):
306
        // fragments with UNION/INTERSECT/EXCEPT + serial Exchange in child
307
        // pipelines also need the raise, even without FE-planned local exchange.
308
        //
309
        // Exception: serial scan sources (pooling scan) keep num_tasks=1 — the
310
        // PassthroughExchanger(1, N) handles the fan-out correctly.
311
        // NOTE: Do NOT raise pipelines whose source is a serial operator
312
        // (Exchange or scan) — they legitimately have 1 task, and raising
313
        // them causes crashes (e.g., 4 Exchange tasks but only 1 receives
314
        // data).  The correct fix for shared state injection across
315
        // instances is handled by the FE: it inserts local exchange nodes
316
        // between serial operators and their downstream consumers, creating
317
        // proper pipeline boundaries with _num_instances tasks.
318
319
        // 3. Create sink operator
320
124k
        if (!_params.fragment.__isset.output_sink) {
321
0
            return Status::InternalError("No output sink in this fragment!");
322
0
        }
323
124k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
324
124k
                                          _params.fragment.output_exprs, _params,
325
124k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
326
124k
                                          *_desc_tbl, root_pipeline->id()));
327
124k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
328
124k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
329
330
200k
        for (PipelinePtr& pipeline : _pipelines) {
331
200k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
332
200k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
333
200k
        }
334
124k
    }
335
    // 4. Build local exchanger
336
124k
    if (_runtime_state->plan_local_shuffle()) {
337
73.2k
        SCOPED_TIMER(_plan_local_exchanger_timer);
338
73.2k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
339
73.2k
                                             _params.bucket_seq_to_instance_idx,
340
73.2k
                                             _params.shuffle_idx_to_instance_idx));
341
73.2k
    }
342
343
    // 5. Initialize global states in pipelines.
344
200k
    for (PipelinePtr& pipeline : _pipelines) {
345
200k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
346
200k
        pipeline->children().clear();
347
200k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
348
200k
    }
349
350
124k
    {
351
124k
        SCOPED_TIMER(_build_tasks_timer);
352
        // 6. Build pipeline tasks and initialize local state.
353
124k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
354
124k
    }
355
356
124k
    return Status::OK();
357
124k
}
358
359
124k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
360
124k
    if (_prepared) {
361
0
        return Status::InternalError("Already prepared");
362
0
    }
363
124k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
364
124k
        _timeout = _params.query_options.execution_timeout;
365
124k
    }
366
367
124k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
368
124k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
369
124k
    SCOPED_TIMER(_prepare_timer);
370
124k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
371
124k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
372
124k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
373
124k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
374
124k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
375
124k
    {
376
124k
        SCOPED_TIMER(_init_context_timer);
377
124k
        cast_set(_num_instances, _params.local_params.size());
378
124k
        _total_instances =
379
124k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
380
381
124k
        auto* fragment_context = this;
382
383
124k
        if (_params.query_options.__isset.is_report_success) {
384
124k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
385
124k
        }
386
387
        // 1. Set up the global runtime state.
388
124k
        _runtime_state = RuntimeState::create_unique(
389
124k
                _params.query_id, _params.fragment_id, _params.query_options,
390
124k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
391
124k
        _runtime_state->set_task_execution_context(shared_from_this());
392
124k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
393
124k
        if (_params.__isset.backend_id) {
394
123k
            _runtime_state->set_backend_id(_params.backend_id);
395
123k
        }
396
124k
        if (_params.__isset.import_label) {
397
0
            _runtime_state->set_import_label(_params.import_label);
398
0
        }
399
124k
        if (_params.__isset.db_name) {
400
0
            _runtime_state->set_db_name(_params.db_name);
401
0
        }
402
124k
        if (_params.__isset.load_job_id) {
403
0
            _runtime_state->set_load_job_id(_params.load_job_id);
404
0
        }
405
406
124k
        if (_params.is_simplified_param) {
407
43.6k
            _desc_tbl = _query_ctx->desc_tbl;
408
81.0k
        } else {
409
81.0k
            DCHECK(_params.__isset.desc_tbl);
410
81.0k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
411
81.0k
                                                  &_desc_tbl));
412
81.0k
        }
413
124k
        _runtime_state->set_desc_tbl(_desc_tbl);
414
124k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
415
124k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
416
124k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
417
124k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
418
419
        // init fragment_instance_ids
420
124k
        const auto target_size = _params.local_params.size();
421
124k
        _fragment_instance_ids.resize(target_size);
422
376k
        for (size_t i = 0; i < _params.local_params.size(); i++) {
423
251k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
424
251k
            _fragment_instance_ids[i] = fragment_instance_id;
425
251k
        }
426
124k
    }
427
428
124k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
429
430
124k
    _init_next_report_time();
431
432
124k
    _prepared = true;
433
124k
    return Status::OK();
434
124k
}
435
436
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
437
        int instance_idx,
438
251k
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
439
251k
    const auto& local_params = _params.local_params[instance_idx];
440
251k
    auto fragment_instance_id = local_params.fragment_instance_id;
441
251k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
442
251k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
443
251k
    auto get_shared_state = [&](PipelinePtr pipeline)
444
251k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
445
451k
                                       std::vector<std::shared_ptr<Dependency>>>> {
446
451k
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
447
451k
                                std::vector<std::shared_ptr<Dependency>>>>
448
451k
                shared_state_map;
449
475k
        for (auto& op : pipeline->operators()) {
450
475k
            auto source_id = op->operator_id();
451
475k
            if (auto iter = _op_id_to_shared_state.find(source_id);
452
475k
                iter != _op_id_to_shared_state.end()) {
453
142k
                shared_state_map.insert({source_id, iter->second});
454
142k
            }
455
475k
        }
456
456k
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
457
456k
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
458
456k
                iter != _op_id_to_shared_state.end()) {
459
42.3k
                shared_state_map.insert({sink_to_source_id, iter->second});
460
42.3k
            }
461
456k
        }
462
451k
        return shared_state_map;
463
451k
    };
464
465
802k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
466
551k
        auto& pipeline = _pipelines[pip_idx];
467
551k
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
468
451k
            auto task_runtime_state = RuntimeState::create_unique(
469
451k
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
470
451k
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
471
451k
            {
472
                // Initialize runtime state for this task
473
451k
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
474
475
451k
                task_runtime_state->set_task_execution_context(shared_from_this());
476
451k
                task_runtime_state->set_be_number(local_params.backend_num);
477
478
451k
                if (_params.__isset.backend_id) {
479
450k
                    task_runtime_state->set_backend_id(_params.backend_id);
480
450k
                }
481
451k
                if (_params.__isset.import_label) {
482
0
                    task_runtime_state->set_import_label(_params.import_label);
483
0
                }
484
451k
                if (_params.__isset.db_name) {
485
0
                    task_runtime_state->set_db_name(_params.db_name);
486
0
                }
487
451k
                if (_params.__isset.load_job_id) {
488
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
489
0
                }
490
451k
                if (_params.__isset.wal_id) {
491
0
                    task_runtime_state->set_wal_id(_params.wal_id);
492
0
                }
493
451k
                if (_params.__isset.content_length) {
494
0
                    task_runtime_state->set_content_length(_params.content_length);
495
0
                }
496
497
451k
                task_runtime_state->set_desc_tbl(_desc_tbl);
498
451k
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
499
451k
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
500
451k
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
501
451k
                task_runtime_state->set_max_operator_id(max_operator_id());
502
451k
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
503
451k
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
504
451k
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
505
506
451k
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
507
451k
            }
508
451k
            auto cur_task_id = _total_tasks++;
509
451k
            task_runtime_state->set_task_id(cur_task_id);
510
451k
            task_runtime_state->set_task_num(pipeline->num_tasks());
511
451k
            auto task = std::make_shared<PipelineTask>(
512
451k
                    pipeline, cur_task_id, task_runtime_state.get(),
513
451k
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
514
451k
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
515
451k
                    instance_idx);
516
451k
            pipeline->incr_created_tasks(instance_idx, task.get());
517
451k
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
518
451k
            _tasks[instance_idx].emplace_back(
519
451k
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
520
451k
                            std::move(task), std::move(task_runtime_state)});
521
451k
        }
522
551k
    }
523
524
    /**
525
         * Build DAG for pipeline tasks.
526
         * For example, we have
527
         *
528
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
529
         *            \                      /
530
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
531
         *                 \                          /
532
         *               JoinProbeOperator2 (Pipeline1)
533
         *
534
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
535
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
536
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
537
         *
538
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
539
         * and JoinProbeOperator2.
540
         */
541
551k
    for (auto& _pipeline : _pipelines) {
542
551k
        if (pipeline_id_to_task.contains(_pipeline->id())) {
543
451k
            auto* task = pipeline_id_to_task[_pipeline->id()];
544
451k
            DCHECK(task != nullptr);
545
546
            // If this task has upstream dependency, then inject it into this task.
547
451k
            if (_dag.contains(_pipeline->id())) {
548
294k
                auto& deps = _dag[_pipeline->id()];
549
302k
                for (auto& dep : deps) {
550
302k
                    if (pipeline_id_to_task.contains(dep)) {
551
200k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
552
200k
                        if (ss) {
553
157k
                            task->inject_shared_state(ss);
554
157k
                        } else {
555
43.3k
                            pipeline_id_to_task[dep]->inject_shared_state(
556
43.3k
                                    task->get_source_shared_state());
557
43.3k
                        }
558
200k
                    }
559
302k
                }
560
294k
            }
561
451k
        }
562
551k
    }
563
802k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
564
551k
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
565
451k
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
566
451k
            DCHECK(pipeline_id_to_profile[pip_idx]);
567
451k
            std::vector<TScanRangeParams> scan_ranges;
568
451k
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
569
451k
            if (local_params.per_node_scan_ranges.contains(node_id)) {
570
96.5k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
571
96.5k
            }
572
451k
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
573
451k
                                                             _params.fragment.output_sink));
574
451k
        }
575
551k
    }
576
251k
    {
577
251k
        std::lock_guard<std::mutex> l(_state_map_lock);
578
251k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
579
251k
    }
580
251k
    return Status::OK();
581
251k
}
582
583
124k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
584
124k
    _total_tasks = 0;
585
124k
    _closed_tasks = 0;
586
124k
    const auto target_size = _params.local_params.size();
587
124k
    _tasks.resize(target_size);
588
124k
    _runtime_filter_mgr_map.resize(target_size);
589
325k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
590
200k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
591
200k
    }
592
124k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
593
594
124k
    if (target_size > 1 &&
595
124k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
596
18.0k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
597
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
598
2
        std::vector<Status> prepare_status(target_size);
599
2
        int submitted_tasks = 0;
600
2
        Status submit_status;
601
2
        CountDownLatch latch((int)target_size);
602
6
        for (int i = 0; i < target_size; i++) {
603
4
            submit_status = thread_pool->submit_func([&, i]() {
604
4
                SCOPED_ATTACH_TASK(_query_ctx.get());
605
4
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
606
4
                latch.count_down();
607
4
            });
608
4
            if (LIKELY(submit_status.ok())) {
609
4
                submitted_tasks++;
610
4
            } else {
611
0
                break;
612
0
            }
613
4
        }
614
2
        latch.arrive_and_wait(target_size - submitted_tasks);
615
2
        if (UNLIKELY(!submit_status.ok())) {
616
0
            return submit_status;
617
0
        }
618
6
        for (int i = 0; i < submitted_tasks; i++) {
619
4
            if (!prepare_status[i].ok()) {
620
0
                return prepare_status[i];
621
0
            }
622
4
        }
623
124k
    } else {
624
376k
        for (int i = 0; i < target_size; i++) {
625
251k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
626
251k
        }
627
124k
    }
628
124k
    _pipeline_parent_map.clear();
629
124k
    _op_id_to_shared_state.clear();
630
    // Record task cardinality once when this fragment context finishes task initialization.
631
124k
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
632
633
124k
    return Status::OK();
634
124k
}
635
636
124k
void PipelineFragmentContext::_init_next_report_time() {
637
124k
    auto interval_s = config::pipeline_status_report_interval;
638
124k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
18.4E
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
10.9k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
10.9k
        _previous_report_time =
643
10.9k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
10.9k
        _disable_period_report = false;
645
10.9k
    }
646
124k
}
647
648
1.43k
void PipelineFragmentContext::refresh_next_report_time() {
649
1.43k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
1.43k
    DCHECK(disable == true);
651
1.43k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
1.43k
    _disable_period_report.compare_exchange_strong(disable, false);
653
1.43k
}
654
655
1.82M
void PipelineFragmentContext::trigger_report_if_necessary() {
656
1.82M
    if (!_is_report_success) {
657
1.63M
        return;
658
1.63M
    }
659
193k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
193k
    if (disable) {
661
2.85k
        return;
662
2.85k
    }
663
190k
    int32_t interval_s = config::pipeline_status_report_interval;
664
190k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
190k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
190k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
190k
    if (MonotonicNanos() > next_report_time) {
672
1.44k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
1.44k
                                                            std::memory_order_acq_rel)) {
674
8
            return;
675
8
        }
676
1.43k
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
1.43k
        auto st = send_report(false);
693
1.43k
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
1.43k
    }
699
190k
}
700
701
// Rebuilds the operator tree of this fragment from the thrift plan-node list.
//
// @param pool      object pool forwarded to operator construction
// @param descs     descriptor table forwarded to operator construction
// @param root      receives the root operator of the rebuilt tree
// @param cur_pipe  pipeline that construction starts in
// @return OK when every thrift node was consumed; InternalError when construction
//         stopped before the last node.
// @throws Exception(INTERNAL_ERROR) when the plan contains no nodes at all.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Index of the node currently being visited; advanced by _create_tree_helper.
    int visited_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &visited_idx, root,
                                        cur_pipe, 0, false, false));

    // After a complete traversal the index points at the last node.
    if (visited_idx + 1 == plan_nodes.size()) {
        return Status::OK();
    }
    return Status::InternalError(
            "Plan tree only partially reconstructed. Not all thrift nodes were used.");
}
718
719
124k
// Instantiates the exchanger of every deferred (FE-planned) local exchange and then
// empties the deferred list.
//
// @return OK on success; InternalError when an entry carries a partition type that has
//         no handler in this path (debug builds additionally hit a DCHECK).
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
    for (auto& info : _deferred_exchangers) {
        // DANGER ZONE — do not "fix" the sender count without reading the history.
        //
        // The sender count seeds Exchanger::_running_sink_operators, which the source
        // side waits to reach 0 via sub_running_sink_operators on each sink LocalState
        // close. The correct value is THIS pipeline-instance's sink task count, i.e.
        // info.upstream_pipe->num_tasks(): one PipelineTask per task, one close per task.
        //
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)`, to mirror the
        //   BE-planned path in _add_local_exchange_impl. That breaks the common
        //   FE-planned shape `serial scan -> LE(PT) -> ...`: upstream_pipe genuinely has
        //   num_tasks=1, so only one close arrives, but the seed becomes _num_instances
        //   and _running_sink_operators never reaches 0. Downstream sources then hang on
        //   SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING; build 949402
        //   regressed exactly this way). The BE-planned path may use max() because its
        //   `cur_pipe` is the source-side pipeline (always raised to _num_instances by
        //   add_pipeline); `upstream_pipe` here is the sink-side pipeline, which may
        //   legitimately stay at 1 for serial sources.
        //
        // Tempting wrong fix #2: multiply by _num_instances on the theory that the
        //   shared state is shared across all instances. Same hang: each
        //   fragment-instance PipelineFragmentContext has its OWN _op_id_to_shared_state
        //   map, so the exchanger is per-instance, not per-BE. num_tasks() is already
        //   the right close count for one instance.
        //
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
        // this fragment shape. Fix THAT pass, not this seed value.
        const int num_senders = info.upstream_pipe->num_tasks();
        auto& exchanger = info.shared_state->exchanger;

        switch (info.partition_type) {
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
            exchanger = ShuffleExchanger::create_unique(num_senders, _num_instances,
                                                        info.num_partitions,
                                                        info.free_blocks_limit,
                                                        info.partition_type);
            break;
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
            exchanger = BucketShuffleExchanger::create_unique(
                    num_senders, _num_instances, info.num_partitions, info.free_blocks_limit);
            break;
        case TLocalPartitionType::PASSTHROUGH:
            exchanger = PassthroughExchanger::create_unique(num_senders, _num_instances,
                                                            info.free_blocks_limit);
            break;
        case TLocalPartitionType::BROADCAST:
            exchanger = BroadcastExchanger::create_unique(num_senders, _num_instances,
                                                          info.free_blocks_limit);
            break;
        case TLocalPartitionType::PASS_TO_ONE:
            // PASS_TO_ONE is honoured only when the shared hash table for broadcast join
            // is enabled; otherwise a broadcast exchanger is used instead.
            if (!_runtime_state->enable_share_hash_table_for_broadcast_join()) {
                exchanger = BroadcastExchanger::create_unique(num_senders, _num_instances,
                                                              info.free_blocks_limit);
                break;
            }
            exchanger = PassToOneExchanger::create_unique(num_senders, _num_instances,
                                                          info.free_blocks_limit);
            break;
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
            exchanger = AdaptivePassthroughExchanger::create_unique(num_senders, _num_instances,
                                                                    info.free_blocks_limit);
            break;
        case TLocalPartitionType::NOOP:
        case TLocalPartitionType::LOCAL_MERGE_SORT:
            // FE-planned LocalExchangeNode currently never emits NOOP or LOCAL_MERGE_SORT
            // through the deferred-exchanger path. NOOP means "no exchange needed" and is
            // filtered out before reaching here; LOCAL_MERGE_SORT is planned by the legacy
            // BE path only. Crash in debug to surface the protocol violation if that ever
            // changes; return an error in release to avoid silently corrupting execution.
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
                          << static_cast<int>(info.partition_type);
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
                                         std::to_string(static_cast<int>(info.partition_type)));
        default:
            // A new TLocalPartitionType was added on the FE side without a BE handler here.
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
                          << static_cast<int>(info.partition_type);
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
                                         std::to_string(static_cast<int>(info.partition_type)));
        }
    }

    _deferred_exchangers.clear();
    return Status::OK();
}
805
806
124k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
124k
    if (_deferred_exchangers.empty()) {
815
109k
        return;
816
109k
    }
817
    // Fix num_tasks mismatches between paired pipelines created by pipeline-splitting
818
    // operators (AGG, SORT, JOIN).  These pairs share state via inject_shared_state and
819
    // must have consistent num_tasks; otherwise instance 1+ tasks access null shared_state.
820
    //
821
    // Pass 1 (upward): when FE inserts LOCAL_EXCHANGE_NODE below a splitting operator,
822
    // the LE pipeline gets _num_instances but ancestor pipelines still have the reduced
823
    // count.  Raise them to _num_instances (skip serial source operators).
824
    //
825
    // Pass 2 (downward): when a serial Exchange feeds a splitting operator (e.g., scalar
826
    // merge-finalize AGG), the sink pipeline has 1 task but the source pipeline may have
827
    // been raised (by LE or by inheriting _num_instances from add_pipeline).  Lower it
828
    // to match, unless it contains LocalExchangeSource (deliberately raised by LE).
829
15.3k
    std::unordered_map<PipelineId, PipelinePtr> id_to_pipe;
830
45.5k
    for (auto& p : _pipelines) {
831
45.5k
        id_to_pipe[p->id()] = p;
832
45.5k
    }
833
834
    // Pass 1 (upward): raise downstream pipelines to _num_instances when any upstream
835
    // dependency was already raised by LOCAL_EXCHANGE.
836
15.3k
    bool changed = true;
837
30.6k
    while (changed) {
838
15.3k
        changed = false;
839
29.4k
        for (const auto& [downstream_id, upstream_ids] : _dag) {
840
29.4k
            auto dit = id_to_pipe.find(downstream_id);
841
29.4k
            if (dit == id_to_pipe.end()) {
842
0
                continue;
843
0
            }
844
29.4k
            auto& downstream = dit->second;
845
29.4k
            if (downstream->num_tasks() >= _num_instances) {
846
29.4k
                continue;
847
29.4k
            }
848
24
            if (!downstream->operators().empty() &&
849
24
                downstream->operators().front()->is_serial_operator()) {
850
24
                continue;
851
24
            }
852
0
            for (auto upstream_id : upstream_ids) {
853
0
                auto uit = id_to_pipe.find(upstream_id);
854
0
                if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
855
0
                    downstream->set_num_tasks(_num_instances);
856
0
                    changed = true;
857
0
                    break;
858
0
                }
859
0
            }
860
0
        }
861
15.3k
    }
862
863
    // Pass 2 (downward): when a pipeline-splitting operator (AGG, SORT, JOIN) creates
864
    // paired pipelines (source + sink), they share state via inject_shared_state and must
865
    // have consistent num_tasks.  If the sink pipeline has a serial source (Exchange) with
866
    // num_tasks < _num_instances, but the source pipeline was raised (by inheriting from
867
    // an LE-fixed parent during add_pipeline), lower the source pipeline to match.
868
    // Skip pipelines containing LocalExchangeSource — they were deliberately raised.
869
    //
870
    // Example: serial Exchange → AGG(merge finalize) → LOCAL_EXCHANGE → NLJ
871
    //   P_sink  = [Exchange, AGGSink]   num_tasks=1  (serial)
872
    //   P_source = [AGGSource]          num_tasks=_num_instances (inherited)
873
    //   → lower P_source to 1; the LE above fans out the 1 result to N NLJ tasks.
874
15.3k
    changed = true;
875
30.6k
    while (changed) {
876
15.3k
        changed = false;
877
29.4k
        for (const auto& [downstream_id, upstream_ids] : _dag) {
878
29.4k
            auto dit = id_to_pipe.find(downstream_id);
879
29.4k
            if (dit == id_to_pipe.end()) {
880
0
                continue;
881
0
            }
882
29.4k
            auto& downstream = dit->second;
883
29.4k
            if (downstream->num_tasks() <= 1) {
884
24
                continue;
885
24
            }
886
            // Don't lower pipelines that contain LocalExchangeSource — they were
887
            // deliberately raised to _num_instances by LOCAL_EXCHANGE processing.
888
29.4k
            bool has_le_source = false;
889
29.4k
            for (auto& op : downstream->operators()) {
890
29.4k
                if (dynamic_cast<LocalExchangeSourceOperatorX*>(op.get())) {
891
16.7k
                    has_le_source = true;
892
16.7k
                    break;
893
16.7k
                }
894
29.4k
            }
895
29.4k
            if (has_le_source) {
896
16.7k
                continue;
897
16.7k
            }
898
12.7k
            for (auto upstream_id : upstream_ids) {
899
12.7k
                auto uit = id_to_pipe.find(upstream_id);
900
12.7k
                if (uit != id_to_pipe.end() && uit->second->num_tasks() < downstream->num_tasks() &&
901
12.7k
                    !uit->second->operators().empty() &&
902
12.7k
                    uit->second->operators().front()->is_serial_operator()) {
903
0
                    downstream->set_num_tasks(uit->second->num_tasks());
904
0
                    changed = true;
905
0
                    break;
906
0
                }
907
12.7k
            }
908
12.6k
        }
909
15.3k
    }
910
15.3k
}
911
912
// Recursively rebuilds the operator (sub)tree rooted at tnodes[*node_idx].
//
// `tnodes` is consumed in pre-order: this call creates the operator for the current
// node, then recurses once per child, advancing *node_idx before each child.
//
// @param pool      object pool forwarded to _create_operator
// @param tnodes    thrift plan nodes in pre-order
// @param descs     descriptor table forwarded to _create_operator
// @param parent    parent operator, or nullptr for the root of the plan
// @param node_idx  in/out cursor into `tnodes`
// @param root      receives the created operator when `parent` is nullptr
// @param cur_pipe  pipeline currently being filled; passed by reference to
//                  _create_operator
// @param child_idx position of this node among its parent's children
// @param followed_by_shuffled_operator  flag inherited from the parent; recomputed
//                  below for this node's children
// @param require_bucket_distribution    flag inherited from the parent; recomputed
//                  below for this node's children
// @return OK, or InternalError when the node list ends before the tree is complete,
//         or any error produced while creating or initializing an operator.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];
    const int num_children = tnode.num_children;

    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    const int parent_node_id = parent == nullptr ? -1 : parent->node_id();
    RETURN_IF_ERROR(_create_operator(pool, tnode, descs, op, cur_pipe, parent_node_id, child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));

    if (parent == nullptr) {
        *root = op;
    } else {
        // Attach to the parent; a cache operator, when one was created, takes op's place.
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    }

    /**
     * `TLocalPartitionType::HASH_SHUFFLE` should be used if an operator is followed by a
     * shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When the current pipeline holds no operators yet, its sink is consulted in place of op.
    const bool pipe_has_no_operators = cur_pipe->operators().empty();
    auto required_data_distribution =
            pipe_has_no_operators
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    const bool wants_hash_exchange =
            Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
    const bool wants_noop =
            required_data_distribution.distribution_type == TLocalPartitionType::NOOP;

    const auto is_shuffled_here = [&] {
        return pipe_has_no_operators ? cur_pipe->sink()->is_shuffled_operator()
                                     : op->is_shuffled_operator();
    };
    const auto is_colocated_here = [&] {
        return pipe_has_no_operators ? cur_pipe->sink()->is_colocated_operator()
                                     : op->is_colocated_operator();
    };

    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator || is_shuffled_here()) && wants_hash_exchange) ||
            (followed_by_shuffled_operator && wants_noop);

    current_require_bucket_distribution =
            ((require_bucket_distribution || is_colocated_here()) && wants_hash_exchange) ||
            (require_bucket_distribution && wants_noop);

    if (num_children == 0) {
        // Leaf node: record whether it is a serial operator.
        _use_serial_source = op->is_serial_operator();
    }

    // rely on that tnodes is preorder of the plan
    for (int child = 0; child < num_children; ++child) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe,
                                            child, current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
997
998
void PipelineFragmentContext::_inherit_pipeline_properties(
999
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
1000
324
        PipelinePtr pipe_with_sink) {
1001
324
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
1002
324
    pipe_with_source->set_num_tasks(_num_instances);
1003
324
    pipe_with_source->set_data_distribution(data_distribution);
1004
324
}
1005
1006
// Splits `cur_pipe` at operator index `idx` by inserting a local exchange.
//
// Operators [0, idx) move into `new_pip`, which is terminated by a new
// LocalExchangeSink; `cur_pipe` keeps the remaining operators behind a new
// LocalExchangeSource. Children, DAG edges and shared state are rewired accordingly.
//
// @param idx                number of leading operators of `cur_pipe` moved to `new_pip`
// @param pool               object pool used for the new source operator
// @param cur_pipe           pipeline being split (becomes the source-side pipeline)
// @param new_pip            freshly added pipeline (becomes the sink-side pipeline)
// @param data_distribution  requested distribution; its type may be adjusted below
// @param do_local_exchange  not used by this function
// @param num_buckets        bucket count forwarded to the sink and bucket exchanger
// @param bucket_seq_to_instance_idx   bucket sequence -> instance index mapping
// @param shuffle_idx_to_instance_idx  shuffle index -> instance index mapping
// @return OK, InternalError for an unsupported distribution type, or any error from
//         operator/sink initialization.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool no_bucket_mapping = bucket_seq_to_instance_idx.empty();
    const bool followed_by_shuffled_operator =
            static_cast<size_t>(idx) < operators.size()
                    ? operators[idx]->followed_by_shuffled_operator()
                    : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = no_bucket_mapping &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    const auto num_partitions = use_global_hash_shuffle ? _total_instances : _num_instances;

    DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, num_partitions, data_distribution.partition_exprs,
            bucket_seq_to_instance_idx);

    // Normalize the distribution type: a bucket shuffle without a bucket mapping becomes a
    // hash shuffle, and a global hash shuffle that cannot be used is downgraded to local.
    auto& distribution_type = data_distribution.distribution_type;
    if (no_bucket_mapping && distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
        distribution_type = use_global_hash_shuffle
                                    ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
                                    : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    if (!use_global_hash_shuffle &&
        distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
        distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), distribution_type, num_buckets,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);

    // Free-blocks limit from the query options, 0 when unset. Kept lazy so it is only
    // evaluated by the branch that actually builds an exchanger.
    const auto free_blocks_limit = [this]() -> int {
        const auto& query_options = _runtime_state->query_options();
        return query_options.__isset.local_exchange_free_blocks_limit
                       ? cast_set<int>(query_options.local_exchange_free_blocks_limit)
                       : 0;
    };

    switch (distribution_type) {
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_partitions,
                free_blocks_limit(), distribution_type);
        break;
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_blocks_limit());
        break;
    case TLocalPartitionType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        break;
    case TLocalPartitionType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_blocks_limit());
        }
        break;
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                free_blocks_limit());
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Move the leading `idx` operators into the new pipeline's operator list.
    auto& upstream_ops = new_pip->operators();
    upstream_ops.insert(upstream_ops.end(), operators.begin(), operators.begin() + idx);

    // 3.2 Erase those operators from the previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(upstream_ops.back()));
    RETURN_IF_ERROR(source_op->init(distribution_type));
    if (!operators.empty()) {
        // Reset the old child link before pointing the first remaining operator at the source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. A child whose sink node id matches an
    // operator that moved to the new pipeline follows it; every other child stays here.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool moved_to_new_pipeline = false;
        for (const auto& moved_op : new_pip->operators()) {
            if (child->sink()->node_id() == moved_op->node_id()) {
                new_pip->set_children(child);
                moved_to_new_pipeline = true;
            }
        }
        if (!moved_to_new_pipeline) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
1177
1178
// Splits `cur_pipe` at operator position `idx` with a local exchange when one is required.
//
// Nothing is done (and `*do_local_exchange` is left untouched) when the fragment has at most one
// instance, when the parent of `cur_pipe` has at most one task, or when `cur_pipe` reports that
// no local exchange is needed for `data_distribution` at `idx`. Otherwise `*do_local_exchange`
// is set to true, a new pipeline is registered at position `pip_idx + 1` and the split is
// delegated to `_add_local_exchange_impl`.
//
// Returns the first error reported by `_add_local_exchange_impl`, otherwise OK.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A single instance / single parent task never needs a local exchange.
    const bool single_threaded = _num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1;
    if (single_threaded) {
        return Status::OK();
    }
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    // Remember the operator count before the split so the result can be verified below.
    const auto op_count_before_split = cur_pipe->operators().size();
    auto upstream_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(idx, pool, cur_pipe, upstream_pip, data_distribution,
                                             do_local_exchange, num_buckets,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source) across both pipelines.
    CHECK(op_count_before_split + 1 ==
          cur_pipe->operators().size() + upstream_pip->operators().size())
            << "total_op_num: " << op_count_before_split
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << upstream_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes
    // a bottleneck. Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    const bool serial_heavy_sink =
            cur_pipe->num_tasks() > 1 && upstream_pip->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (serial_heavy_sink) {
        const int split_position = cast_set<int>(upstream_pip->operators().size());
        auto passthrough_pip = add_pipeline(upstream_pip, pip_idx + 2);
        RETURN_IF_ERROR(_add_local_exchange_impl(
                split_position, pool, upstream_pip, passthrough_pip,
                DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1219
1220
// Plans local exchanges for every pipeline of this fragment, visiting `_pipelines` from the last
// entry down to the first.
//
// For each pipeline the data distribution is initialised, optionally inherited from a child
// pipeline, and then the per-pipeline overload is invoked. Returns the first error encountered.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // The pipeline count is sampled once; the walk goes backwards from the last pipeline.
    for (int pip_idx = cast_set<int>(_pipelines.size()); pip_idx-- > 0;) {
        // Hold the pipeline through its own handle rather than re-indexing `_pipelines`.
        PipelinePtr pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());

        // Set property if child pipeline is not join operator's child: a child whose sink has the
        // same node id as this pipeline's first operator passes its data distribution on.
        for (auto& child : pipeline->children()) {
            const bool feeds_first_operator =
                    child->sink()->node_id() == pipeline->operators().front()->node_id();
            if (feeds_first_operator) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1244
1245
// Plans local exchanges for a single pipeline `pip` located at position `pip_idx`.
//
// Operators are inspected starting at index 1. Whenever an operator requires a local exchange
// and one is actually inserted, scanning restarts at index 2 of the (now modified) operator
// list. After all operators are handled, the sink's requirement is checked as well.
// Returns the first error reported by `_add_local_exchange`, otherwise OK.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    for (;;) {
        auto& pipeline_ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        while (idx < pipeline_ops.size()) {
            auto requirement =
                    pipeline_ops[idx]->required_data_distribution(_runtime_state.get());
            if (requirement.need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, pipeline_ops[idx]->node_id(), _runtime_state->obj_pool(),
                        pip, requirement, &do_local_exchange, num_buckets,
                        bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline
                // to two pipelines by local exchange sink/source. And then we need to process
                // remaining operators in this pipeline so we set idx to 2 (0 is local exchange
                // source and 1 is current operator was already processed) and continue to plan
                // local exchange.
                idx = 2;
                break;
            }
            ++idx;
        }
        // Rescan only when the last pass split the pipeline.
        if (!do_local_exchange) {
            break;
        }
    }

    // Finally, honour the distribution required by the sink itself.
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1282
1283
// Creates the sink operator described by `thrift_sink` and stores it in `_sink`.
//
// @param pool            object pool handed to sink operators and used for helper objects
//                        allocated in the multi-cast case.
// @param thrift_sink     Thrift description of the sink; `thrift_sink.type` selects the operator.
// @param output_exprs    output expressions forwarded to the sink operators that take them.
// @param params          fragment parameters; `destinations` is read for the stream sink and the
//                        result file sink, `parallel_instances` for the multi-cast case.
// @param row_desc        row descriptor of the rows fed into the sink.
// @param state           runtime state (query options, descriptor table, query context).
// @param desc_tbl        descriptor table forwarded to the result file sink.
// @param cur_pipeline_id index into `_pipelines` of the pipeline that owns the sink.
//
// @return InternalError when the Thrift payload matching the sink type is missing, when JDBC is
//         requested while `config::enable_java_support` is off, when a multi-cast sink has no
//         sinks or fewer destination lists than sinks, or when the sink type is not handled here.
//         Errors from building the per-consumer pipelines of a multi-cast sink are propagated.
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                                                  const std::vector<TExpr>& output_exprs,
                                                  const TPipelineFragmentParams& params,
                                                  const RowDescriptor& row_desc,
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
                                                  PipelineId cur_pipeline_id) {
    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        if (!thrift_sink.__isset.stream_sink) {
            return Status::InternalError("Missing data stream sink.");
        }
        _sink = std::make_shared<ExchangeSinkOperatorX>(
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
                params.destinations, _fragment_instance_ids);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        if (!thrift_sink.__isset.result_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        // The sink is given the node id following the last operator of its pipeline.
        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
                                                      row_desc, output_exprs,
                                                      thrift_sink.result_sink);
        break;
    }
    case TDataSinkType::DICTIONARY_SINK: {
        if (!thrift_sink.__isset.dictionary_sink) {
            return Status::InternalError("Missing dict sink.");
        }

        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
                                                    thrift_sink.dictionary_sink);
        break;
    }
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
    case TDataSinkType::OLAP_TABLE_SINK: {
        auto& pipeline = _pipelines[cur_pipeline_id];
        int child_node_id = pipeline->operators().back()->node_id();
        // The V2 sink is only chosen when memtable-on-sink-node is enabled, the table has neither
        // inverted index v1 / partial update nor row binlog, and BE is not in cloud mode.
        if (state->query_options().enable_memtable_on_sink_node &&
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        } else {
            _sink = std::make_shared<OlapTableSinkOperatorX>(
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
        DCHECK(thrift_sink.__isset.olap_table_sink);
        DCHECK(state->get_query_ctx() != nullptr);
        // Flag the query's memory tracker as belonging to a group commit load.
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                                output_exprs);
        break;
    }
    case TDataSinkType::HIVE_TABLE_SINK: {
        if (!thrift_sink.__isset.hive_table_sink) {
            return Status::InternalError("Missing hive table sink.");
        }
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                         output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_TABLE_SINK: {
        if (!thrift_sink.__isset.iceberg_table_sink) {
            return Status::InternalError("Missing iceberg table sink.");
        }
        // A sink carrying sort info is served by the spill-capable operator.
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                     row_desc, output_exprs);
        } else {
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
                                                                row_desc, output_exprs);
        }
        break;
    }
    case TDataSinkType::ICEBERG_DELETE_SINK: {
        if (!thrift_sink.__isset.iceberg_delete_sink) {
            return Status::InternalError("Missing iceberg delete sink.");
        }
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
                                                             row_desc, output_exprs);
        break;
    }
    case TDataSinkType::ICEBERG_MERGE_SINK: {
        if (!thrift_sink.__isset.iceberg_merge_sink) {
            return Status::InternalError("Missing iceberg merge sink.");
        }
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                            output_exprs);
        break;
    }
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
        if (!thrift_sink.__isset.max_compute_table_sink) {
            return Status::InternalError("Missing max compute table sink.");
        }
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                       output_exprs);
        break;
    }
    case TDataSinkType::JDBC_TABLE_SINK: {
        if (!thrift_sink.__isset.jdbc_table_sink) {
            return Status::InternalError("Missing data jdbc sink.");
        }
        if (!config::enable_java_support) {
            return Status::InternalError(
                    "Jdbc table sink is not enabled, you can change be config "
                    "enable_java_support to true and restart be.");
        }
        _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                         output_exprs);
        break;
    }
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
        if (!thrift_sink.__isset.memory_scratch_sink) {
            return Status::InternalError("Missing data buffer sink.");
        }

        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
                                                             output_exprs);
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        if (!thrift_sink.__isset.result_file_sink) {
            return Status::InternalError("Missing result file sink.");
        }

        // Result file sink is not the top sink
        if (params.__isset.destinations && !params.destinations.empty()) {
            _sink = std::make_shared<ResultFileSinkOperatorX>(
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
                    params.destinations, output_exprs, desc_tbl);
        } else {
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
                                                              output_exprs);
        }
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
        const auto& multi_cast_sink = thrift_sink.multi_cast_stream_sink;

        // Validate the plan before any operator id is consumed or any pipeline is created: the
        // DCHECKs above are compiled out of release builds, and `destinations[i]` is indexed for
        // every entry of `sinks` below.
        if (multi_cast_sink.sinks.empty()) {
            return Status::InternalError("size of sources must be greater than 0");
        }
        if (multi_cast_sink.destinations.size() < multi_cast_sink.sinks.size()) {
            return Status::InternalError(
                    "Multi cast data stream sink has {} sinks but only {} destination lists",
                    multi_cast_sink.sinks.size(), multi_cast_sink.destinations.size());
        }

        auto sink_id = next_sink_operator_id();
        const int multi_cast_node_id = sink_id;
        const int sender_size = cast_set<int>(multi_cast_sink.sinks.size());
        // one sink has multiple sources.
        std::vector<int> sources;
        sources.reserve(sender_size);
        for (int i = 0; i < sender_size; ++i) {
            auto source_id = next_operator_id();
            sources.push_back(source_id);
        }

        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
                sink_id, multi_cast_node_id, sources, pool, multi_cast_sink);
        for (int i = 0; i < sender_size; ++i) {
            auto new_pipeline = add_pipeline();
            // use to exchange sink
            RowDescriptor* exchange_row_desc = nullptr;
            {
                // A consumer with its own output expressions gets a row descriptor built from
                // its output tuple; otherwise the incoming row descriptor is copied.
                const auto& tmp_row_desc =
                        !multi_cast_sink.sinks[i].output_exprs.empty()
                                ? RowDescriptor(state->desc_tbl(),
                                                {multi_cast_sink.sinks[i].output_tuple_id})
                                : row_desc;
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
            }
            auto source_id = sources[i];
            OperatorPtr source_op;
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
                    /*node_id*/ source_id, /*consumer_id*/ i, pool, multi_cast_sink.sinks[i],
                    row_desc,
                    /*operator_id=*/source_id);
            RETURN_IF_ERROR(new_pipeline->add_operator(
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
            // 2. create and set sink operator of data stream sender for new pipeline

            DataSinkOperatorPtr sink_op;
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
                    state, *exchange_row_desc, next_sink_operator_id(), multi_cast_sink.sinks[i],
                    multi_cast_sink.destinations[i], _fragment_instance_ids);

            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
            {
                // The exchange sink is initialised from a pool-owned TDataSink that carries only
                // this consumer's stream sink.
                TDataSink* t = pool->add(new TDataSink());
                t->stream_sink = multi_cast_sink.sinks[i];
                RETURN_IF_ERROR(sink_op->init(*t));
            }

            // 3. set dependency dag
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
        }
        break;
    }
    case TDataSinkType::BLACKHOLE_SINK: {
        if (!thrift_sink.__isset.blackhole_sink) {
            return Status::InternalError("Missing blackhole sink.");
        }

        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
        break;
    }
    case TDataSinkType::TVF_TABLE_SINK: {
        if (!thrift_sink.__isset.tvf_table_sink) {
            return Status::InternalError("Missing TVF table sink.");
        }
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
                                                        output_exprs);
        break;
    }
    default:
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
    }
    return Status::OK();
}
1508
1509
// NOLINTBEGIN(readability-function-size)
1510
// NOLINTBEGIN(readability-function-cognitive-complexity)
1511
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1512
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1513
                                                 PipelinePtr& cur_pipe, int parent_idx,
1514
                                                 int child_idx,
1515
                                                 const bool followed_by_shuffled_operator,
1516
                                                 const bool require_bucket_distribution,
1517
205k
                                                 OperatorPtr& cache_op) {
1518
205k
    std::vector<DataSinkOperatorPtr> sink_ops;
1519
205k
    Defer defer = Defer([&]() {
1520
205k
        if (op) {
1521
205k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1522
205k
        }
1523
205k
        for (auto& s : sink_ops) {
1524
67.1k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1525
67.1k
        }
1526
205k
    });
1527
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1528
    // Therefore, here we need to use a stack-like structure.
1529
205k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1530
205k
    std::stringstream error_msg;
1531
205k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1532
1533
205k
    bool fe_with_old_version = false;
1534
205k
    switch (tnode.node_type) {
1535
53.1k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1536
53.1k
        op = std::make_shared<OlapScanOperatorX>(
1537
53.1k
                pool, tnode, next_operator_id(), descs, _num_instances,
1538
53.1k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1539
53.1k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1540
53.1k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1541
53.1k
        break;
1542
53.1k
    }
1543
0
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1544
0
        DCHECK(_query_ctx != nullptr);
1545
0
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1546
0
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1547
0
                                                    _num_instances);
1548
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1549
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1550
0
        break;
1551
0
    }
1552
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1553
0
        if (config::enable_java_support) {
1554
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1555
0
                                                     _num_instances);
1556
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1557
0
        } else {
1558
0
            return Status::InternalError(
1559
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1560
0
                    "to true and restart be.");
1561
0
        }
1562
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1563
0
        break;
1564
0
    }
1565
22.0k
    case TPlanNodeType::FILE_SCAN_NODE: {
1566
22.0k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1567
22.0k
                                                 _num_instances);
1568
22.0k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1569
22.0k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1570
22.0k
        break;
1571
22.0k
    }
1572
49.2k
    case TPlanNodeType::EXCHANGE_NODE: {
1573
49.2k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1574
49.2k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1575
18.4E
                                  : 0;
1576
49.2k
        DCHECK_GT(num_senders, 0);
1577
49.2k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1578
49.2k
                                                       num_senders);
1579
49.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1580
49.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1581
49.2k
        break;
1582
49.2k
    }
1583
33.5k
    case TPlanNodeType::AGGREGATION_NODE: {
1584
33.5k
        if (tnode.agg_node.grouping_exprs.empty() &&
1585
33.5k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1586
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1587
0
                                         ": group by and output is empty");
1588
0
        }
1589
33.5k
        bool need_create_cache_op =
1590
33.5k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1591
33.5k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1592
0
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1593
0
            auto cache_source_id = next_operator_id();
1594
0
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1595
0
                                                        _params.fragment.query_cache_param);
1596
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1597
1598
0
            const auto downstream_pipeline_id = cur_pipe->id();
1599
0
            if (!_dag.contains(downstream_pipeline_id)) {
1600
0
                _dag.insert({downstream_pipeline_id, {}});
1601
0
            }
1602
0
            new_pipe = add_pipeline(cur_pipe);
1603
0
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1604
1605
0
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1606
0
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1607
0
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1608
0
            return Status::OK();
1609
0
        };
1610
33.5k
        const bool group_by_limit_opt =
1611
33.5k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1612
1613
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1614
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1615
33.5k
        const bool enable_spill = _runtime_state->enable_spill() &&
1616
33.5k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1617
33.5k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1618
33.5k
                                      tnode.agg_node.use_streaming_preaggregation &&
1619
33.5k
                                      !tnode.agg_node.grouping_exprs.empty();
1620
        // TODO: distinct streaming agg does not support spill.
1621
33.5k
        const bool can_use_distinct_streaming_agg =
1622
33.5k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1623
33.5k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1624
33.5k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1625
33.5k
                _params.query_options.enable_distinct_streaming_aggregation;
1626
1627
33.5k
        if (can_use_distinct_streaming_agg) {
1628
36
            if (need_create_cache_op) {
1629
0
                PipelinePtr new_pipe;
1630
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1631
1632
0
                cache_op = op;
1633
0
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1634
0
                                                                     tnode, descs);
1635
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1636
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1637
0
                cur_pipe = new_pipe;
1638
36
            } else {
1639
36
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1640
36
                                                                     tnode, descs);
1641
36
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1642
36
            }
1643
33.4k
        } else if (is_streaming_agg) {
1644
3.20k
            if (need_create_cache_op) {
1645
0
                PipelinePtr new_pipe;
1646
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1647
0
                cache_op = op;
1648
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1649
0
                                                             descs);
1650
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1651
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1652
0
                cur_pipe = new_pipe;
1653
3.20k
            } else {
1654
3.20k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1655
3.20k
                                                             descs);
1656
3.20k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1657
3.20k
            }
1658
30.2k
        } else {
1659
            // create new pipeline to add query cache operator
1660
30.2k
            PipelinePtr new_pipe;
1661
30.2k
            if (need_create_cache_op) {
1662
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1663
0
                cache_op = op;
1664
0
            }
1665
1666
30.2k
            if (enable_spill) {
1667
154
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1668
154
                                                                     next_operator_id(), descs);
1669
30.1k
            } else {
1670
30.1k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1671
30.1k
            }
1672
30.2k
            if (need_create_cache_op) {
1673
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1674
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1675
0
                cur_pipe = new_pipe;
1676
30.2k
            } else {
1677
30.2k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1678
30.2k
            }
1679
1680
30.2k
            const auto downstream_pipeline_id = cur_pipe->id();
1681
30.2k
            if (!_dag.contains(downstream_pipeline_id)) {
1682
27.4k
                _dag.insert({downstream_pipeline_id, {}});
1683
27.4k
            }
1684
30.2k
            cur_pipe = add_pipeline(cur_pipe);
1685
30.2k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1686
1687
30.2k
            if (enable_spill) {
1688
154
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1689
154
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1690
30.1k
            } else {
1691
30.1k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1692
30.1k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1693
30.1k
            }
1694
30.2k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1695
30.2k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1696
30.2k
        }
1697
33.5k
        break;
1698
33.5k
    }
1699
33.5k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1700
0
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1701
0
            return Status::InternalError(
1702
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1703
0
                    tnode.node_id);
1704
0
        }
1705
1706
        // Create source operator (goes on the current / downstream pipeline).
1707
0
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1708
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1709
1710
        // Create a new pipeline for the sink side.
1711
0
        const auto downstream_pipeline_id = cur_pipe->id();
1712
0
        if (!_dag.contains(downstream_pipeline_id)) {
1713
0
            _dag.insert({downstream_pipeline_id, {}});
1714
0
        }
1715
0
        cur_pipe = add_pipeline(cur_pipe);
1716
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1717
1718
        // Create sink operator.
1719
0
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1720
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1721
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1722
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1723
1724
        // Pre-register a single shared state for ALL instances so that every
1725
        // sink instance writes its per-instance hash table into the same
1726
        // BucketedAggSharedState and every source instance can merge across
1727
        // all of them.
1728
0
        {
1729
0
            auto shared_state = BucketedAggSharedState::create_shared();
1730
0
            shared_state->id = op->operator_id();
1731
0
            shared_state->related_op_ids.insert(op->operator_id());
1732
1733
0
            for (int i = 0; i < _num_instances; i++) {
1734
0
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1735
0
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1736
0
                sink_dep->set_shared_state(shared_state.get());
1737
0
                shared_state->sink_deps.push_back(sink_dep);
1738
0
            }
1739
0
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1740
0
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1741
0
            _op_id_to_shared_state.insert(
1742
0
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1743
0
        }
1744
0
        break;
1745
0
    }
1746
824
    case TPlanNodeType::HASH_JOIN_NODE: {
1747
824
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1748
824
                                       tnode.hash_join_node.is_broadcast_join;
1749
824
        const auto enable_spill = _runtime_state->enable_spill();
1750
824
        if (enable_spill && !is_broadcast_join) {
1751
0
            auto tnode_ = tnode;
1752
0
            tnode_.runtime_filters.clear();
1753
0
            auto inner_probe_operator =
1754
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1755
1756
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1757
            // So here use `tnode_` which has no runtime filters.
1758
0
            auto probe_side_inner_sink_operator =
1759
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1760
1761
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1762
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1763
1764
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1765
0
                    pool, tnode_, next_operator_id(), descs);
1766
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1767
0
                                                inner_probe_operator);
1768
0
            op = std::move(probe_operator);
1769
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1770
1771
0
            const auto downstream_pipeline_id = cur_pipe->id();
1772
0
            if (!_dag.contains(downstream_pipeline_id)) {
1773
0
                _dag.insert({downstream_pipeline_id, {}});
1774
0
            }
1775
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1776
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1777
1778
0
            auto inner_sink_operator =
1779
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1780
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1781
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1782
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1783
1784
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1785
0
            sink_ops.push_back(std::move(sink_operator));
1786
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1787
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1788
1789
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1790
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1791
824
        } else {
1792
824
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1793
824
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1794
1795
824
            const auto downstream_pipeline_id = cur_pipe->id();
1796
824
            if (!_dag.contains(downstream_pipeline_id)) {
1797
812
                _dag.insert({downstream_pipeline_id, {}});
1798
812
            }
1799
824
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1800
824
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1801
1802
824
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1803
824
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1804
824
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1805
824
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1806
1807
824
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1808
824
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1809
824
        }
1810
824
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1811
748
            std::shared_ptr<HashJoinSharedState> shared_state =
1812
748
                    HashJoinSharedState::create_shared(_num_instances);
1813
6.53k
            for (int i = 0; i < _num_instances; i++) {
1814
5.78k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1815
5.78k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1816
5.78k
                sink_dep->set_shared_state(shared_state.get());
1817
5.78k
                shared_state->sink_deps.push_back(sink_dep);
1818
5.78k
            }
1819
748
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1820
748
                                                     op->node_id(), "HASH_JOIN_PROBE");
1821
748
            _op_id_to_shared_state.insert(
1822
748
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1823
748
        }
1824
824
        break;
1825
824
    }
1826
5.66k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1827
5.66k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1828
5.66k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1829
1830
5.66k
        const auto downstream_pipeline_id = cur_pipe->id();
1831
5.66k
        if (!_dag.contains(downstream_pipeline_id)) {
1832
5.66k
            _dag.insert({downstream_pipeline_id, {}});
1833
5.66k
        }
1834
5.66k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1835
5.66k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1836
1837
5.66k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1838
5.66k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1839
5.66k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1840
5.66k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1841
5.66k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1842
5.66k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1843
5.66k
        break;
1844
5.66k
    }
1845
4.23k
    case TPlanNodeType::UNION_NODE: {
1846
4.23k
        int child_count = tnode.num_children;
1847
4.23k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1848
4.23k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1849
1850
4.23k
        const auto downstream_pipeline_id = cur_pipe->id();
1851
4.23k
        if (!_dag.contains(downstream_pipeline_id)) {
1852
4.20k
            _dag.insert({downstream_pipeline_id, {}});
1853
4.20k
        }
1854
4.64k
        for (int i = 0; i < child_count; i++) {
1855
408
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1856
408
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1857
408
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1858
408
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1859
408
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1860
408
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1861
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1862
408
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1863
408
        }
1864
4.23k
        break;
1865
4.23k
    }
1866
13.2k
    case TPlanNodeType::SORT_NODE: {
1867
13.2k
        const auto should_spill = _runtime_state->enable_spill() &&
1868
13.2k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1869
13.2k
        const bool use_local_merge =
1870
13.2k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1871
13.2k
        if (should_spill) {
1872
2
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1873
13.2k
        } else if (use_local_merge) {
1874
12.7k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1875
12.7k
                                                                 descs);
1876
12.7k
        } else {
1877
472
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1878
472
        }
1879
13.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1880
1881
13.2k
        const auto downstream_pipeline_id = cur_pipe->id();
1882
13.2k
        if (!_dag.contains(downstream_pipeline_id)) {
1883
13.2k
            _dag.insert({downstream_pipeline_id, {}});
1884
13.2k
        }
1885
13.2k
        cur_pipe = add_pipeline(cur_pipe);
1886
13.2k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1887
1888
13.2k
        if (should_spill) {
1889
2
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1890
2
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1891
13.2k
        } else {
1892
13.2k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1893
13.2k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1894
13.2k
        }
1895
13.2k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1896
13.2k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1897
13.2k
        break;
1898
13.2k
    }
1899
13.2k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1900
0
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1901
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1902
1903
0
        const auto downstream_pipeline_id = cur_pipe->id();
1904
0
        if (!_dag.contains(downstream_pipeline_id)) {
1905
0
            _dag.insert({downstream_pipeline_id, {}});
1906
0
        }
1907
0
        cur_pipe = add_pipeline(cur_pipe);
1908
0
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1909
1910
0
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1911
0
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1912
0
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1913
0
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1914
0
        break;
1915
0
    }
1916
44
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1917
44
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1918
44
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1919
1920
44
        const auto downstream_pipeline_id = cur_pipe->id();
1921
44
        if (!_dag.contains(downstream_pipeline_id)) {
1922
44
            _dag.insert({downstream_pipeline_id, {}});
1923
44
        }
1924
44
        cur_pipe = add_pipeline(cur_pipe);
1925
44
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1926
1927
44
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1928
44
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1929
44
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1930
44
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1931
44
        break;
1932
44
    }
1933
934
    case TPlanNodeType::MATERIALIZATION_NODE: {
1934
934
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1935
934
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1936
934
        break;
1937
934
    }
1938
934
    case TPlanNodeType::INTERSECT_NODE: {
1939
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1940
0
                                                                      cur_pipe, sink_ops));
1941
0
        break;
1942
0
    }
1943
0
    case TPlanNodeType::EXCEPT_NODE: {
1944
0
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1945
0
                                                                       cur_pipe, sink_ops));
1946
0
        break;
1947
0
    }
1948
0
    case TPlanNodeType::REPEAT_NODE: {
1949
0
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1950
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1951
0
        break;
1952
0
    }
1953
4
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1954
4
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1955
4
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1956
4
        break;
1957
4
    }
1958
200
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1959
200
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1960
200
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1961
200
        break;
1962
200
    }
1963
200
    case TPlanNodeType::EMPTY_SET_NODE: {
1964
158
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1965
158
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
158
        break;
1967
158
    }
1968
194
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1969
194
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1970
194
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1971
194
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1972
194
        break;
1973
194
    }
1974
508
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1975
508
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1976
508
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1977
508
        break;
1978
508
    }
1979
2.06k
    case TPlanNodeType::META_SCAN_NODE: {
1980
2.06k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1981
2.06k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1982
2.06k
        break;
1983
2.06k
    }
1984
2.78k
    case TPlanNodeType::SELECT_NODE: {
1985
2.78k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1986
2.78k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1987
2.78k
        break;
1988
2.78k
    }
1989
2.78k
    case TPlanNodeType::REC_CTE_NODE: {
1990
0
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1991
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1992
1993
0
        const auto downstream_pipeline_id = cur_pipe->id();
1994
0
        if (!_dag.contains(downstream_pipeline_id)) {
1995
0
            _dag.insert({downstream_pipeline_id, {}});
1996
0
        }
1997
1998
0
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1999
0
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
2000
2001
0
        DataSinkOperatorPtr anchor_sink;
2002
0
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
2003
0
                                                                  op->operator_id(), tnode, descs);
2004
0
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
2005
0
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
2006
0
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
2007
2008
0
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
2009
0
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
2010
2011
0
        DataSinkOperatorPtr rec_sink;
2012
0
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2013
0
                                                         tnode, descs);
2014
0
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2015
0
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2016
0
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2017
2018
0
        break;
2019
0
    }
2020
0
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2021
0
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2022
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2023
0
        break;
2024
0
    }
2025
16.7k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2026
16.7k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2027
        // The downstream pipeline (containing LocalExchangeSource) must have
2028
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2029
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2030
        // Without this, when the parent pipeline was reduced by a serial operator
2031
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2032
        // Exchange), the downstream inherits the reduced num_tasks via
2033
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2034
        // channels but only fewer source tasks initialize mem_counters — the
2035
        // sink round-robins to all channels and crashes on uninitialized ones.
2036
16.7k
        auto downstream_num_tasks = _num_instances;
2037
16.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2038
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2039
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2040
16.7k
        cur_pipe->set_num_tasks(downstream_num_tasks);
2041
2042
16.7k
        const auto downstream_pipeline_id = cur_pipe->id();
2043
16.7k
        if (!_dag.contains(downstream_pipeline_id)) {
2044
16.0k
            _dag.insert({downstream_pipeline_id, {}});
2045
16.0k
        }
2046
16.7k
        cur_pipe = add_pipeline(cur_pipe);
2047
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2048
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2049
        // We set this now so the exchanger is created with the correct sender count.
2050
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2051
        // consistent with this.
2052
16.7k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2053
0
            cur_pipe->set_num_tasks(_parallel_instances);
2054
0
        }
2055
16.7k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2056
16.7k
        int num_partitions = 0;
2057
16.7k
        std::map<int, int> shuffle_id_to_instance_idx;
2058
16.7k
        auto partition_type = tnode.local_exchange_node.partition_type;
2059
16.7k
        switch (partition_type) {
2060
8
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2061
8
            num_partitions = _params.num_buckets;
2062
8
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2063
8
            break;
2064
666
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2065
5.99k
            for (int i = 0; i < _num_instances; i++) {
2066
5.32k
                shuffle_id_to_instance_idx[i] = i;
2067
5.32k
            }
2068
666
            num_partitions = _num_instances;
2069
666
            break;
2070
0
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2071
0
            num_partitions = _total_instances;
2072
0
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2073
0
            break;
2074
16.0k
        default:
2075
16.0k
            break;
2076
16.7k
        }
2077
16.7k
        auto local_exchange_id = op->operator_id();
2078
16.7k
        auto sink_id = next_sink_operator_id();
2079
16.7k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2080
16.7k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2081
16.7k
        sink_ops.push_back(sink);
2082
16.7k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2083
16.7k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2084
2085
        // For FE-planned local exchange, we need to:
2086
        // 1. Initialize the partitioner for hash shuffle types
2087
        // 2. Defer exchanger creation until after the full plan tree is built
2088
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2089
        // 3. Register shared state so pipeline tasks can find it
2090
16.7k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2091
16.7k
                                ->init_partitioner(_runtime_state.get()));
2092
2093
16.7k
        int free_blocks_limit =
2094
16.7k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2095
16.7k
                        ? cast_set<int>(
2096
16.7k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2097
18.4E
                        : 0;
2098
16.7k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2099
16.7k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2100
16.7k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2101
16.7k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2102
16.7k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2103
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2104
16.7k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2105
16.7k
                                        free_blocks_limit, local_exchange_id, sink_id});
2106
16.7k
        break;
2107
16.7k
    }
2108
0
    default:
2109
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2110
0
                                     print_plan_node_type(tnode.node_type));
2111
205k
    }
2112
205k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2113
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2114
0
        op->set_serial_operator();
2115
0
    }
2116
2117
205k
    return Status::OK();
2118
205k
}
2119
// NOLINTEND(readability-function-cognitive-complexity)
2120
// NOLINTEND(readability-function-size)
2121
2122
template <bool is_intersect>
2123
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2124
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2125
0
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2126
0
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2127
0
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2128
2129
0
    const auto downstream_pipeline_id = cur_pipe->id();
2130
0
    if (!_dag.contains(downstream_pipeline_id)) {
2131
0
        _dag.insert({downstream_pipeline_id, {}});
2132
0
    }
2133
2134
0
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2135
0
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2136
0
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2137
2138
0
        if (child_id == 0) {
2139
0
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2140
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2141
0
        } else {
2142
0
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2143
0
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2144
0
        }
2145
0
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2146
0
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2147
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2148
0
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2149
0
    }
2150
2151
0
    return Status::OK();
2152
0
}
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Unexecuted instantiation: _ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
2153
2154
124k
/// Submits every pipeline task in `_tasks` to the query's pipeline scheduler.
///
/// May only be called once per fragment context; a second call returns an
/// InternalError. Submission stops at the first task the scheduler rejects:
/// the fragment is cancelled, `_total_tasks` is set to the number of tasks
/// that were actually submitted, and an InternalError describing the failure
/// is returned.
///
/// @return Status::OK() when all tasks were submitted, an InternalError otherwise.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Only the tasks submitted so far will ever run and close.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // The `break` above only leaves the inner loop. Stop the outer loop too,
        // otherwise further tasks would be submitted after the failure, `st`
        // could be overwritten with a later success, and `_total_tasks` would no
        // longer match the number of submitted tasks.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            // All submitted tasks are already closed (possibly none were
            // submitted), so the fragment instance is closed from here.
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
2195
2196
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2197
0
    if (_runtime_state->enable_profile()) {
2198
0
        std::stringstream ss;
2199
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2200
0
            runtime_profile_ptr->pretty_print(&ss);
2201
0
        }
2202
2203
0
        if (_runtime_state->load_channel_profile()) {
2204
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2205
0
        }
2206
2207
0
        auto profile_str =
2208
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2209
0
                            this->_fragment_id, extra_info, ss.str());
2210
0
        LOG_LONG_STRING(INFO, profile_str);
2211
0
    }
2212
0
}
2213
// If all pipeline tasks bound to the fragment instance are finished, then we could
2214
// close the fragment instance.
2215
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2216
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2217
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2218
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2219
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2220
// (via debug_string()).
2221
124k
bool PipelineFragmentContext::_close_fragment_instance() {
2222
124k
    if (_is_fragment_instance_closed) {
2223
0
        return false;
2224
0
    }
2225
124k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2226
124k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2227
124k
    if (!_need_notify_close) {
2228
124k
        auto st = send_report(true);
2229
124k
        if (!st) {
2230
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2231
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2232
0
        }
2233
124k
    }
2234
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2235
    // Since stream load does not have someting like coordinator on FE, so
2236
    // backend can not report profile to FE, ant its profile can not be shown
2237
    // in the same way with other query. So we print the profile content to info log.
2238
2239
124k
    if (_runtime_state->enable_profile() &&
2240
124k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2241
456
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2242
456
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2243
0
        std::stringstream ss;
2244
        // Compute the _local_time_percent before pretty_print the runtime_profile
2245
        // Before add this operation, the print out like that:
2246
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2247
        // After add the operation, the print out like that:
2248
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2249
        // We can easily know the exec node execute time without child time consumed.
2250
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2251
0
            runtime_profile_ptr->pretty_print(&ss);
2252
0
        }
2253
2254
0
        if (_runtime_state->load_channel_profile()) {
2255
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2256
0
        }
2257
2258
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2259
0
    }
2260
2261
124k
    if (_query_ctx->enable_profile()) {
2262
456
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2263
456
                                         collect_realtime_load_channel_profile());
2264
456
    }
2265
2266
    // Return whether the caller needs to remove from the pipeline map.
2267
    // The caller must do this after releasing _task_mutex.
2268
124k
    return !_need_notify_close;
2269
124k
}
2270
2271
450k
// Records that one task of pipeline `pipeline_id` has closed.
//
// When close_task() reports that the whole pipeline is closed, its upstream tasks
// are never needed again, so every pipeline listed for it in _dag is made
// runnable. The fragment-wide closed-task counter is then bumped under
// _task_mutex, and the fragment instance is closed once all tasks are accounted for.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        if (auto dag_it = _dag.find(pipeline_id); dag_it != _dag.end()) {
            for (auto upstream_id : dag_it->second) {
                _pip_id_to_pipeline[upstream_id]->make_all_runnable(pipeline_id);
            }
        }
    }

    const bool should_remove = [this]() {
        std::lock_guard<std::mutex> guard(_task_mutex);
        ++_closed_tasks;
        // Update query-level finished task progress in real time.
        _query_ctx->inc_finished_task_num();
        return _closed_tasks >= _total_tasks && _close_fragment_instance();
    }();

    // remove_pipeline_context() is called outside _task_mutex to avoid an ABBA deadlock.
    if (should_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
2296
2297
13.3k
std::string PipelineFragmentContext::get_load_error_url() {
2298
13.3k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2299
0
        return to_load_error_http_path(str);
2300
0
    }
2301
43.0k
    for (auto& tasks : _tasks) {
2302
55.9k
        for (auto& task : tasks) {
2303
55.9k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2304
12
                return to_load_error_http_path(str);
2305
12
            }
2306
55.9k
        }
2307
43.0k
    }
2308
13.3k
    return "";
2309
13.3k
}
2310
2311
13.3k
std::string PipelineFragmentContext::get_first_error_msg() {
2312
13.3k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2313
0
        return str;
2314
0
    }
2315
43.0k
    for (auto& tasks : _tasks) {
2316
56.0k
        for (auto& task : tasks) {
2317
56.0k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2318
12
                return str;
2319
12
            }
2320
56.0k
        }
2321
43.0k
    }
2322
13.3k
    return "";
2323
13.3k
}
2324
2325
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2326
0
    std::stringstream url;
2327
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2328
0
        << "/api/_download_load?"
2329
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2330
0
    return url.str();
2331
0
}
2332
2333
12.4k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2334
12.4k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2335
12.4k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2336
12.4k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2337
12.4k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2338
12.4k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2339
12.4k
    });
2340
2341
12.4k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2342
12.4k
    if (req.coord_addr.hostname == "external") {
2343
        // External query (flink/spark read tablets) not need to report to FE.
2344
0
        return;
2345
0
    }
2346
12.4k
    int callback_retries = 10;
2347
12.4k
    const int sleep_ms = 1000;
2348
12.4k
    Status exec_status = req.status;
2349
12.4k
    Status coord_status;
2350
12.4k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2351
12.4k
    do {
2352
12.4k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2353
12.4k
                                                            req.coord_addr, &coord_status);
2354
12.4k
        if (!coord_status.ok()) {
2355
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2356
0
        }
2357
12.4k
    } while (!coord_status.ok() && callback_retries-- > 0);
2358
2359
12.4k
    if (!coord_status.ok()) {
2360
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2361
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2362
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2363
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2364
0
        return;
2365
0
    }
2366
2367
12.4k
    TReportExecStatusParams params;
2368
12.4k
    params.protocol_version = FrontendServiceVersion::V1;
2369
12.4k
    params.__set_query_id(req.query_id);
2370
12.4k
    params.__set_backend_num(req.backend_num);
2371
12.4k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2372
12.4k
    params.__set_fragment_id(req.fragment_id);
2373
12.4k
    params.__set_status(exec_status.to_thrift());
2374
12.4k
    params.__set_done(req.done);
2375
12.4k
    params.__set_query_type(req.runtime_state->query_type());
2376
12.4k
    params.__isset.profile = false;
2377
2378
12.4k
    DCHECK(req.runtime_state != nullptr);
2379
2380
12.4k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2381
11.4k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2382
11.4k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2383
11.4k
    } else {
2384
1.01k
        DCHECK(!req.runtime_states.empty());
2385
1.01k
        if (!req.runtime_state->output_files().empty()) {
2386
0
            params.__isset.delta_urls = true;
2387
0
            for (auto& it : req.runtime_state->output_files()) {
2388
0
                params.delta_urls.push_back(_to_http_path(it));
2389
0
            }
2390
0
        }
2391
1.01k
        if (!params.delta_urls.empty()) {
2392
0
            params.__isset.delta_urls = true;
2393
0
        }
2394
1.01k
    }
2395
2396
12.4k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2397
12.4k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2398
12.4k
    static std::string s_unselected_rows = "unselected.rows";
2399
12.4k
    int64_t num_rows_load_success = 0;
2400
12.4k
    int64_t num_rows_load_filtered = 0;
2401
12.4k
    int64_t num_rows_load_unselected = 0;
2402
12.4k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2403
12.4k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2404
12.4k
        req.runtime_state->num_finished_range() > 0) {
2405
0
        params.__isset.load_counters = true;
2406
2407
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2408
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2409
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2410
0
        params.__isset.fragment_instance_reports = true;
2411
0
        TFragmentInstanceReport t;
2412
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2413
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2414
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2415
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2416
0
        params.fragment_instance_reports.push_back(t);
2417
12.4k
    } else if (!req.runtime_states.empty()) {
2418
47.3k
        for (auto* rs : req.runtime_states) {
2419
47.3k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2420
47.3k
                rs->num_finished_range() > 0) {
2421
6.66k
                params.__isset.load_counters = true;
2422
6.66k
                num_rows_load_success += rs->num_rows_load_success();
2423
6.66k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2424
6.66k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2425
6.66k
                params.__isset.fragment_instance_reports = true;
2426
6.66k
                TFragmentInstanceReport t;
2427
6.66k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2428
6.66k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2429
6.66k
                t.__set_loaded_rows(rs->num_rows_load_total());
2430
6.66k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2431
6.66k
                params.fragment_instance_reports.push_back(t);
2432
6.66k
            }
2433
47.3k
        }
2434
12.4k
    }
2435
12.4k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2436
12.4k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2437
12.4k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2438
2439
12.4k
    if (!req.load_error_url.empty()) {
2440
12
        params.__set_tracking_url(req.load_error_url);
2441
12
    }
2442
12.4k
    if (!req.first_error_msg.empty()) {
2443
12
        params.__set_first_error_msg(req.first_error_msg);
2444
12
    }
2445
47.3k
    for (auto* rs : req.runtime_states) {
2446
47.3k
        if (rs->wal_id() > 0) {
2447
0
            params.__set_txn_id(rs->wal_id());
2448
0
            params.__set_label(rs->import_label());
2449
0
        }
2450
47.3k
    }
2451
12.4k
    if (!req.runtime_state->export_output_files().empty()) {
2452
0
        params.__isset.export_files = true;
2453
0
        params.export_files = req.runtime_state->export_output_files();
2454
12.4k
    } else if (!req.runtime_states.empty()) {
2455
47.3k
        for (auto* rs : req.runtime_states) {
2456
47.3k
            if (!rs->export_output_files().empty()) {
2457
0
                params.__isset.export_files = true;
2458
0
                params.export_files.insert(params.export_files.end(),
2459
0
                                           rs->export_output_files().begin(),
2460
0
                                           rs->export_output_files().end());
2461
0
            }
2462
47.3k
        }
2463
12.4k
    }
2464
12.4k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2465
0
        params.__isset.commitInfos = true;
2466
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2467
12.4k
    } else if (!req.runtime_states.empty()) {
2468
47.3k
        for (auto* rs : req.runtime_states) {
2469
47.3k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2470
2.77k
                params.__isset.commitInfos = true;
2471
2.77k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2472
2.77k
            }
2473
47.3k
        }
2474
12.4k
    }
2475
12.4k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2476
0
        params.__isset.errorTabletInfos = true;
2477
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2478
12.4k
    } else if (!req.runtime_states.empty()) {
2479
47.3k
        for (auto* rs : req.runtime_states) {
2480
47.3k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2481
0
                params.__isset.errorTabletInfos = true;
2482
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2483
0
                                               rs_eti.end());
2484
0
            }
2485
47.3k
        }
2486
12.4k
    }
2487
12.4k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2488
0
        params.__isset.hive_partition_updates = true;
2489
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2490
0
                                             hpu.end());
2491
12.4k
    } else if (!req.runtime_states.empty()) {
2492
47.3k
        for (auto* rs : req.runtime_states) {
2493
47.3k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2494
2.14k
                params.__isset.hive_partition_updates = true;
2495
2.14k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2496
2.14k
                                                     rs_hpu.begin(), rs_hpu.end());
2497
2.14k
            }
2498
47.3k
        }
2499
12.4k
    }
2500
12.4k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2501
0
        params.__isset.iceberg_commit_datas = true;
2502
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2503
0
                                           icd.end());
2504
12.4k
    } else if (!req.runtime_states.empty()) {
2505
47.3k
        for (auto* rs : req.runtime_states) {
2506
47.3k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2507
2.07k
                params.__isset.iceberg_commit_datas = true;
2508
2.07k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2509
2.07k
                                                   rs_icd.begin(), rs_icd.end());
2510
2.07k
            }
2511
47.3k
        }
2512
12.4k
    }
2513
2514
12.4k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2515
0
        params.__isset.mc_commit_datas = true;
2516
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2517
12.4k
    } else if (!req.runtime_states.empty()) {
2518
47.3k
        for (auto* rs : req.runtime_states) {
2519
47.3k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2520
0
                params.__isset.mc_commit_datas = true;
2521
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2522
0
                                              rs_mcd.end());
2523
0
            }
2524
47.3k
        }
2525
12.4k
    }
2526
2527
12.4k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2528
12.4k
    params.__isset.error_log = (!params.error_log.empty());
2529
2530
12.4k
    if (_exec_env->cluster_info()->backend_id != 0) {
2531
12.4k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2532
12.4k
    }
2533
2534
12.4k
    TReportExecStatusResult res;
2535
12.4k
    Status rpc_status;
2536
2537
12.4k
    VLOG_DEBUG << "reportExecStatus params is "
2538
0
               << apache::thrift::ThriftDebugString(params).c_str();
2539
12.4k
    if (!exec_status.ok()) {
2540
158
        LOG(WARNING) << "report error status: " << exec_status.msg()
2541
158
                     << " to coordinator: " << req.coord_addr
2542
158
                     << ", query id: " << print_id(req.query_id);
2543
158
    }
2544
12.4k
    try {
2545
12.4k
        try {
2546
12.4k
            (*coord)->reportExecStatus(res, params);
2547
12.4k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2548
#ifndef ADDRESS_SANITIZER
2549
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2550
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2551
                         << req.coord_addr << ", err: " << e.what();
2552
#endif
2553
0
            rpc_status = coord->reopen();
2554
2555
0
            if (!rpc_status.ok()) {
2556
0
                req.cancel_fn(rpc_status);
2557
0
                return;
2558
0
            }
2559
0
            (*coord)->reportExecStatus(res, params);
2560
0
        }
2561
2562
12.4k
        rpc_status = Status::create<false>(res.status);
2563
12.4k
    } catch (apache::thrift::TException& e) {
2564
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2565
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2566
0
    }
2567
2568
12.4k
    if (!rpc_status.ok()) {
2569
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2570
0
                 print_id(req.query_id), rpc_status.to_string());
2571
0
        req.cancel_fn(rpc_status);
2572
0
    }
2573
12.4k
}
2574
2575
126k
// Submits an asynchronous status report for this fragment to the coordinator.
//
// @param done  true when the fragment has finished executing.
// @return Status::OK() when no report is required or when the report task was
//         accepted by the fragment manager's thread pool; Status::NeedSendAgain
//         when reporting is disabled and the fragment is not done yet; otherwise
//         the error returned by submit_func().
//
// The report itself is sent by _coordinator_callback() on a pool thread. The
// submitted task holds a shared_ptr to this context, so the captured `this`
// stays valid while the task runs.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // no report is needed whether the query succeeded or failed.
    // This may happen when the query limit is reached and an internal
    // cancellation is being processed: the fragment is cancelled too, but
    // _is_report_on_cancel is set to false to avoid sending a fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // Finished and nothing to report: the fragment instance can be
            // closed safely without contacting FE.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;
    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level values; fall back to the fragment-level lookups only
    // when the query context has none. Each query-level getter is called once.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = std::move(runtime_states),
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = std::move(load_error_url),
                             .first_error_msg = std::move(first_error_msg),
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func(
            [this, req = std::move(req), ctx]() {
                SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
                _coordinator_callback(req);
                if (!req.done) {
                    ctx->refresh_next_report_time();
                }
            });
}
2635
2636
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2637
0
    size_t res = 0;
2638
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2639
    // here to traverse the vector.
2640
0
    for (const auto& task_instances : _tasks) {
2641
0
        for (const auto& task : task_instances) {
2642
0
            if (task.first->is_running()) {
2643
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2644
0
                                      << " is running, task: " << (void*)task.first.get()
2645
0
                                      << ", is_running: " << task.first->is_running();
2646
0
                *has_running_task = true;
2647
0
                return 0;
2648
0
            }
2649
2650
0
            size_t revocable_size = task.first->get_revocable_size();
2651
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2652
0
                res += revocable_size;
2653
0
            }
2654
0
        }
2655
0
    }
2656
0
    return res;
2657
0
}
2658
2659
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2660
0
    std::vector<PipelineTask*> revocable_tasks;
2661
0
    for (const auto& task_instances : _tasks) {
2662
0
        for (const auto& task : task_instances) {
2663
0
            size_t revocable_size_ = task.first->get_revocable_size();
2664
2665
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2666
0
                revocable_tasks.emplace_back(task.first.get());
2667
0
            }
2668
0
        }
2669
0
    }
2670
0
    return revocable_tasks;
2671
0
}
2672
2673
0
std::string PipelineFragmentContext::debug_string() {
2674
0
    std::lock_guard<std::mutex> l(_task_mutex);
2675
0
    fmt::memory_buffer debug_string_buffer;
2676
0
    fmt::format_to(debug_string_buffer,
2677
0
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2678
0
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2679
0
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2680
0
    for (size_t j = 0; j < _tasks.size(); j++) {
2681
0
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2682
0
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2683
0
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2684
0
                           _tasks[j][i].first->debug_string());
2685
0
        }
2686
0
    }
2687
2688
0
    return fmt::to_string(debug_string_buffer);
2689
0
}
2690
2691
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2692
456
PipelineFragmentContext::collect_realtime_profile() const {
2693
456
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2694
2695
    // we do not have mutex to protect pipeline_id_to_profile
2696
    // so we need to make sure this funciton is invoked after fragment context
2697
    // has already been prepared.
2698
456
    if (!_prepared) {
2699
0
        std::string msg =
2700
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2701
0
        DCHECK(false) << msg;
2702
0
        LOG_ERROR(msg);
2703
0
        return res;
2704
0
    }
2705
2706
    // Make sure first profile is fragment level profile
2707
456
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2708
456
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2709
456
    res.push_back(fragment_profile);
2710
2711
    // pipeline_id_to_profile is initialized in prepare stage
2712
664
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2713
664
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2714
664
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2715
664
        res.push_back(profile_ptr);
2716
664
    }
2717
2718
456
    return res;
2719
456
}
2720
2721
std::shared_ptr<TRuntimeProfileTree>
2722
456
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2723
    // we do not have mutex to protect pipeline_id_to_profile
2724
    // so we need to make sure this funciton is invoked after fragment context
2725
    // has already been prepared.
2726
456
    if (!_prepared) {
2727
0
        std::string msg =
2728
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2729
0
        DCHECK(false) << msg;
2730
0
        LOG_ERROR(msg);
2731
0
        return nullptr;
2732
0
    }
2733
2734
624
    for (const auto& tasks : _tasks) {
2735
1.08k
        for (const auto& task : tasks) {
2736
1.08k
            if (task.second->load_channel_profile() == nullptr) {
2737
0
                continue;
2738
0
            }
2739
2740
1.08k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2741
2742
1.08k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2743
1.08k
                                                           _runtime_state->profile_level());
2744
1.08k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2745
1.08k
        }
2746
624
    }
2747
2748
456
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2749
456
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2750
456
                                                      _runtime_state->profile_level());
2751
456
    return load_channel_profile;
2752
456
}
2753
2754
// Collect runtime filter IDs registered by all tasks in this PFC.
2755
// Used during recursive CTE stage transitions to know which filters to deregister
2756
// before creating the new PFC for the next recursion round.
2757
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2758
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2759
// the vector sizes do not change, and individual RuntimeState filter sets are
2760
// written only during open() which has completed by the time we reach rerun.
2761
0
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2762
0
    std::set<int> result;
2763
0
    for (const auto& _task : _tasks) {
2764
0
        for (const auto& task : _task) {
2765
0
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2766
0
            result.merge(set);
2767
0
        }
2768
0
    }
2769
0
    if (_runtime_state) {
2770
0
        auto set = _runtime_state->get_deregister_runtime_filter();
2771
0
        result.merge(set);
2772
0
    }
2773
0
    return result;
2774
0
}
2775
2776
124k
void PipelineFragmentContext::_release_resource() {
2777
124k
    std::lock_guard<std::mutex> l(_task_mutex);
2778
    // The memory released by the query end is recorded in the query mem tracker.
2779
124k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2780
124k
    auto st = _query_ctx->exec_status();
2781
251k
    for (auto& _task : _tasks) {
2782
251k
        if (!_task.empty()) {
2783
251k
            _call_back(_task.front().first->runtime_state(), &st);
2784
251k
        }
2785
251k
    }
2786
124k
    _tasks.clear();
2787
124k
    _dag.clear();
2788
124k
    _pip_id_to_pipeline.clear();
2789
124k
    _pipelines.clear();
2790
124k
    _sink.reset();
2791
124k
    _root_op.reset();
2792
124k
    _runtime_filter_mgr_map.clear();
2793
124k
    _op_id_to_shared_state.clear();
2794
124k
}
2795
2796
} // namespace doris