Coverage Report

Created: 2026-06-29 06:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
PipelineFragmentContext::PipelineFragmentContext(
140
        TUniqueId query_id, const TPipelineFragmentParams& request,
141
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
142
        const std::function<void(RuntimeState*, Status*)>& call_back)
143
452k
        : _query_id(std::move(query_id)),
144
452k
          _fragment_id(request.fragment_id),
145
452k
          _exec_env(exec_env),
146
452k
          _query_ctx(std::move(query_ctx)),
147
452k
          _call_back(call_back),
148
452k
          _is_report_on_cancel(true),
149
452k
          _params(request),
150
452k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
151
452k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
152
452k
                                                               : false) {
153
452k
    _fragment_watcher.start();
154
452k
}
155
156
452k
PipelineFragmentContext::~PipelineFragmentContext() {
157
452k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
158
452k
            .tag("query_id", print_id(_query_id))
159
452k
            .tag("fragment_id", _fragment_id);
160
452k
    _release_resource();
161
452k
    {
162
        // The memory released by the query end is recorded in the query mem tracker.
163
452k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
164
452k
        _runtime_state.reset();
165
452k
        _query_ctx.reset();
166
452k
    }
167
452k
}
168
169
238
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
238
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
238
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
238
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
10.3k
bool PipelineFragmentContext::notify_close() {
181
10.3k
    bool all_closed = false;
182
10.3k
    bool need_remove = false;
183
10.3k
    {
184
10.3k
        std::lock_guard<std::mutex> l(_task_mutex);
185
10.3k
        if (_closed_tasks >= _total_tasks) {
186
3.58k
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
3.53k
                need_remove = true;
193
3.53k
            }
194
3.58k
            all_closed = true;
195
3.58k
        }
196
        // make fragment release by self after cancel
197
10.3k
        _need_notify_close = false;
198
10.3k
    }
199
10.3k
    if (need_remove) {
200
3.53k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
3.53k
    }
202
10.3k
    return all_closed;
203
10.3k
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
6.74k
void PipelineFragmentContext::cancel(const Status reason) {
210
6.74k
    LOG_INFO("PipelineFragmentContext::cancel")
211
6.74k
            .tag("query_id", print_id(_query_id))
212
6.74k
            .tag("fragment_id", _fragment_id)
213
6.74k
            .tag("reason", reason.to_string());
214
6.74k
    if (notify_close()) {
215
72
        return;
216
72
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
6.66k
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
13
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
13
                                   debug_string());
221
13
        LOG_LONG_STRING(WARNING, dbg_str);
222
13
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
6.66k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
6.67k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
6.66k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
23
        _query_ctx->set_load_error_url(error_url);
236
23
    }
237
238
6.66k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
23
        _query_ctx->set_first_error_msg(first_error_msg);
240
23
    }
241
242
6.66k
    _query_ctx->cancel(reason, _fragment_id);
243
6.66k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
356
        _is_report_on_cancel = false;
245
6.31k
    } else {
246
37.6k
        for (auto& id : _fragment_instance_ids) {
247
37.6k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
37.6k
        }
249
6.31k
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
6.66k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
6.66k
    if (stream_load_ctx != nullptr) {
254
31
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
31
        stream_load_ctx->error_url = get_load_error_url();
259
31
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
31
    }
261
262
38.4k
    for (auto& tasks : _tasks) {
263
80.6k
        for (auto& task : tasks) {
264
80.6k
            task.first->unblock_all_dependencies();
265
80.6k
        }
266
38.4k
    }
267
6.66k
}
268
269
694k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
270
694k
    PipelineId id = _next_pipeline_id++;
271
694k
    auto pipeline = std::make_shared<Pipeline>(
272
694k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
273
694k
            parent ? parent->num_tasks() : _num_instances);
274
694k
    if (idx >= 0) {
275
1.16k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
276
693k
    } else {
277
693k
        _pipelines.emplace_back(pipeline);
278
693k
    }
279
694k
    if (parent) {
280
235k
        parent->set_children(pipeline);
281
235k
    }
282
694k
    return pipeline;
283
694k
}
284
285
452k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
286
452k
    {
287
452k
        SCOPED_TIMER(_build_pipelines_timer);
288
        // 2. Build pipelines with operators in this fragment.
289
452k
        auto root_pipeline = add_pipeline();
290
452k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
291
452k
                                         &_root_op, root_pipeline));
292
293
        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
294
        // that inherited reduced num_tasks from a serial operator.
295
452k
        _propagate_local_exchange_num_tasks();
296
297
        // Create deferred local exchangers now that all pipelines have final num_tasks.
298
452k
        RETURN_IF_ERROR(_create_deferred_local_exchangers());
299
300
        // Raise num_tasks for pipelines whose serial non-scan operators (e.g.,
301
        // UNPARTITIONED Exchange) reduced num_tasks below _num_instances.
302
        // Without this, fragment instances 1+ have no task for these pipelines
303
        // and downstream operators fail with "must set shared state".
304
        //
305
        // This applies to ALL pipelines (not just deferred exchanger upstreams):
306
        // fragments with UNION/INTERSECT/EXCEPT + serial Exchange in child
307
        // pipelines also need the raise, even without FE-planned local exchange.
308
        //
309
        // Exception: serial scan sources (pooling scan) keep num_tasks=1 — the
310
        // PassthroughExchanger(1, N) handles the fan-out correctly.
311
        // NOTE: Do NOT raise pipelines whose source is a serial operator
312
        // (Exchange or scan) — they legitimately have 1 task, and raising
313
        // them causes crashes (e.g., 4 Exchange tasks but only 1 receives
314
        // data).  The correct fix for shared state injection across
315
        // instances is handled by the FE: it inserts local exchange nodes
316
        // between serial operators and their downstream consumers, creating
317
        // proper pipeline boundaries with _num_instances tasks.
318
319
        // 3. Create sink operator
320
452k
        if (!_params.fragment.__isset.output_sink) {
321
0
            return Status::InternalError("No output sink in this fragment!");
322
0
        }
323
452k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
324
452k
                                          _params.fragment.output_exprs, _params,
325
452k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
326
452k
                                          *_desc_tbl, root_pipeline->id()));
327
452k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
328
452k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
329
330
692k
        for (PipelinePtr& pipeline : _pipelines) {
331
692k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
332
692k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
333
692k
        }
334
452k
    }
335
    // 4. Build local exchanger
336
452k
    if (_runtime_state->plan_local_shuffle()) {
337
146k
        SCOPED_TIMER(_plan_local_exchanger_timer);
338
146k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
339
146k
                                             _params.bucket_seq_to_instance_idx,
340
146k
                                             _params.shuffle_idx_to_instance_idx));
341
146k
    }
342
343
    // 5. Initialize global states in pipelines.
344
695k
    for (PipelinePtr& pipeline : _pipelines) {
345
695k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
346
695k
        pipeline->children().clear();
347
695k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
348
695k
    }
349
350
450k
    {
351
450k
        SCOPED_TIMER(_build_tasks_timer);
352
        // 6. Build pipeline tasks and initialize local state.
353
450k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
354
450k
    }
355
356
450k
    return Status::OK();
357
450k
}
358
359
452k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
360
452k
    if (_prepared) {
361
0
        return Status::InternalError("Already prepared");
362
0
    }
363
452k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
364
452k
        _timeout = _params.query_options.execution_timeout;
365
452k
    }
366
367
452k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
368
452k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
369
452k
    SCOPED_TIMER(_prepare_timer);
370
452k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
371
452k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
372
452k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
373
452k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
374
452k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
375
452k
    {
376
452k
        SCOPED_TIMER(_init_context_timer);
377
452k
        cast_set(_num_instances, _params.local_params.size());
378
452k
        _total_instances =
379
452k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
380
381
452k
        auto* fragment_context = this;
382
383
452k
        if (_params.query_options.__isset.is_report_success) {
384
450k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
385
450k
        }
386
387
        // 1. Set up the global runtime state.
388
452k
        _runtime_state = RuntimeState::create_unique(
389
452k
                _params.query_id, _params.fragment_id, _params.query_options,
390
452k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
391
452k
        _runtime_state->set_task_execution_context(shared_from_this());
392
452k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
393
452k
        if (_params.__isset.backend_id) {
394
447k
            _runtime_state->set_backend_id(_params.backend_id);
395
447k
        }
396
452k
        if (_params.__isset.import_label) {
397
242
            _runtime_state->set_import_label(_params.import_label);
398
242
        }
399
452k
        if (_params.__isset.db_name) {
400
193
            _runtime_state->set_db_name(_params.db_name);
401
193
        }
402
452k
        if (_params.__isset.load_job_id) {
403
0
            _runtime_state->set_load_job_id(_params.load_job_id);
404
0
        }
405
406
452k
        if (_params.is_simplified_param) {
407
152k
            _desc_tbl = _query_ctx->desc_tbl;
408
300k
        } else {
409
300k
            DCHECK(_params.__isset.desc_tbl);
410
300k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
411
300k
                                                  &_desc_tbl));
412
300k
        }
413
452k
        _runtime_state->set_desc_tbl(_desc_tbl);
414
452k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
415
452k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
416
452k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
417
452k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
418
419
        // init fragment_instance_ids
420
452k
        const auto target_size = _params.local_params.size();
421
452k
        _fragment_instance_ids.resize(target_size);
422
1.68M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
423
1.22M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
424
1.22M
            _fragment_instance_ids[i] = fragment_instance_id;
425
1.22M
        }
426
452k
    }
427
428
452k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
429
430
451k
    _init_next_report_time();
431
432
451k
    _prepared = true;
433
451k
    return Status::OK();
434
452k
}
435
436
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
437
        int instance_idx,
438
1.22M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
439
1.22M
    const auto& local_params = _params.local_params[instance_idx];
440
1.22M
    auto fragment_instance_id = local_params.fragment_instance_id;
441
1.22M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
442
1.22M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
443
1.22M
    auto get_shared_state = [&](PipelinePtr pipeline)
444
1.22M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
445
2.01M
                                       std::vector<std::shared_ptr<Dependency>>>> {
446
2.01M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
447
2.01M
                                std::vector<std::shared_ptr<Dependency>>>>
448
2.01M
                shared_state_map;
449
2.56M
        for (auto& op : pipeline->operators()) {
450
2.56M
            auto source_id = op->operator_id();
451
2.56M
            if (auto iter = _op_id_to_shared_state.find(source_id);
452
2.56M
                iter != _op_id_to_shared_state.end()) {
453
774k
                shared_state_map.insert({source_id, iter->second});
454
774k
            }
455
2.56M
        }
456
2.01M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
457
2.01M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
458
2.01M
                iter != _op_id_to_shared_state.end()) {
459
307k
                shared_state_map.insert({sink_to_source_id, iter->second});
460
307k
            }
461
2.01M
        }
462
2.01M
        return shared_state_map;
463
2.01M
    };
464
465
3.69M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
466
2.47M
        auto& pipeline = _pipelines[pip_idx];
467
2.47M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
468
2.00M
            auto task_runtime_state = RuntimeState::create_unique(
469
2.00M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
470
2.00M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
471
2.00M
            {
472
                // Initialize runtime state for this task
473
2.00M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
474
475
2.00M
                task_runtime_state->set_task_execution_context(shared_from_this());
476
2.00M
                task_runtime_state->set_be_number(local_params.backend_num);
477
478
2.00M
                if (_params.__isset.backend_id) {
479
2.00M
                    task_runtime_state->set_backend_id(_params.backend_id);
480
2.00M
                }
481
2.00M
                if (_params.__isset.import_label) {
482
243
                    task_runtime_state->set_import_label(_params.import_label);
483
243
                }
484
2.00M
                if (_params.__isset.db_name) {
485
194
                    task_runtime_state->set_db_name(_params.db_name);
486
194
                }
487
2.00M
                if (_params.__isset.load_job_id) {
488
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
489
0
                }
490
2.00M
                if (_params.__isset.wal_id) {
491
115
                    task_runtime_state->set_wal_id(_params.wal_id);
492
115
                }
493
2.00M
                if (_params.__isset.content_length) {
494
34
                    task_runtime_state->set_content_length(_params.content_length);
495
34
                }
496
497
2.00M
                task_runtime_state->set_desc_tbl(_desc_tbl);
498
2.00M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
499
2.00M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
500
2.00M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
501
2.00M
                task_runtime_state->set_max_operator_id(max_operator_id());
502
2.00M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
503
2.00M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
504
2.00M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
505
506
2.00M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
507
2.00M
            }
508
2.00M
            auto cur_task_id = _total_tasks++;
509
2.00M
            task_runtime_state->set_task_id(cur_task_id);
510
2.00M
            task_runtime_state->set_task_num(pipeline->num_tasks());
511
2.00M
            auto task = std::make_shared<PipelineTask>(
512
2.00M
                    pipeline, cur_task_id, task_runtime_state.get(),
513
2.00M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
514
2.00M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
515
2.00M
                    instance_idx);
516
2.00M
            pipeline->incr_created_tasks(instance_idx, task.get());
517
2.00M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
518
2.00M
            _tasks[instance_idx].emplace_back(
519
2.00M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
520
2.00M
                            std::move(task), std::move(task_runtime_state)});
521
2.00M
        }
522
2.47M
    }
523
524
    /**
525
         * Build DAG for pipeline tasks.
526
         * For example, we have
527
         *
528
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
529
         *            \                      /
530
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
531
         *                 \                          /
532
         *               JoinProbeOperator2 (Pipeline1)
533
         *
534
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
535
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
536
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
537
         *
538
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
539
         * and JoinProbeOperator2.
540
         */
541
2.47M
    for (auto& _pipeline : _pipelines) {
542
2.47M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
543
2.00M
            auto* task = pipeline_id_to_task[_pipeline->id()];
544
2.00M
            DCHECK(task != nullptr);
545
546
            // If this task has upstream dependency, then inject it into this task.
547
2.00M
            if (_dag.contains(_pipeline->id())) {
548
1.25M
                auto& deps = _dag[_pipeline->id()];
549
1.25M
                for (auto& dep : deps) {
550
1.25M
                    if (pipeline_id_to_task.contains(dep)) {
551
785k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
552
785k
                        if (ss) {
553
464k
                            task->inject_shared_state(ss);
554
464k
                        } else {
555
321k
                            pipeline_id_to_task[dep]->inject_shared_state(
556
321k
                                    task->get_source_shared_state());
557
321k
                        }
558
785k
                    }
559
1.25M
                }
560
1.25M
            }
561
2.00M
        }
562
2.47M
    }
563
3.70M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
564
2.47M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
565
2.00M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
566
2.00M
            DCHECK(pipeline_id_to_profile[pip_idx]);
567
2.00M
            std::vector<TScanRangeParams> scan_ranges;
568
2.00M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
569
2.00M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
570
351k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
571
351k
            }
572
2.00M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
573
2.00M
                                                             _params.fragment.output_sink));
574
2.00M
        }
575
2.47M
    }
576
1.23M
    {
577
1.23M
        std::lock_guard<std::mutex> l(_state_map_lock);
578
1.23M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
579
1.23M
    }
580
1.23M
    return Status::OK();
581
1.22M
}
582
583
451k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
584
451k
    _total_tasks = 0;
585
451k
    _closed_tasks = 0;
586
451k
    const auto target_size = _params.local_params.size();
587
451k
    _tasks.resize(target_size);
588
451k
    _runtime_filter_mgr_map.resize(target_size);
589
1.14M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
590
694k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
591
694k
    }
592
451k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
593
594
451k
    if (target_size > 1 &&
595
451k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
596
148k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
597
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
598
21.9k
        std::vector<Status> prepare_status(target_size);
599
21.9k
        int submitted_tasks = 0;
600
21.9k
        Status submit_status;
601
21.9k
        CountDownLatch latch((int)target_size);
602
226k
        for (int i = 0; i < target_size; i++) {
603
204k
            submit_status = thread_pool->submit_func([&, i]() {
604
204k
                SCOPED_ATTACH_TASK(_query_ctx.get());
605
204k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
606
204k
                latch.count_down();
607
204k
            });
608
204k
            if (LIKELY(submit_status.ok())) {
609
204k
                submitted_tasks++;
610
18.4E
            } else {
611
18.4E
                break;
612
18.4E
            }
613
204k
        }
614
21.9k
        latch.arrive_and_wait(target_size - submitted_tasks);
615
21.9k
        if (UNLIKELY(!submit_status.ok())) {
616
0
            return submit_status;
617
0
        }
618
226k
        for (int i = 0; i < submitted_tasks; i++) {
619
204k
            if (!prepare_status[i].ok()) {
620
0
                return prepare_status[i];
621
0
            }
622
204k
        }
623
429k
    } else {
624
1.45M
        for (int i = 0; i < target_size; i++) {
625
1.02M
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
626
1.02M
        }
627
429k
    }
628
451k
    _pipeline_parent_map.clear();
629
451k
    _op_id_to_shared_state.clear();
630
    // Record task cardinality once when this fragment context finishes task initialization.
631
451k
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
632
633
451k
    return Status::OK();
634
451k
}
635
636
450k
void PipelineFragmentContext::_init_next_report_time() {
637
450k
    auto interval_s = config::pipeline_status_report_interval;
638
450k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
43.8k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
43.8k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
43.8k
        _previous_report_time =
643
43.8k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
43.8k
        _disable_period_report = false;
645
43.8k
    }
646
450k
}
647
648
5.07k
void PipelineFragmentContext::refresh_next_report_time() {
649
5.07k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
5.07k
    DCHECK(disable == true);
651
5.07k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
5.07k
    _disable_period_report.compare_exchange_strong(disable, false);
653
5.07k
}
654
655
7.39M
void PipelineFragmentContext::trigger_report_if_necessary() {
656
7.39M
    if (!_is_report_success) {
657
6.90M
        return;
658
6.90M
    }
659
490k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
490k
    if (disable) {
661
9.69k
        return;
662
9.69k
    }
663
480k
    int32_t interval_s = config::pipeline_status_report_interval;
664
480k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
480k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
480k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
480k
    if (MonotonicNanos() > next_report_time) {
672
5.08k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
5.08k
                                                            std::memory_order_acq_rel)) {
674
10
            return;
675
10
        }
676
5.07k
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
5.07k
        auto st = send_report(false);
693
5.07k
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
5.07k
    }
699
480k
}
700
701
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
702
448k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
703
448k
    if (_params.fragment.plan.nodes.empty()) {
704
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
705
0
    }
706
707
448k
    int node_idx = 0;
708
709
448k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
710
448k
                                        &node_idx, root, cur_pipe, 0, false, false));
711
712
448k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
713
0
        return Status::InternalError(
714
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
715
0
    }
716
448k
    return Status::OK();
717
448k
}
718
719
451k
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
720
451k
    for (auto& info : _deferred_exchangers) {
721
        // DANGER ZONE — do not "fix" this line without reading the history.
722
        //
723
        // sender_count seeds Exchanger::_running_sink_operators, which the source side
724
        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
725
        // The correct value is THIS pipeline-instance's sink task count, which is exactly
726
        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
727
        //
728
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
729
        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS BREAKS the
730
        //   common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
731
        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
732
        //   _num_instances so _running_sink_operators never reaches 0 — downstream
733
        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
734
        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING and regressed
735
        //   exactly this way).  BE-planned mode uses max() because its
736
        //   `cur_pipe` is the source-side pipeline (always raised to _num_instances by
737
        //   add_pipeline) — not analogous to our `upstream_pipe` here, which is the
738
        //   sink-side pipeline that may legitimately stay at 1 for serial sources.
739
        //
740
        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
741
        //   is shared across all instances.  Same hang — each fragment-instance
742
        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
743
        //   exchanger is per-instance, not per-BE.  num_tasks() is already the right
744
        //   close-count for one instance.
745
        //
746
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
747
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
748
        // this fragment shape.  Fix THAT pass, not this seed value.
749
113k
        const int sender_count = info.upstream_pipe->num_tasks();
750
113k
        switch (info.partition_type) {
751
13.8k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
752
13.8k
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
753
13.8k
            info.shared_state->exchanger = ShuffleExchanger::create_unique(
754
13.8k
                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit,
755
13.8k
                    info.partition_type);
756
13.8k
            break;
757
502
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
758
502
            info.shared_state->exchanger = BucketShuffleExchanger::create_unique(
759
502
                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit);
760
502
            break;
761
95.1k
        case TLocalPartitionType::PASSTHROUGH:
762
95.1k
            info.shared_state->exchanger = PassthroughExchanger::create_unique(
763
95.1k
                    sender_count, _num_instances, info.free_blocks_limit);
764
95.1k
            break;
765
337
        case TLocalPartitionType::BROADCAST:
766
337
            info.shared_state->exchanger = BroadcastExchanger::create_unique(
767
337
                    sender_count, _num_instances, info.free_blocks_limit);
768
337
            break;
769
2.61k
        case TLocalPartitionType::PASS_TO_ONE:
770
2.61k
            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
771
1.55k
                info.shared_state->exchanger = PassToOneExchanger::create_unique(
772
1.55k
                        sender_count, _num_instances, info.free_blocks_limit);
773
1.55k
            } else {
774
1.05k
                info.shared_state->exchanger = BroadcastExchanger::create_unique(
775
1.05k
                        sender_count, _num_instances, info.free_blocks_limit);
776
1.05k
            }
777
2.61k
            break;
778
912
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
779
912
            info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
780
912
                    sender_count, _num_instances, info.free_blocks_limit);
781
912
            break;
782
0
        case TLocalPartitionType::NOOP:
783
0
        case TLocalPartitionType::LOCAL_MERGE_SORT:
784
            // FE-planned LocalExchangeNode currently never emits NOOP or LOCAL_MERGE_SORT
785
            // through the deferred-exchanger path.  NOOP means "no exchange needed" and
786
            // is filtered out before reaching here; LOCAL_MERGE_SORT is planned by the
787
            // legacy BE path only.  Crash in debug to surface the protocol violation if
788
            // that ever changes; return an error in release to avoid silently corrupting
789
            // execution.
790
0
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
791
0
                          << static_cast<int>(info.partition_type);
792
0
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
793
0
                                         std::to_string(static_cast<int>(info.partition_type)));
794
0
        default:
795
            // New TLocalPartitionType added on FE side without a BE handler here.
796
0
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
797
0
                          << static_cast<int>(info.partition_type);
798
0
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
799
0
                                         std::to_string(static_cast<int>(info.partition_type)));
800
113k
        }
801
113k
    }
802
451k
    _deferred_exchangers.clear();
803
451k
    return Status::OK();
804
451k
}
805
806
451k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
451k
    if (_deferred_exchangers.empty()) {
815
358k
        return;
816
358k
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
93.2k
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
93.2k
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
93.2k
    std::map<PipelineId, int> in_degree;
832
264k
    for (auto& p : _pipelines) {
833
264k
        id_to_pipe[p->id()] = p;
834
264k
        in_degree.try_emplace(p->id(), 0);
835
264k
    }
836
164k
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
170k
        for (auto upstream_id : upstream_ids) {
838
170k
            downstreams_of[upstream_id].push_back(downstream_id);
839
170k
            in_degree[downstream_id]++;
840
170k
        }
841
164k
    }
842
93.2k
    std::vector<PipelineId> ready;
843
264k
    for (const auto& [id, deg] : in_degree) {
844
264k
        if (deg == 0) {
845
99.1k
            ready.push_back(id);
846
99.1k
        }
847
264k
    }
848
93.2k
    size_t visited = 0;
849
357k
    while (!ready.empty()) {
850
264k
        const auto id = ready.back();
851
264k
        ready.pop_back();
852
264k
        visited++;
853
264k
        auto pit = id_to_pipe.find(id);
854
264k
        if (pit != id_to_pipe.end()) {
855
264k
            auto& pipe = pit->second;
856
264k
            const auto& ops = pipe->operators();
857
264k
            const bool le_source =
858
264k
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
264k
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
264k
            if (le_source) {
861
113k
                pipe->set_num_tasks(_num_instances);
862
150k
            } else if (!serial_source) {
863
70.5k
                int target = pipe->num_tasks();
864
70.5k
                const auto up_it = _dag.find(id);
865
70.5k
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
51.4k
                    for (auto upstream_id : up_it->second) {
868
51.4k
                        auto uit = id_to_pipe.find(upstream_id);
869
51.4k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
51.4k
                            target = _num_instances;
871
51.4k
                            break;
872
51.4k
                        }
873
51.4k
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
52.1k
                    for (auto upstream_id : up_it->second) {
876
52.1k
                        auto uit = id_to_pipe.find(upstream_id);
877
52.1k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
52.1k
                            !uit->second->operators().empty() &&
879
52.1k
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
52.1k
                    }
883
51.4k
                }
884
70.5k
                pipe->set_num_tasks(target);
885
70.5k
            }
886
264k
        }
887
264k
        for (auto down : downstreams_of[id]) {
888
170k
            if (--in_degree[down] == 0) {
889
164k
                ready.push_back(down);
890
164k
            }
891
170k
        }
892
264k
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
93.2k
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
93.2k
}
900
901
Status PipelineFragmentContext::_create_tree_helper(
902
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
903
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
904
793k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
905
    // propagate error case
906
793k
    if (*node_idx >= tnodes.size()) {
907
0
        return Status::InternalError(
908
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
909
0
                *node_idx, tnodes.size());
910
0
    }
911
793k
    const TPlanNode& tnode = tnodes[*node_idx];
912
913
793k
    int num_children = tnodes[*node_idx].num_children;
914
793k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
915
793k
    bool current_require_bucket_distribution = require_bucket_distribution;
916
    // TODO: Create CacheOperator is confused now
917
793k
    OperatorPtr op = nullptr;
918
793k
    OperatorPtr cache_op = nullptr;
919
793k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
920
793k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
921
793k
                                     followed_by_shuffled_operator,
922
793k
                                     current_require_bucket_distribution, cache_op));
923
    // Initialization must be done here. For example, group by expressions in agg will be used to
924
    // decide if a local shuffle should be planed, so it must be initialized here.
925
793k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
926
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
927
793k
    if (parent != nullptr) {
928
        // add to parent's child(s)
929
343k
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
930
449k
    } else {
931
449k
        *root = op;
932
449k
    }
933
    /**
934
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
935
     *
936
     * For plan:
937
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
938
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
939
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
940
     *
941
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
942
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
943
     */
944
793k
    auto required_data_distribution =
945
793k
            cur_pipe->operators().empty()
946
793k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
947
793k
                    : op->required_data_distribution(_runtime_state.get());
948
793k
    current_followed_by_shuffled_operator =
949
793k
            ((followed_by_shuffled_operator ||
950
793k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
951
726k
                                             : op->is_shuffled_operator())) &&
952
793k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
953
793k
            (followed_by_shuffled_operator &&
954
677k
             required_data_distribution.distribution_type == TLocalPartitionType::NOOP);
955
956
793k
    current_require_bucket_distribution =
957
793k
            ((require_bucket_distribution ||
958
793k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
959
733k
                                             : op->is_colocated_operator())) &&
960
793k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
961
793k
            (require_bucket_distribution &&
962
684k
             required_data_distribution.distribution_type == TLocalPartitionType::NOOP);
963
964
793k
    if (num_children == 0) {
965
468k
        _use_serial_source = op->is_serial_operator();
966
468k
    }
967
    // rely on that tnodes is preorder of the plan
968
1.13M
    for (int i = 0; i < num_children; i++) {
969
344k
        ++*node_idx;
970
344k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
971
344k
                                            current_followed_by_shuffled_operator,
972
344k
                                            current_require_bucket_distribution));
973
974
        // we are expecting a child, but have used all nodes
975
        // this means we have been given a bad tree and must fail
976
344k
        if (*node_idx >= tnodes.size()) {
977
0
            return Status::InternalError(
978
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
979
0
                    "nodes: {}",
980
0
                    *node_idx, tnodes.size());
981
0
        }
982
344k
    }
983
984
793k
    return Status::OK();
985
793k
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
1.16k
        PipelinePtr pipe_with_sink) {
990
1.16k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
1.16k
    pipe_with_source->set_num_tasks(_num_instances);
992
1.16k
    pipe_with_source->set_data_distribution(data_distribution);
993
1.16k
}
994
995
Status PipelineFragmentContext::_add_local_exchange_impl(
996
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
997
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
998
        const std::map<int, int>& bucket_seq_to_instance_idx,
999
1.16k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1000
1.16k
    auto& operators = cur_pipe->operators();
1001
1.16k
    const auto downstream_pipeline_id = cur_pipe->id();
1002
1.16k
    auto local_exchange_id = next_operator_id();
1003
    // 1. Create a new pipeline with local exchange sink.
1004
1.16k
    DataSinkOperatorPtr sink;
1005
1.16k
    auto sink_id = next_sink_operator_id();
1006
1007
    /**
1008
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
1009
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
1010
     */
1011
1.16k
    const bool followed_by_shuffled_operator =
1012
1.16k
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
1013
1.16k
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
1014
1.16k
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
1015
1.16k
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
1016
1.16k
                                         followed_by_shuffled_operator && !_use_serial_source;
1017
1.16k
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
1018
1.16k
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
1019
1.16k
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
1020
1.16k
    if (bucket_seq_to_instance_idx.empty() &&
1021
1.16k
        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
1022
2
        data_distribution.distribution_type =
1023
2
                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
1024
2
                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
1025
2
    }
1026
1.16k
    if (!use_global_hash_shuffle &&
1027
1.16k
        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
1028
117
        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
1029
117
    }
1030
1.16k
    RETURN_IF_ERROR(new_pip->set_sink(sink));
1031
1.16k
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
1032
1.16k
                                          num_buckets, shuffle_idx_to_instance_idx));
1033
1034
    // 2. Create and initialize LocalExchangeSharedState.
1035
1.16k
    std::shared_ptr<LocalExchangeSharedState> shared_state =
1036
1.16k
            LocalExchangeSharedState::create_shared(_num_instances);
1037
1.16k
    switch (data_distribution.distribution_type) {
1038
117
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
1039
120
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
1040
120
        shared_state->exchanger = ShuffleExchanger::create_unique(
1041
120
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
1042
120
                use_global_hash_shuffle ? _total_instances : _num_instances,
1043
120
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1044
120
                        ? cast_set<int>(
1045
120
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1046
120
                        : 0,
1047
120
                data_distribution.distribution_type);
1048
120
        break;
1049
10
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
1050
10
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
1051
10
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
1052
10
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1053
10
                        ? cast_set<int>(
1054
10
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1055
10
                        : 0);
1056
10
        break;
1057
941
    case TLocalPartitionType::PASSTHROUGH:
1058
941
        shared_state->exchanger = PassthroughExchanger::create_unique(
1059
941
                cur_pipe->num_tasks(), _num_instances,
1060
941
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1061
941
                        ? cast_set<int>(
1062
941
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1063
941
                        : 0);
1064
941
        break;
1065
8
    case TLocalPartitionType::BROADCAST:
1066
8
        shared_state->exchanger = BroadcastExchanger::create_unique(
1067
8
                cur_pipe->num_tasks(), _num_instances,
1068
8
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1069
8
                        ? cast_set<int>(
1070
8
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1071
8
                        : 0);
1072
8
        break;
1073
2
    case TLocalPartitionType::PASS_TO_ONE:
1074
2
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
1075
            // If shared hash table is enabled for BJ, hash table will be built by only one task
1076
0
            shared_state->exchanger = PassToOneExchanger::create_unique(
1077
0
                    cur_pipe->num_tasks(), _num_instances,
1078
0
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1079
0
                            ? cast_set<int>(_runtime_state->query_options()
1080
0
                                                    .local_exchange_free_blocks_limit)
1081
0
                            : 0);
1082
2
        } else {
1083
2
            shared_state->exchanger = BroadcastExchanger::create_unique(
1084
2
                    cur_pipe->num_tasks(), _num_instances,
1085
2
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1086
2
                            ? cast_set<int>(_runtime_state->query_options()
1087
2
                                                    .local_exchange_free_blocks_limit)
1088
2
                            : 0);
1089
2
        }
1090
2
        break;
1091
83
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
1092
83
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
1093
83
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
1094
83
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1095
83
                        ? cast_set<int>(
1096
83
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1097
83
                        : 0);
1098
83
        break;
1099
0
    default:
1100
0
        return Status::InternalError("Unsupported local exchange type : " +
1101
0
                                     std::to_string((int)data_distribution.distribution_type));
1102
1.16k
    }
1103
1.16k
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
1104
1.16k
                                             "LOCAL_EXCHANGE_OPERATOR");
1105
1.16k
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
1106
1.16k
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
1107
1108
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
1109
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
1110
1111
    // 3.1 Initialize new pipeline's operator list.
1112
1.16k
    std::copy(operators.begin(), operators.begin() + idx,
1113
1.16k
              std::inserter(new_pip->operators(), new_pip->operators().end()));
1114
1115
    // 3.2 Erase unused operators in previous pipeline.
1116
1.16k
    operators.erase(operators.begin(), operators.begin() + idx);
1117
1118
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
1119
1.16k
    OperatorPtr source_op;
1120
1.16k
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
1121
1.16k
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
1122
1.16k
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
1123
1.16k
    if (!operators.empty()) {
1124
264
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
1125
264
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
1126
264
    }
1127
1.16k
    operators.insert(operators.begin(), source_op);
1128
1129
    // 5. Set children for two pipelines separately.
1130
1.16k
    std::vector<std::shared_ptr<Pipeline>> new_children;
1131
1.16k
    std::vector<PipelineId> edges_with_source;
1132
2.17k
    for (auto child : cur_pipe->children()) {
1133
2.17k
        bool found = false;
1134
3.01k
        for (auto op : new_pip->operators()) {
1135
3.01k
            if (child->sink()->node_id() == op->node_id()) {
1136
715
                new_pip->set_children(child);
1137
715
                found = true;
1138
715
            };
1139
3.01k
        }
1140
2.17k
        if (!found) {
1141
1.46k
            new_children.push_back(child);
1142
1.46k
            edges_with_source.push_back(child->id());
1143
1.46k
        }
1144
2.17k
    }
1145
1.16k
    new_children.push_back(new_pip);
1146
1.16k
    edges_with_source.push_back(new_pip->id());
1147
1148
    // 6. Set DAG for new pipelines.
1149
1.16k
    if (!new_pip->children().empty()) {
1150
390
        std::vector<PipelineId> edges_with_sink;
1151
715
        for (auto child : new_pip->children()) {
1152
715
            edges_with_sink.push_back(child->id());
1153
715
        }
1154
390
        _dag.insert({new_pip->id(), edges_with_sink});
1155
390
    }
1156
1.16k
    cur_pipe->set_children(new_children);
1157
1.16k
    _dag[downstream_pipeline_id] = edges_with_source;
1158
1.16k
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
1159
1.16k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
1160
1.16k
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
1161
1162
    // 7. Inherit properties from current pipeline.
1163
1.16k
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
1164
1.16k
    return Status::OK();
1165
1.16k
}
1166
1167
Status PipelineFragmentContext::_add_local_exchange(
1168
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
1169
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
1170
        const std::map<int, int>& bucket_seq_to_instance_idx,
1171
12.6k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1172
12.6k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
1173
10.8k
        return Status::OK();
1174
10.8k
    }
1175
1176
1.80k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
1177
694
        return Status::OK();
1178
694
    }
1179
1.10k
    *do_local_exchange = true;
1180
1181
1.10k
    auto& operators = cur_pipe->operators();
1182
1.10k
    auto total_op_num = operators.size();
1183
1.10k
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
1184
1.10k
    RETURN_IF_ERROR(_add_local_exchange_impl(
1185
1.10k
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
1186
1.10k
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1187
1188
1.10k
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
1189
0
            << "total_op_num: " << total_op_num
1190
0
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
1191
0
            << " new_pip->operators().size(): " << new_pip->operators().size();
1192
1193
    // There are some local shuffles with relatively heavy operations on the sink.
1194
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
1195
    // Therefore, local passthrough is used to increase the concurrency of the sink.
1196
    // op -> local sink(1) -> local source (n)
1197
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
1198
1.10k
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
1199
1.10k
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
1200
55
        RETURN_IF_ERROR(_add_local_exchange_impl(
1201
55
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
1202
55
                add_pipeline(new_pip, pip_idx + 2),
1203
55
                DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
1204
55
                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1205
55
    }
1206
1.10k
    return Status::OK();
1207
1.10k
}
1208
1209
Status PipelineFragmentContext::_plan_local_exchange(
1210
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
1211
146k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1212
333k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
1213
186k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
1214
        // Set property if child pipeline is not join operator's child.
1215
186k
        if (!_pipelines[pip_idx]->children().empty()) {
1216
33.2k
            for (auto& child : _pipelines[pip_idx]->children()) {
1217
33.2k
                if (child->sink()->node_id() ==
1218
33.2k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
1219
26.5k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
1220
26.5k
                }
1221
33.2k
            }
1222
30.1k
        }
1223
1224
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
1225
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
1226
        // still keep colocate plan after local shuffle
1227
186k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
1228
186k
                                             bucket_seq_to_instance_idx,
1229
186k
                                             shuffle_idx_to_instance_idx));
1230
186k
    }
1231
146k
    return Status::OK();
1232
146k
}
1233
1234
Status PipelineFragmentContext::_plan_local_exchange(
1235
        int num_buckets, int pip_idx, PipelinePtr pip,
1236
        const std::map<int, int>& bucket_seq_to_instance_idx,
1237
186k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1238
186k
    int idx = 1;
1239
186k
    bool do_local_exchange = false;
1240
186k
    do {
1241
186k
        auto& ops = pip->operators();
1242
186k
        do_local_exchange = false;
1243
        // Plan local exchange for each operator.
1244
194k
        for (; idx < ops.size();) {
1245
8.90k
            auto _le_req = ops[idx]->required_data_distribution(_runtime_state.get());
1246
8.90k
            if (_le_req.need_local_exchange()) {
1247
6.65k
                RETURN_IF_ERROR(_add_local_exchange(
1248
6.65k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, _le_req,
1249
6.65k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1250
6.65k
                        shuffle_idx_to_instance_idx));
1251
6.65k
            }
1252
8.90k
            if (do_local_exchange) {
1253
                // If local exchange is needed for current operator, we will split this pipeline to
1254
                // two pipelines by local exchange sink/source. And then we need to process remaining
1255
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1256
                // is current operator was already processed) and continue to plan local exchange.
1257
264
                idx = 2;
1258
264
                break;
1259
264
            }
1260
8.64k
            idx++;
1261
8.64k
        }
1262
186k
    } while (do_local_exchange);
1263
186k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1264
5.96k
        RETURN_IF_ERROR(_add_local_exchange(
1265
5.96k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1266
5.96k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1267
5.96k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1268
5.96k
    }
1269
186k
    return Status::OK();
1270
186k
}
1271
1272
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1273
                                                  const std::vector<TExpr>& output_exprs,
1274
                                                  const TPipelineFragmentParams& params,
1275
                                                  const RowDescriptor& row_desc,
1276
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1277
451k
                                                  PipelineId cur_pipeline_id) {
1278
451k
    switch (thrift_sink.type) {
1279
149k
    case TDataSinkType::DATA_STREAM_SINK: {
1280
149k
        if (!thrift_sink.__isset.stream_sink) {
1281
0
            return Status::InternalError("Missing data stream sink.");
1282
0
        }
1283
149k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1284
149k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1285
149k
                params.destinations, _fragment_instance_ids);
1286
149k
        break;
1287
149k
    }
1288
260k
    case TDataSinkType::RESULT_SINK: {
1289
260k
        if (!thrift_sink.__isset.result_sink) {
1290
0
            return Status::InternalError("Missing data buffer sink.");
1291
0
        }
1292
1293
260k
        auto& pipeline = _pipelines[cur_pipeline_id];
1294
260k
        int child_node_id = pipeline->operators().back()->node_id();
1295
260k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
1296
260k
                                                      row_desc, output_exprs,
1297
260k
                                                      thrift_sink.result_sink);
1298
260k
        break;
1299
260k
    }
1300
104
    case TDataSinkType::DICTIONARY_SINK: {
1301
104
        if (!thrift_sink.__isset.dictionary_sink) {
1302
0
            return Status::InternalError("Missing dict sink.");
1303
0
        }
1304
1305
104
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1306
104
                                                    thrift_sink.dictionary_sink);
1307
104
        break;
1308
104
    }
1309
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1310
34.7k
    case TDataSinkType::OLAP_TABLE_SINK: {
1311
34.7k
        auto& pipeline = _pipelines[cur_pipeline_id];
1312
34.7k
        int child_node_id = pipeline->operators().back()->node_id();
1313
34.7k
        if (state->query_options().enable_memtable_on_sink_node &&
1314
34.7k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1315
34.7k
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
1316
2.85k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
1317
2.85k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1318
31.9k
        } else {
1319
31.9k
            _sink = std::make_shared<OlapTableSinkOperatorX>(
1320
31.9k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1321
31.9k
        }
1322
34.7k
        break;
1323
0
    }
1324
168
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1325
168
        DCHECK(thrift_sink.__isset.olap_table_sink);
1326
168
        DCHECK(state->get_query_ctx() != nullptr);
1327
168
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1328
168
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1329
168
                                                                output_exprs);
1330
168
        break;
1331
0
    }
1332
1.48k
    case TDataSinkType::HIVE_TABLE_SINK: {
1333
1.48k
        if (!thrift_sink.__isset.hive_table_sink) {
1334
0
            return Status::InternalError("Missing hive table sink.");
1335
0
        }
1336
1.48k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1337
1.48k
                                                         output_exprs);
1338
1.48k
        break;
1339
1.48k
    }
1340
1.75k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1341
1.75k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1342
0
            return Status::InternalError("Missing iceberg table sink.");
1343
0
        }
1344
1.75k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1345
4
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1346
4
                                                                     row_desc, output_exprs);
1347
1.74k
        } else {
1348
1.74k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1349
1.74k
                                                                row_desc, output_exprs);
1350
1.74k
        }
1351
1.75k
        break;
1352
1.75k
    }
1353
22
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1354
22
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1355
0
            return Status::InternalError("Missing iceberg delete sink.");
1356
0
        }
1357
22
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1358
22
                                                             row_desc, output_exprs);
1359
22
        break;
1360
22
    }
1361
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1362
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1363
0
            return Status::InternalError("Missing iceberg merge sink.");
1364
0
        }
1365
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1366
80
                                                            output_exprs);
1367
80
        break;
1368
80
    }
1369
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1370
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1371
0
            return Status::InternalError("Missing max compute table sink.");
1372
0
        }
1373
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1374
0
                                                       output_exprs);
1375
0
        break;
1376
0
    }
1377
88
    case TDataSinkType::JDBC_TABLE_SINK: {
1378
88
        if (!thrift_sink.__isset.jdbc_table_sink) {
1379
0
            return Status::InternalError("Missing data jdbc sink.");
1380
0
        }
1381
88
        if (config::enable_java_support) {
1382
88
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1383
88
                                                             output_exprs);
1384
88
        } else {
1385
0
            return Status::InternalError(
1386
0
                    "Jdbc table sink is not enabled, you can change be config "
1387
0
                    "enable_java_support to true and restart be.");
1388
0
        }
1389
88
        break;
1390
88
    }
1391
88
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1392
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1393
0
            return Status::InternalError("Missing data buffer sink.");
1394
0
        }
1395
1396
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1397
3
                                                             output_exprs);
1398
3
        break;
1399
3
    }
1400
503
    case TDataSinkType::RESULT_FILE_SINK: {
1401
503
        if (!thrift_sink.__isset.result_file_sink) {
1402
0
            return Status::InternalError("Missing result file sink.");
1403
0
        }
1404
1405
        // Result file sink is not the top sink
1406
503
        if (params.__isset.destinations && !params.destinations.empty()) {
1407
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1408
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1409
0
                    params.destinations, output_exprs, desc_tbl);
1410
503
        } else {
1411
503
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1412
503
                                                              output_exprs);
1413
503
        }
1414
503
        break;
1415
503
    }
1416
2.33k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1417
2.33k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1418
2.33k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1419
2.33k
        auto sink_id = next_sink_operator_id();
1420
2.33k
        const int multi_cast_node_id = sink_id;
1421
2.33k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1422
        // one sink has multiple sources.
1423
2.33k
        std::vector<int> sources;
1424
9.14k
        for (int i = 0; i < sender_size; ++i) {
1425
6.80k
            auto source_id = next_operator_id();
1426
6.80k
            sources.push_back(source_id);
1427
6.80k
        }
1428
1429
2.33k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1430
2.33k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1431
9.14k
        for (int i = 0; i < sender_size; ++i) {
1432
6.80k
            auto new_pipeline = add_pipeline();
1433
            // use to exchange sink
1434
6.80k
            RowDescriptor* exchange_row_desc = nullptr;
1435
6.80k
            {
1436
6.80k
                const auto& tmp_row_desc =
1437
6.80k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1438
6.80k
                                ? RowDescriptor(state->desc_tbl(),
1439
6.80k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1440
6.80k
                                                         .output_tuple_id})
1441
6.80k
                                : row_desc;
1442
6.80k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1443
6.80k
            }
1444
6.80k
            auto source_id = sources[i];
1445
6.80k
            OperatorPtr source_op;
1446
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1447
6.80k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1448
6.80k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1449
6.80k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1450
6.80k
                    /*operator_id=*/source_id);
1451
6.80k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1452
6.80k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1453
            // 2. create and set sink operator of data stream sender for new pipeline
1454
1455
6.80k
            DataSinkOperatorPtr sink_op;
1456
6.80k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1457
6.80k
                    state, *exchange_row_desc, next_sink_operator_id(),
1458
6.80k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1459
6.80k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1460
1461
6.80k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1462
6.80k
            {
1463
6.80k
                TDataSink* t = pool->add(new TDataSink());
1464
6.80k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1465
6.80k
                RETURN_IF_ERROR(sink_op->init(*t));
1466
6.80k
            }
1467
1468
            // 3. set dependency dag
1469
6.80k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1470
6.80k
        }
1471
2.33k
        if (sources.empty()) {
1472
0
            return Status::InternalError("size of sources must be greater than 0");
1473
0
        }
1474
2.33k
        break;
1475
2.33k
    }
1476
2.33k
    case TDataSinkType::BLACKHOLE_SINK: {
1477
13
        if (!thrift_sink.__isset.blackhole_sink) {
1478
0
            return Status::InternalError("Missing blackhole sink.");
1479
0
        }
1480
1481
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1482
13
        break;
1483
13
    }
1484
156
    case TDataSinkType::TVF_TABLE_SINK: {
1485
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1486
0
            return Status::InternalError("Missing TVF table sink.");
1487
0
        }
1488
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1489
156
                                                        output_exprs);
1490
156
        break;
1491
156
    }
1492
0
    default:
1493
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1494
451k
    }
1495
451k
    return Status::OK();
1496
451k
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
796k
                                                 OperatorPtr& cache_op) {
1507
796k
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
796k
    Defer defer = Defer([&]() {
1509
795k
        if (op) {
1510
794k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
794k
        }
1512
795k
        for (auto& s : sink_ops) {
1513
234k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
234k
        }
1515
795k
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
796k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
796k
    std::stringstream error_msg;
1520
796k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
796k
    bool fe_with_old_version = false;
1523
796k
    switch (tnode.node_type) {
1524
221k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
221k
        op = std::make_shared<OlapScanOperatorX>(
1526
221k
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
221k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
221k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
221k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
221k
        break;
1531
221k
    }
1532
79
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
79
        DCHECK(_query_ctx != nullptr);
1534
79
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
79
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
79
                                                    _num_instances);
1537
79
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
79
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
79
        break;
1540
79
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
26.0k
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
26.0k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
26.0k
                                                 _num_instances);
1557
26.0k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
26.0k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
26.0k
        break;
1560
26.0k
    }
1561
154k
    case TPlanNodeType::EXCHANGE_NODE: {
1562
154k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
154k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
18.4E
                                  : 0;
1565
154k
        DCHECK_GT(num_senders, 0);
1566
154k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1567
154k
                                                       num_senders);
1568
154k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1569
154k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1570
154k
        break;
1571
154k
    }
1572
143k
    case TPlanNodeType::AGGREGATION_NODE: {
1573
143k
        if (tnode.agg_node.grouping_exprs.empty() &&
1574
143k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1575
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1576
0
                                         ": group by and output is empty");
1577
0
        }
1578
143k
        bool need_create_cache_op =
1579
143k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1580
143k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1581
10
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1582
10
            auto cache_source_id = next_operator_id();
1583
10
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1584
10
                                                        _params.fragment.query_cache_param);
1585
10
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1586
1587
10
            const auto downstream_pipeline_id = cur_pipe->id();
1588
10
            if (!_dag.contains(downstream_pipeline_id)) {
1589
10
                _dag.insert({downstream_pipeline_id, {}});
1590
10
            }
1591
10
            new_pipe = add_pipeline(cur_pipe);
1592
10
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1593
1594
10
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1595
10
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1596
10
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1597
10
            return Status::OK();
1598
10
        };
1599
143k
        const bool group_by_limit_opt =
1600
143k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1601
1602
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1603
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1604
143k
        const bool enable_spill = _runtime_state->enable_spill() &&
1605
143k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1606
143k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1607
143k
                                      tnode.agg_node.use_streaming_preaggregation &&
1608
143k
                                      !tnode.agg_node.grouping_exprs.empty();
1609
        // TODO: distinct streaming agg does not support spill.
1610
143k
        const bool can_use_distinct_streaming_agg =
1611
143k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1612
143k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1613
143k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1614
143k
                _params.query_options.enable_distinct_streaming_aggregation;
1615
1616
143k
        if (can_use_distinct_streaming_agg) {
1617
86.9k
            if (need_create_cache_op) {
1618
8
                PipelinePtr new_pipe;
1619
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1620
1621
8
                cache_op = op;
1622
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1623
8
                                                                     tnode, descs);
1624
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1625
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1626
8
                cur_pipe = new_pipe;
1627
86.8k
            } else {
1628
86.8k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
86.8k
                                                                     tnode, descs);
1630
86.8k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1631
86.8k
            }
1632
86.9k
        } else if (is_streaming_agg) {
1633
1.57k
            if (need_create_cache_op) {
1634
0
                PipelinePtr new_pipe;
1635
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1636
0
                cache_op = op;
1637
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1638
0
                                                             descs);
1639
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1640
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1641
0
                cur_pipe = new_pipe;
1642
1.57k
            } else {
1643
1.57k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
1.57k
                                                             descs);
1645
1.57k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1646
1.57k
            }
1647
55.0k
        } else {
1648
            // create new pipeline to add query cache operator
1649
55.0k
            PipelinePtr new_pipe;
1650
55.0k
            if (need_create_cache_op) {
1651
2
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1652
2
                cache_op = op;
1653
2
            }
1654
1655
55.0k
            if (enable_spill) {
1656
37
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1657
37
                                                                     next_operator_id(), descs);
1658
55.0k
            } else {
1659
55.0k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1660
55.0k
            }
1661
55.0k
            if (need_create_cache_op) {
1662
2
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1663
2
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1664
2
                cur_pipe = new_pipe;
1665
55.0k
            } else {
1666
55.0k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1667
55.0k
            }
1668
1669
55.0k
            const auto downstream_pipeline_id = cur_pipe->id();
1670
55.0k
            if (!_dag.contains(downstream_pipeline_id)) {
1671
52.3k
                _dag.insert({downstream_pipeline_id, {}});
1672
52.3k
            }
1673
55.0k
            cur_pipe = add_pipeline(cur_pipe);
1674
55.0k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1675
1676
55.0k
            if (enable_spill) {
1677
37
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1678
37
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1679
55.0k
            } else {
1680
55.0k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1681
55.0k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1682
55.0k
            }
1683
55.0k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1684
55.0k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1685
55.0k
        }
1686
143k
        break;
1687
143k
    }
1688
143k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1689
84
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1690
0
            return Status::InternalError(
1691
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1692
0
                    tnode.node_id);
1693
0
        }
1694
1695
        // Create source operator (goes on the current / downstream pipeline).
1696
84
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1697
84
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1698
1699
        // Create a new pipeline for the sink side.
1700
84
        const auto downstream_pipeline_id = cur_pipe->id();
1701
84
        if (!_dag.contains(downstream_pipeline_id)) {
1702
84
            _dag.insert({downstream_pipeline_id, {}});
1703
84
        }
1704
84
        cur_pipe = add_pipeline(cur_pipe);
1705
84
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1706
1707
        // Create sink operator.
1708
84
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1709
84
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1710
84
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1711
84
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1712
1713
        // Pre-register a single shared state for ALL instances so that every
1714
        // sink instance writes its per-instance hash table into the same
1715
        // BucketedAggSharedState and every source instance can merge across
1716
        // all of them.
1717
84
        {
1718
84
            auto shared_state = BucketedAggSharedState::create_shared();
1719
84
            shared_state->id = op->operator_id();
1720
84
            shared_state->related_op_ids.insert(op->operator_id());
1721
1722
568
            for (int i = 0; i < _num_instances; i++) {
1723
484
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1724
484
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1725
484
                sink_dep->set_shared_state(shared_state.get());
1726
484
                shared_state->sink_deps.push_back(sink_dep);
1727
484
            }
1728
84
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1729
84
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1730
84
            _op_id_to_shared_state.insert(
1731
84
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1732
84
        }
1733
84
        break;
1734
84
    }
1735
10.4k
    case TPlanNodeType::HASH_JOIN_NODE: {
1736
10.4k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1737
10.4k
                                       tnode.hash_join_node.is_broadcast_join;
1738
10.4k
        const auto enable_spill = _runtime_state->enable_spill();
1739
10.4k
        if (enable_spill && !is_broadcast_join) {
1740
0
            auto tnode_ = tnode;
1741
0
            tnode_.runtime_filters.clear();
1742
0
            auto inner_probe_operator =
1743
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1744
1745
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1746
            // So here use `tnode_` which has no runtime filters.
1747
0
            auto probe_side_inner_sink_operator =
1748
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1749
1750
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1751
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1752
1753
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1754
0
                    pool, tnode_, next_operator_id(), descs);
1755
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1756
0
                                                inner_probe_operator);
1757
0
            op = std::move(probe_operator);
1758
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1759
1760
0
            const auto downstream_pipeline_id = cur_pipe->id();
1761
0
            if (!_dag.contains(downstream_pipeline_id)) {
1762
0
                _dag.insert({downstream_pipeline_id, {}});
1763
0
            }
1764
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1765
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1766
1767
0
            auto inner_sink_operator =
1768
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1769
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1770
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1771
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1772
1773
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1774
0
            sink_ops.push_back(std::move(sink_operator));
1775
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1776
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1777
1778
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1779
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1780
10.4k
        } else {
1781
10.4k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1782
10.4k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1783
1784
10.4k
            const auto downstream_pipeline_id = cur_pipe->id();
1785
10.4k
            if (!_dag.contains(downstream_pipeline_id)) {
1786
8.81k
                _dag.insert({downstream_pipeline_id, {}});
1787
8.81k
            }
1788
10.4k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1789
10.4k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1790
1791
10.4k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1792
10.4k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1793
10.4k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1794
10.4k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1795
1796
10.4k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1797
10.4k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1798
10.4k
        }
1799
10.4k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1800
3.09k
            std::shared_ptr<HashJoinSharedState> shared_state =
1801
3.09k
                    HashJoinSharedState::create_shared(_num_instances);
1802
19.3k
            for (int i = 0; i < _num_instances; i++) {
1803
16.2k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1804
16.2k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1805
16.2k
                sink_dep->set_shared_state(shared_state.get());
1806
16.2k
                shared_state->sink_deps.push_back(sink_dep);
1807
16.2k
            }
1808
3.09k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1809
3.09k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1810
3.09k
            _op_id_to_shared_state.insert(
1811
3.09k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1812
3.09k
        }
1813
10.4k
        break;
1814
10.4k
    }
1815
5.70k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1816
5.70k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1817
5.70k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1818
1819
5.70k
        const auto downstream_pipeline_id = cur_pipe->id();
1820
5.70k
        if (!_dag.contains(downstream_pipeline_id)) {
1821
5.45k
            _dag.insert({downstream_pipeline_id, {}});
1822
5.45k
        }
1823
5.70k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1824
5.70k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1825
1826
5.70k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1827
5.70k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1828
5.70k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1829
5.70k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1830
5.70k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1831
5.70k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1832
5.70k
        break;
1833
5.70k
    }
1834
54.6k
    case TPlanNodeType::UNION_NODE: {
1835
54.6k
        int child_count = tnode.num_children;
1836
54.6k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1837
54.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1838
1839
54.6k
        const auto downstream_pipeline_id = cur_pipe->id();
1840
54.6k
        if (!_dag.contains(downstream_pipeline_id)) {
1841
54.0k
            _dag.insert({downstream_pipeline_id, {}});
1842
54.0k
        }
1843
56.1k
        for (int i = 0; i < child_count; i++) {
1844
1.52k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1845
1.52k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1846
1.52k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1847
1.52k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1848
1.52k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1849
1.52k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1850
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1851
1.52k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1852
1.52k
        }
1853
54.6k
        break;
1854
54.6k
    }
1855
54.6k
    case TPlanNodeType::SORT_NODE: {
1856
46.0k
        const auto should_spill = _runtime_state->enable_spill() &&
1857
46.0k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1858
46.0k
        const bool use_local_merge =
1859
46.0k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1860
46.0k
        if (should_spill) {
1861
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1862
46.0k
        } else if (use_local_merge) {
1863
43.5k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1864
43.5k
                                                                 descs);
1865
43.5k
        } else {
1866
2.45k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1867
2.45k
        }
1868
46.0k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1869
1870
46.0k
        const auto downstream_pipeline_id = cur_pipe->id();
1871
46.0k
        if (!_dag.contains(downstream_pipeline_id)) {
1872
46.0k
            _dag.insert({downstream_pipeline_id, {}});
1873
46.0k
        }
1874
46.0k
        cur_pipe = add_pipeline(cur_pipe);
1875
46.0k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1876
1877
46.0k
        if (should_spill) {
1878
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1879
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1880
46.0k
        } else {
1881
46.0k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1882
46.0k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1883
46.0k
        }
1884
46.0k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1885
46.0k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1886
46.0k
        break;
1887
46.0k
    }
1888
46.0k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1889
70
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1890
70
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1891
1892
70
        const auto downstream_pipeline_id = cur_pipe->id();
1893
70
        if (!_dag.contains(downstream_pipeline_id)) {
1894
70
            _dag.insert({downstream_pipeline_id, {}});
1895
70
        }
1896
70
        cur_pipe = add_pipeline(cur_pipe);
1897
70
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1898
1899
70
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1900
70
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1901
70
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1902
70
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1903
70
        break;
1904
70
    }
1905
1.80k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1906
1.80k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1907
1.80k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1908
1909
1.80k
        const auto downstream_pipeline_id = cur_pipe->id();
1910
1.80k
        if (!_dag.contains(downstream_pipeline_id)) {
1911
1.79k
            _dag.insert({downstream_pipeline_id, {}});
1912
1.79k
        }
1913
1.80k
        cur_pipe = add_pipeline(cur_pipe);
1914
1.80k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1915
1916
1.80k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1917
1.80k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1918
1.80k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1919
1.80k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1920
1.80k
        break;
1921
1.80k
    }
1922
1.80k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1923
1.73k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1924
1.73k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1925
1.73k
        break;
1926
1.73k
    }
1927
1.73k
    case TPlanNodeType::INTERSECT_NODE: {
1928
168
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1929
168
                                                                      cur_pipe, sink_ops));
1930
168
        break;
1931
168
    }
1932
168
    case TPlanNodeType::EXCEPT_NODE: {
1933
159
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1934
159
                                                                       cur_pipe, sink_ops));
1935
159
        break;
1936
159
    }
1937
340
    case TPlanNodeType::REPEAT_NODE: {
1938
340
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1939
340
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1940
340
        break;
1941
340
    }
1942
920
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1943
920
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1944
920
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1945
920
        break;
1946
920
    }
1947
920
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1948
218
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1949
218
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1950
218
        break;
1951
218
    }
1952
1.68k
    case TPlanNodeType::EMPTY_SET_NODE: {
1953
1.68k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1954
1.68k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1955
1.68k
        break;
1956
1.68k
    }
1957
1.68k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1958
486
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1959
486
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1960
486
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1961
486
        break;
1962
486
    }
1963
2.09k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1964
2.09k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1965
2.09k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
2.09k
        break;
1967
2.09k
    }
1968
6.74k
    case TPlanNodeType::META_SCAN_NODE: {
1969
6.74k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1970
6.74k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1971
6.74k
        break;
1972
6.74k
    }
1973
6.74k
    case TPlanNodeType::SELECT_NODE: {
1974
2.34k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1975
2.34k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1976
2.34k
        break;
1977
2.34k
    }
1978
2.34k
    case TPlanNodeType::REC_CTE_NODE: {
1979
146
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1980
146
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1981
1982
146
        const auto downstream_pipeline_id = cur_pipe->id();
1983
146
        if (!_dag.contains(downstream_pipeline_id)) {
1984
144
            _dag.insert({downstream_pipeline_id, {}});
1985
144
        }
1986
1987
146
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1988
146
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1989
1990
146
        DataSinkOperatorPtr anchor_sink;
1991
146
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1992
146
                                                                  op->operator_id(), tnode, descs);
1993
146
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1994
146
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1995
146
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1996
1997
146
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1998
146
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1999
2000
146
        DataSinkOperatorPtr rec_sink;
2001
146
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2002
146
                                                         tnode, descs);
2003
146
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2004
146
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2005
146
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2006
2007
146
        break;
2008
146
    }
2009
2.08k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2010
2.08k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2011
2.08k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2012
2.08k
        break;
2013
2.08k
    }
2014
113k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2015
113k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2016
        // The downstream pipeline (containing LocalExchangeSource) must have
2017
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2018
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2019
        // Without this, when the parent pipeline was reduced by a serial operator
2020
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2021
        // Exchange), the downstream inherits the reduced num_tasks via
2022
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2023
        // channels but only fewer source tasks initialize mem_counters — the
2024
        // sink round-robins to all channels and crashes on uninitialized ones.
2025
113k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2026
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2027
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2028
113k
        cur_pipe->set_num_tasks(_num_instances);
2029
2030
113k
        const auto downstream_pipeline_id = cur_pipe->id();
2031
113k
        if (!_dag.contains(downstream_pipeline_id)) {
2032
108k
            _dag.insert({downstream_pipeline_id, {}});
2033
108k
        }
2034
113k
        cur_pipe = add_pipeline(cur_pipe);
2035
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2036
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2037
        // We set this now so the exchanger is created with the correct sender count.
2038
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2039
        // consistent with this.
2040
113k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2041
0
            cur_pipe->set_num_tasks(_parallel_instances);
2042
0
        }
2043
113k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2044
113k
        int num_partitions = 0;
2045
113k
        std::map<int, int> shuffle_id_to_instance_idx;
2046
113k
        auto partition_type = tnode.local_exchange_node.partition_type;
2047
113k
        switch (partition_type) {
2048
502
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2049
502
            num_partitions = _params.num_buckets;
2050
502
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2051
502
            break;
2052
13.8k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2053
126k
            for (int i = 0; i < _num_instances; i++) {
2054
112k
                shuffle_id_to_instance_idx[i] = i;
2055
112k
            }
2056
13.8k
            num_partitions = _num_instances;
2057
13.8k
            break;
2058
6
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2059
6
            num_partitions = _total_instances;
2060
6
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2061
6
            break;
2062
98.7k
        default:
2063
98.7k
            break;
2064
113k
        }
2065
113k
        auto local_exchange_id = op->operator_id();
2066
113k
        auto sink_id = next_sink_operator_id();
2067
113k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2068
113k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2069
113k
        sink_ops.push_back(sink);
2070
113k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2071
113k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2072
2073
        // For FE-planned local exchange, we need to:
2074
        // 1. Initialize the partitioner for hash shuffle types
2075
        // 2. Defer exchanger creation until after the full plan tree is built
2076
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2077
        // 3. Register shared state so pipeline tasks can find it
2078
113k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2079
113k
                                ->init_partitioner(_runtime_state.get()));
2080
2081
113k
        int free_blocks_limit =
2082
113k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2083
113k
                        ? cast_set<int>(
2084
113k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2085
18.4E
                        : 0;
2086
113k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2087
113k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2088
113k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2089
113k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2090
113k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2091
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2092
113k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2093
113k
                                        free_blocks_limit, local_exchange_id, sink_id});
2094
113k
        break;
2095
113k
    }
2096
0
    default:
2097
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2098
0
                                     print_plan_node_type(tnode.node_type));
2099
796k
    }
2100
794k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2101
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2102
0
        op->set_serial_operator();
2103
0
    }
2104
2105
794k
    return Status::OK();
2106
796k
}
2107
// NOLINTEND(readability-function-cognitive-complexity)
2108
// NOLINTEND(readability-function-size)
2109
2110
template <bool is_intersect>
2111
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2112
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2113
327
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
327
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
327
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
327
    const auto downstream_pipeline_id = cur_pipe->id();
2118
327
    if (!_dag.contains(downstream_pipeline_id)) {
2119
299
        _dag.insert({downstream_pipeline_id, {}});
2120
299
    }
2121
2122
1.07k
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
750
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
750
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
750
        if (child_id == 0) {
2127
327
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
327
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
423
        } else {
2130
423
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
423
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
423
        }
2133
750
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
750
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
750
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
750
    }
2138
2139
327
    return Status::OK();
2140
327
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2113
168
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
168
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
168
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
168
    const auto downstream_pipeline_id = cur_pipe->id();
2118
168
    if (!_dag.contains(downstream_pipeline_id)) {
2119
151
        _dag.insert({downstream_pipeline_id, {}});
2120
151
    }
2121
2122
585
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
417
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
417
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
417
        if (child_id == 0) {
2127
168
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
168
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
249
        } else {
2130
249
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
249
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
249
        }
2133
417
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
417
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
417
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
417
    }
2138
2139
168
    return Status::OK();
2140
168
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2113
159
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2114
159
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2115
159
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2116
2117
159
    const auto downstream_pipeline_id = cur_pipe->id();
2118
159
    if (!_dag.contains(downstream_pipeline_id)) {
2119
148
        _dag.insert({downstream_pipeline_id, {}});
2120
148
    }
2121
2122
492
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2123
333
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2124
333
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2125
2126
333
        if (child_id == 0) {
2127
159
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2128
159
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2129
174
        } else {
2130
174
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2131
174
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2132
174
        }
2133
333
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2134
333
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2135
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2136
333
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2137
333
    }
2138
2139
159
    return Status::OK();
2140
159
}
2141
2142
450k
Status PipelineFragmentContext::submit() {
2143
450k
    if (_submitted) {
2144
0
        return Status::InternalError("submitted");
2145
0
    }
2146
450k
    _submitted = true;
2147
2148
450k
    int submit_tasks = 0;
2149
450k
    Status st;
2150
450k
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
2151
1.22M
    for (auto& task : _tasks) {
2152
2.01M
        for (auto& t : task) {
2153
2.01M
            st = scheduler->submit(t.first);
2154
2.01M
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
2155
2.01M
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
2156
2.01M
            if (!st) {
2157
0
                cancel(Status::InternalError("submit context to executor fail"));
2158
0
                std::lock_guard<std::mutex> l(_task_mutex);
2159
0
                _total_tasks = submit_tasks;
2160
0
                break;
2161
0
            }
2162
2.01M
            submit_tasks++;
2163
2.01M
        }
2164
1.22M
    }
2165
450k
    if (!st.ok()) {
2166
0
        bool need_remove = false;
2167
0
        {
2168
0
            std::lock_guard<std::mutex> l(_task_mutex);
2169
0
            if (_closed_tasks >= _total_tasks) {
2170
0
                need_remove = _close_fragment_instance();
2171
0
            }
2172
0
        }
2173
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2174
0
        if (need_remove) {
2175
0
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2176
0
        }
2177
0
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
2178
0
                                     BackendOptions::get_localhost());
2179
450k
    } else {
2180
450k
        return st;
2181
450k
    }
2182
450k
}
2183
2184
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2185
0
    if (_runtime_state->enable_profile()) {
2186
0
        std::stringstream ss;
2187
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2188
0
            runtime_profile_ptr->pretty_print(&ss);
2189
0
        }
2190
2191
0
        if (_runtime_state->load_channel_profile()) {
2192
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2193
0
        }
2194
2195
0
        auto profile_str =
2196
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2197
0
                            this->_fragment_id, extra_info, ss.str());
2198
0
        LOG_LONG_STRING(INFO, profile_str);
2199
0
    }
2200
0
}
2201
// If all pipeline tasks binded to the fragment instance are finished, then we could
2202
// close the fragment instance.
2203
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2204
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2205
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2206
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2207
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2208
// (via debug_string()).
2209
451k
bool PipelineFragmentContext::_close_fragment_instance() {
2210
451k
    if (_is_fragment_instance_closed) {
2211
0
        return false;
2212
0
    }
2213
451k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2214
451k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2215
451k
    if (!_need_notify_close) {
2216
448k
        auto st = send_report(true);
2217
448k
        if (!st) {
2218
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2219
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2220
0
        }
2221
448k
    }
2222
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2223
    // Since stream load does not have someting like coordinator on FE, so
2224
    // backend can not report profile to FE, ant its profile can not be shown
2225
    // in the same way with other query. So we print the profile content to info log.
2226
2227
451k
    if (_runtime_state->enable_profile() &&
2228
451k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2229
3.18k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2230
3.18k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2231
0
        std::stringstream ss;
2232
        // Compute the _local_time_percent before pretty_print the runtime_profile
2233
        // Before add this operation, the print out like that:
2234
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2235
        // After add the operation, the print out like that:
2236
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2237
        // We can easily know the exec node execute time without child time consumed.
2238
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2239
0
            runtime_profile_ptr->pretty_print(&ss);
2240
0
        }
2241
2242
0
        if (_runtime_state->load_channel_profile()) {
2243
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2244
0
        }
2245
2246
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2247
0
    }
2248
2249
451k
    if (_query_ctx->enable_profile()) {
2250
3.18k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2251
3.18k
                                         collect_realtime_load_channel_profile());
2252
3.18k
    }
2253
2254
    // Return whether the caller needs to remove from the pipeline map.
2255
    // The caller must do this after releasing _task_mutex.
2256
451k
    return !_need_notify_close;
2257
451k
}
2258
2259
2.00M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
2260
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
2261
2.00M
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
2262
2.00M
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
2263
694k
        if (_dag.contains(pipeline_id)) {
2264
285k
            for (auto dep : _dag[pipeline_id]) {
2265
243k
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
2266
243k
            }
2267
285k
        }
2268
694k
    }
2269
2.00M
    bool need_remove = false;
2270
2.00M
    {
2271
2.00M
        std::lock_guard<std::mutex> l(_task_mutex);
2272
2.00M
        ++_closed_tasks;
2273
        // Update query-level finished task progress in real time.
2274
2.00M
        _query_ctx->inc_finished_task_num();
2275
2.00M
        if (_closed_tasks >= _total_tasks) {
2276
451k
            need_remove = _close_fragment_instance();
2277
451k
        }
2278
2.00M
    }
2279
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2280
2.00M
    if (need_remove) {
2281
448k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2282
448k
    }
2283
2.00M
}
2284
2285
57.0k
std::string PipelineFragmentContext::get_load_error_url() {
2286
57.0k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2287
0
        return to_load_error_http_path(str);
2288
0
    }
2289
153k
    for (auto& tasks : _tasks) {
2290
242k
        for (auto& task : tasks) {
2291
242k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2292
190
                return to_load_error_http_path(str);
2293
190
            }
2294
242k
        }
2295
153k
    }
2296
56.8k
    return "";
2297
57.0k
}
2298
2299
57.0k
std::string PipelineFragmentContext::get_first_error_msg() {
2300
57.0k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2301
0
        return str;
2302
0
    }
2303
153k
    for (auto& tasks : _tasks) {
2304
242k
        for (auto& task : tasks) {
2305
242k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2306
190
                return str;
2307
190
            }
2308
242k
        }
2309
153k
    }
2310
56.8k
    return "";
2311
57.0k
}
2312
2313
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2314
0
    std::stringstream url;
2315
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2316
0
        << "/api/_download_load?"
2317
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2318
0
    return url.str();
2319
0
}
2320
2321
50.4k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2322
50.4k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2323
50.4k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2324
50.4k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2325
50.4k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2326
50.4k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2327
50.4k
    });
2328
2329
50.4k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2330
50.4k
    if (req.coord_addr.hostname == "external") {
2331
        // External query (flink/spark read tablets) not need to report to FE.
2332
0
        return;
2333
0
    }
2334
50.4k
    int callback_retries = 10;
2335
50.4k
    const int sleep_ms = 1000;
2336
50.4k
    Status exec_status = req.status;
2337
50.4k
    Status coord_status;
2338
50.4k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2339
50.4k
    do {
2340
50.4k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2341
50.4k
                                                            req.coord_addr, &coord_status);
2342
50.4k
        if (!coord_status.ok()) {
2343
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2344
0
        }
2345
50.4k
    } while (!coord_status.ok() && callback_retries-- > 0);
2346
2347
50.4k
    if (!coord_status.ok()) {
2348
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2349
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2350
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2351
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2352
0
        return;
2353
0
    }
2354
2355
50.4k
    TReportExecStatusParams params;
2356
50.4k
    params.protocol_version = FrontendServiceVersion::V1;
2357
50.4k
    params.__set_query_id(req.query_id);
2358
50.4k
    params.__set_backend_num(req.backend_num);
2359
50.4k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2360
50.4k
    params.__set_fragment_id(req.fragment_id);
2361
50.4k
    params.__set_status(exec_status.to_thrift());
2362
50.4k
    params.__set_done(req.done);
2363
50.4k
    params.__set_query_type(req.runtime_state->query_type());
2364
50.4k
    params.__isset.profile = false;
2365
2366
50.4k
    DCHECK(req.runtime_state != nullptr);
2367
2368
50.4k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2369
44.9k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2370
44.9k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2371
44.9k
    } else {
2372
5.43k
        DCHECK(!req.runtime_states.empty());
2373
5.43k
        if (!req.runtime_state->output_files().empty()) {
2374
0
            params.__isset.delta_urls = true;
2375
0
            for (auto& it : req.runtime_state->output_files()) {
2376
0
                params.delta_urls.push_back(_to_http_path(it));
2377
0
            }
2378
0
        }
2379
5.43k
        if (!params.delta_urls.empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
        }
2382
5.43k
    }
2383
2384
50.4k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2385
50.4k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2386
50.4k
    static std::string s_unselected_rows = "unselected.rows";
2387
50.4k
    int64_t num_rows_load_success = 0;
2388
50.4k
    int64_t num_rows_load_filtered = 0;
2389
50.4k
    int64_t num_rows_load_unselected = 0;
2390
50.4k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2391
50.4k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2392
50.4k
        req.runtime_state->num_finished_range() > 0) {
2393
0
        params.__isset.load_counters = true;
2394
2395
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2396
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2397
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2398
0
        params.__isset.fragment_instance_reports = true;
2399
0
        TFragmentInstanceReport t;
2400
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2401
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2402
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2403
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2404
0
        params.fragment_instance_reports.push_back(t);
2405
50.4k
    } else if (!req.runtime_states.empty()) {
2406
162k
        for (auto* rs : req.runtime_states) {
2407
162k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2408
162k
                rs->num_finished_range() > 0) {
2409
37.9k
                params.__isset.load_counters = true;
2410
37.9k
                num_rows_load_success += rs->num_rows_load_success();
2411
37.9k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2412
37.9k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2413
37.9k
                params.__isset.fragment_instance_reports = true;
2414
37.9k
                TFragmentInstanceReport t;
2415
37.9k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2416
37.9k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2417
37.9k
                t.__set_loaded_rows(rs->num_rows_load_total());
2418
37.9k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2419
37.9k
                params.fragment_instance_reports.push_back(t);
2420
37.9k
            }
2421
162k
        }
2422
50.3k
    }
2423
50.4k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2424
50.4k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2425
50.4k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2426
2427
50.4k
    if (!req.load_error_url.empty()) {
2428
175
        params.__set_tracking_url(req.load_error_url);
2429
175
    }
2430
50.4k
    if (!req.first_error_msg.empty()) {
2431
175
        params.__set_first_error_msg(req.first_error_msg);
2432
175
    }
2433
162k
    for (auto* rs : req.runtime_states) {
2434
162k
        if (rs->wal_id() > 0) {
2435
105
            params.__set_txn_id(rs->wal_id());
2436
105
            params.__set_label(rs->import_label());
2437
105
        }
2438
162k
    }
2439
50.4k
    if (!req.runtime_state->export_output_files().empty()) {
2440
0
        params.__isset.export_files = true;
2441
0
        params.export_files = req.runtime_state->export_output_files();
2442
50.4k
    } else if (!req.runtime_states.empty()) {
2443
162k
        for (auto* rs : req.runtime_states) {
2444
162k
            if (!rs->export_output_files().empty()) {
2445
0
                params.__isset.export_files = true;
2446
0
                params.export_files.insert(params.export_files.end(),
2447
0
                                           rs->export_output_files().begin(),
2448
0
                                           rs->export_output_files().end());
2449
0
            }
2450
162k
        }
2451
50.2k
    }
2452
50.4k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2453
0
        params.__isset.commitInfos = true;
2454
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2455
50.4k
    } else if (!req.runtime_states.empty()) {
2456
162k
        for (auto* rs : req.runtime_states) {
2457
162k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2458
28.2k
                params.__isset.commitInfos = true;
2459
28.2k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2460
28.2k
            }
2461
162k
        }
2462
50.3k
    }
2463
50.4k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2464
0
        params.__isset.errorTabletInfos = true;
2465
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2466
50.4k
    } else if (!req.runtime_states.empty()) {
2467
162k
        for (auto* rs : req.runtime_states) {
2468
162k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2469
0
                params.__isset.errorTabletInfos = true;
2470
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2471
0
                                               rs_eti.end());
2472
0
            }
2473
162k
        }
2474
50.2k
    }
2475
50.4k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2476
0
        params.__isset.hive_partition_updates = true;
2477
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2478
0
                                             hpu.end());
2479
50.4k
    } else if (!req.runtime_states.empty()) {
2480
162k
        for (auto* rs : req.runtime_states) {
2481
162k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2482
2.17k
                params.__isset.hive_partition_updates = true;
2483
2.17k
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2484
2.17k
                                                     rs_hpu.begin(), rs_hpu.end());
2485
2.17k
            }
2486
162k
        }
2487
50.3k
    }
2488
50.4k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2489
0
        params.__isset.iceberg_commit_datas = true;
2490
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2491
0
                                           icd.end());
2492
50.4k
    } else if (!req.runtime_states.empty()) {
2493
162k
        for (auto* rs : req.runtime_states) {
2494
162k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2495
2.10k
                params.__isset.iceberg_commit_datas = true;
2496
2.10k
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2497
2.10k
                                                   rs_icd.begin(), rs_icd.end());
2498
2.10k
            }
2499
162k
        }
2500
50.3k
    }
2501
2502
50.4k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2503
0
        params.__isset.mc_commit_datas = true;
2504
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2505
50.4k
    } else if (!req.runtime_states.empty()) {
2506
162k
        for (auto* rs : req.runtime_states) {
2507
162k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2508
0
                params.__isset.mc_commit_datas = true;
2509
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2510
0
                                              rs_mcd.end());
2511
0
            }
2512
162k
        }
2513
50.3k
    }
2514
2515
50.4k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2516
50.4k
    params.__isset.error_log = (!params.error_log.empty());
2517
2518
50.4k
    if (_exec_env->cluster_info()->backend_id != 0) {
2519
50.2k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2520
50.2k
    }
2521
2522
50.4k
    TReportExecStatusResult res;
2523
50.4k
    Status rpc_status;
2524
2525
50.4k
    VLOG_DEBUG << "reportExecStatus params is "
2526
135
               << apache::thrift::ThriftDebugString(params).c_str();
2527
50.4k
    if (!exec_status.ok()) {
2528
1.69k
        LOG(WARNING) << "report error status: " << exec_status.msg()
2529
1.69k
                     << " to coordinator: " << req.coord_addr
2530
1.69k
                     << ", query id: " << print_id(req.query_id);
2531
1.69k
    }
2532
50.4k
    try {
2533
50.4k
        try {
2534
50.4k
            (*coord)->reportExecStatus(res, params);
2535
50.4k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2536
#ifndef ADDRESS_SANITIZER
2537
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2538
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2539
                         << req.coord_addr << ", err: " << e.what();
2540
#endif
2541
0
            rpc_status = coord->reopen();
2542
2543
0
            if (!rpc_status.ok()) {
2544
0
                req.cancel_fn(rpc_status);
2545
0
                return;
2546
0
            }
2547
0
            (*coord)->reportExecStatus(res, params);
2548
0
        }
2549
2550
50.3k
        rpc_status = Status::create<false>(res.status);
2551
50.3k
    } catch (apache::thrift::TException& e) {
2552
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2553
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2554
0
    }
2555
2556
50.3k
    if (!rpc_status.ok()) {
2557
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2558
0
                 print_id(req.query_id), rpc_status.to_string());
2559
0
        req.cancel_fn(rpc_status);
2560
0
    }
2561
50.3k
}
2562
2563
453k
Status PipelineFragmentContext::send_report(bool done) {
2564
453k
    Status exec_status = _query_ctx->exec_status();
2565
    // If plan is done successfully, but _is_report_success is false,
2566
    // no need to send report.
2567
    // Load will set _is_report_success to true because load wants to know
2568
    // the process.
2569
453k
    if (!_is_report_success && done && exec_status.ok()) {
2570
402k
        return Status::OK();
2571
402k
    }
2572
2573
    // If both _is_report_success and _is_report_on_cancel are false,
2574
    // which means no matter query is success or failed, no report is needed.
2575
    // This may happen when the query limit reached and
2576
    // a internal cancellation being processed
2577
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
2578
    // be set to false, to avoid sending fault report to FE.
2579
50.7k
    if (!_is_report_success && !_is_report_on_cancel) {
2580
304
        if (done) {
2581
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
2582
304
            return Status::OK();
2583
304
        }
2584
0
        return Status::NeedSendAgain("");
2585
304
    }
2586
2587
50.4k
    std::vector<RuntimeState*> runtime_states;
2588
2589
115k
    for (auto& tasks : _tasks) {
2590
162k
        for (auto& task : tasks) {
2591
162k
            runtime_states.push_back(task.second.get());
2592
162k
        }
2593
115k
    }
2594
2595
50.4k
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
2596
50.4k
                                        ? get_load_error_url()
2597
50.4k
                                        : _query_ctx->get_load_error_url();
2598
50.4k
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
2599
50.4k
                                          ? get_first_error_msg()
2600
50.4k
                                          : _query_ctx->get_first_error_msg();
2601
2602
50.4k
    ReportStatusRequest req {.status = exec_status,
2603
50.4k
                             .runtime_states = runtime_states,
2604
50.4k
                             .done = done || !exec_status.ok(),
2605
50.4k
                             .coord_addr = _query_ctx->coord_addr,
2606
50.4k
                             .query_id = _query_id,
2607
50.4k
                             .fragment_id = _fragment_id,
2608
50.4k
                             .fragment_instance_id = TUniqueId(),
2609
50.4k
                             .backend_num = -1,
2610
50.4k
                             .runtime_state = _runtime_state.get(),
2611
50.4k
                             .load_error_url = load_eror_url,
2612
50.4k
                             .first_error_msg = first_error_msg,
2613
50.4k
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
2614
50.4k
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
2615
50.4k
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
2616
50.4k
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
2617
50.4k
        _coordinator_callback(req);
2618
50.4k
        if (!req.done) {
2619
5.07k
            ctx->refresh_next_report_time();
2620
5.07k
        }
2621
50.4k
    });
2622
50.7k
}
2623
2624
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2625
0
    size_t res = 0;
2626
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2627
    // here to traverse the vector.
2628
0
    for (const auto& task_instances : _tasks) {
2629
0
        for (const auto& task : task_instances) {
2630
0
            if (task.first->is_running()) {
2631
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2632
0
                                      << " is running, task: " << (void*)task.first.get()
2633
0
                                      << ", is_running: " << task.first->is_running();
2634
0
                *has_running_task = true;
2635
0
                return 0;
2636
0
            }
2637
2638
0
            size_t revocable_size = task.first->get_revocable_size();
2639
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2640
0
                res += revocable_size;
2641
0
            }
2642
0
        }
2643
0
    }
2644
0
    return res;
2645
0
}
2646
2647
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2648
0
    std::vector<PipelineTask*> revocable_tasks;
2649
0
    for (const auto& task_instances : _tasks) {
2650
0
        for (const auto& task : task_instances) {
2651
0
            size_t revocable_size_ = task.first->get_revocable_size();
2652
2653
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2654
0
                revocable_tasks.emplace_back(task.first.get());
2655
0
            }
2656
0
        }
2657
0
    }
2658
0
    return revocable_tasks;
2659
0
}
2660
2661
251
std::string PipelineFragmentContext::debug_string() {
2662
251
    std::lock_guard<std::mutex> l(_task_mutex);
2663
251
    fmt::memory_buffer debug_string_buffer;
2664
251
    fmt::format_to(debug_string_buffer,
2665
251
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2666
251
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2667
251
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2668
1.01k
    for (size_t j = 0; j < _tasks.size(); j++) {
2669
767
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2670
2.22k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2671
1.45k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2672
1.45k
                           _tasks[j][i].first->debug_string());
2673
1.45k
        }
2674
767
    }
2675
2676
251
    return fmt::to_string(debug_string_buffer);
2677
251
}
2678
2679
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2680
3.18k
PipelineFragmentContext::collect_realtime_profile() const {
2681
3.18k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2682
2683
    // we do not have mutex to protect pipeline_id_to_profile
2684
    // so we need to make sure this funciton is invoked after fragment context
2685
    // has already been prepared.
2686
3.18k
    if (!_prepared) {
2687
0
        std::string msg =
2688
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2689
0
        DCHECK(false) << msg;
2690
0
        LOG_ERROR(msg);
2691
0
        return res;
2692
0
    }
2693
2694
    // Make sure first profile is fragment level profile
2695
3.18k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2696
3.18k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2697
3.18k
    res.push_back(fragment_profile);
2698
2699
    // pipeline_id_to_profile is initialized in prepare stage
2700
6.22k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2701
6.22k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2702
6.22k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2703
6.22k
        res.push_back(profile_ptr);
2704
6.22k
    }
2705
2706
3.18k
    return res;
2707
3.18k
}
2708
2709
std::shared_ptr<TRuntimeProfileTree>
2710
3.18k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2711
    // we do not have mutex to protect pipeline_id_to_profile
2712
    // so we need to make sure this funciton is invoked after fragment context
2713
    // has already been prepared.
2714
3.18k
    if (!_prepared) {
2715
0
        std::string msg =
2716
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2717
0
        DCHECK(false) << msg;
2718
0
        LOG_ERROR(msg);
2719
0
        return nullptr;
2720
0
    }
2721
2722
10.5k
    for (const auto& tasks : _tasks) {
2723
22.1k
        for (const auto& task : tasks) {
2724
22.1k
            if (task.second->load_channel_profile() == nullptr) {
2725
0
                continue;
2726
0
            }
2727
2728
22.1k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2729
2730
22.1k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2731
22.1k
                                                           _runtime_state->profile_level());
2732
22.1k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2733
22.1k
        }
2734
10.5k
    }
2735
2736
3.18k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2737
3.18k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2738
3.18k
                                                      _runtime_state->profile_level());
2739
3.18k
    return load_channel_profile;
2740
3.18k
}
2741
2742
// Collect runtime filter IDs registered by all tasks in this PFC.
2743
// Used during recursive CTE stage transitions to know which filters to deregister
2744
// before creating the new PFC for the next recursion round.
2745
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2746
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2747
// the vector sizes do not change, and individual RuntimeState filter sets are
2748
// written only during open() which has completed by the time we reach rerun.
2749
3.40k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2750
3.40k
    std::set<int> result;
2751
5.27k
    for (const auto& _task : _tasks) {
2752
8.07k
        for (const auto& task : _task) {
2753
8.07k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2754
8.07k
            result.merge(set);
2755
8.07k
        }
2756
5.27k
    }
2757
3.40k
    if (_runtime_state) {
2758
3.40k
        auto set = _runtime_state->get_deregister_runtime_filter();
2759
3.40k
        result.merge(set);
2760
3.40k
    }
2761
3.40k
    return result;
2762
3.40k
}
2763
2764
453k
void PipelineFragmentContext::_release_resource() {
2765
453k
    std::lock_guard<std::mutex> l(_task_mutex);
2766
    // The memory released by the query end is recorded in the query mem tracker.
2767
453k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2768
453k
    auto st = _query_ctx->exec_status();
2769
1.23M
    for (auto& _task : _tasks) {
2770
1.23M
        if (!_task.empty()) {
2771
1.23M
            _call_back(_task.front().first->runtime_state(), &st);
2772
1.23M
        }
2773
1.23M
    }
2774
453k
    _tasks.clear();
2775
453k
    _dag.clear();
2776
453k
    _pip_id_to_pipeline.clear();
2777
453k
    _pipelines.clear();
2778
453k
    _sink.reset();
2779
453k
    _root_op.reset();
2780
453k
    _runtime_filter_mgr_map.clear();
2781
453k
    _op_id_to_shared_state.clear();
2782
453k
}
2783
2784
} // namespace doris