Coverage Report

Created: 2026-07-01 05:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/FrontendService.h>
22
#include <gen_cpp/FrontendService_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <pthread.h>
26
27
#include <algorithm>
28
#include <cstdlib>
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <fmt/format.h>
31
#include <thrift/Thrift.h>
32
#include <thrift/protocol/TDebugProtocol.h>
33
#include <thrift/transport/TTransportException.h>
34
35
#include <chrono> // IWYU pragma: keep
36
#include <map>
37
#include <memory>
38
#include <ostream>
39
#include <utility>
40
41
#include "cloud/config.h"
42
#include "common/cast_set.h"
43
#include "common/config.h"
44
#include "common/exception.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "exec/exchange/local_exchange_sink_operator.h"
48
#include "exec/exchange/local_exchange_source_operator.h"
49
#include "exec/exchange/local_exchanger.h"
50
#include "exec/exchange/vdata_stream_mgr.h"
51
#include "exec/operator/aggregation_sink_operator.h"
52
#include "exec/operator/aggregation_source_operator.h"
53
#include "exec/operator/analytic_sink_operator.h"
54
#include "exec/operator/analytic_source_operator.h"
55
#include "exec/operator/assert_num_rows_operator.h"
56
#include "exec/operator/blackhole_sink_operator.h"
57
#include "exec/operator/bucketed_aggregation_sink_operator.h"
58
#include "exec/operator/bucketed_aggregation_source_operator.h"
59
#include "exec/operator/cache_sink_operator.h"
60
#include "exec/operator/cache_source_operator.h"
61
#include "exec/operator/datagen_operator.h"
62
#include "exec/operator/dict_sink_operator.h"
63
#include "exec/operator/distinct_streaming_aggregation_operator.h"
64
#include "exec/operator/empty_set_operator.h"
65
#include "exec/operator/exchange_sink_operator.h"
66
#include "exec/operator/exchange_source_operator.h"
67
#include "exec/operator/file_scan_operator.h"
68
#include "exec/operator/group_commit_block_sink_operator.h"
69
#include "exec/operator/group_commit_scan_operator.h"
70
#include "exec/operator/hashjoin_build_sink.h"
71
#include "exec/operator/hashjoin_probe_operator.h"
72
#include "exec/operator/hive_table_sink_operator.h"
73
#include "exec/operator/iceberg_delete_sink_operator.h"
74
#include "exec/operator/iceberg_merge_sink_operator.h"
75
#include "exec/operator/iceberg_table_sink_operator.h"
76
#include "exec/operator/jdbc_scan_operator.h"
77
#include "exec/operator/jdbc_table_sink_operator.h"
78
#include "exec/operator/local_merge_sort_source_operator.h"
79
#include "exec/operator/materialization_opertor.h"
80
#include "exec/operator/maxcompute_table_sink_operator.h"
81
#include "exec/operator/memory_scratch_sink_operator.h"
82
#include "exec/operator/meta_scan_operator.h"
83
#include "exec/operator/multi_cast_data_stream_sink.h"
84
#include "exec/operator/multi_cast_data_stream_source.h"
85
#include "exec/operator/nested_loop_join_build_operator.h"
86
#include "exec/operator/nested_loop_join_probe_operator.h"
87
#include "exec/operator/olap_scan_operator.h"
88
#include "exec/operator/olap_table_sink_operator.h"
89
#include "exec/operator/olap_table_sink_v2_operator.h"
90
#include "exec/operator/partition_sort_sink_operator.h"
91
#include "exec/operator/partition_sort_source_operator.h"
92
#include "exec/operator/partitioned_aggregation_sink_operator.h"
93
#include "exec/operator/partitioned_aggregation_source_operator.h"
94
#include "exec/operator/partitioned_hash_join_probe_operator.h"
95
#include "exec/operator/partitioned_hash_join_sink_operator.h"
96
#include "exec/operator/rec_cte_anchor_sink_operator.h"
97
#include "exec/operator/rec_cte_scan_operator.h"
98
#include "exec/operator/rec_cte_sink_operator.h"
99
#include "exec/operator/rec_cte_source_operator.h"
100
#include "exec/operator/repeat_operator.h"
101
#include "exec/operator/result_file_sink_operator.h"
102
#include "exec/operator/result_sink_operator.h"
103
#include "exec/operator/schema_scan_operator.h"
104
#include "exec/operator/select_operator.h"
105
#include "exec/operator/set_probe_sink_operator.h"
106
#include "exec/operator/set_sink_operator.h"
107
#include "exec/operator/set_source_operator.h"
108
#include "exec/operator/sort_sink_operator.h"
109
#include "exec/operator/sort_source_operator.h"
110
#include "exec/operator/spill_iceberg_table_sink_operator.h"
111
#include "exec/operator/spill_sort_sink_operator.h"
112
#include "exec/operator/spill_sort_source_operator.h"
113
#include "exec/operator/streaming_aggregation_operator.h"
114
#include "exec/operator/table_function_operator.h"
115
#include "exec/operator/tvf_table_sink_operator.h"
116
#include "exec/operator/union_sink_operator.h"
117
#include "exec/operator/union_source_operator.h"
118
#include "exec/pipeline/dependency.h"
119
#include "exec/pipeline/pipeline_task.h"
120
#include "exec/pipeline/task_scheduler.h"
121
#include "exec/runtime_filter/runtime_filter_mgr.h"
122
#include "exec/sort/topn_sorter.h"
123
#include "exec/spill/spill_file.h"
124
#include "io/fs/stream_load_pipe.h"
125
#include "load/stream_load/new_load_stream_mgr.h"
126
#include "runtime/exec_env.h"
127
#include "runtime/fragment_mgr.h"
128
#include "runtime/result_buffer_mgr.h"
129
#include "runtime/runtime_state.h"
130
#include "runtime/thread_context.h"
131
#include "service/backend_options.h"
132
#include "util/client_cache.h"
133
#include "util/countdown_latch.h"
134
#include "util/debug_util.h"
135
#include "util/network_util.h"
136
#include "util/uid_util.h"
137
138
namespace doris {
139
PipelineFragmentContext::PipelineFragmentContext(
140
        TUniqueId query_id, const TPipelineFragmentParams& request,
141
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
142
        const std::function<void(RuntimeState*, Status*)>& call_back)
143
333k
        : _query_id(std::move(query_id)),
144
333k
          _fragment_id(request.fragment_id),
145
333k
          _exec_env(exec_env),
146
333k
          _query_ctx(std::move(query_ctx)),
147
333k
          _call_back(call_back),
148
333k
          _is_report_on_cancel(true),
149
333k
          _params(request),
150
333k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
151
333k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
152
333k
                                                               : false) {
153
333k
    _fragment_watcher.start();
154
333k
}
155
156
333k
PipelineFragmentContext::~PipelineFragmentContext() {
157
333k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
158
333k
            .tag("query_id", print_id(_query_id))
159
333k
            .tag("fragment_id", _fragment_id);
160
333k
    _release_resource();
161
333k
    {
162
        // The memory released by the query end is recorded in the query mem tracker.
163
333k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
164
333k
        _runtime_state.reset();
165
333k
        _query_ctx.reset();
166
333k
    }
167
333k
}
168
169
136
bool PipelineFragmentContext::is_timeout(timespec now) const {
170
136
    if (_timeout <= 0) {
171
0
        return false;
172
0
    }
173
136
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
174
136
}
175
176
// notify_close() transitions the PFC from "waiting for external close notification" to
177
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
178
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
179
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
180
9.07k
bool PipelineFragmentContext::notify_close() {
181
9.07k
    bool all_closed = false;
182
9.07k
    bool need_remove = false;
183
9.07k
    {
184
9.07k
        std::lock_guard<std::mutex> l(_task_mutex);
185
9.07k
        if (_closed_tasks >= _total_tasks) {
186
3.74k
            if (_need_notify_close) {
187
                // Fragment was cancelled and waiting for notify to close.
188
                // Record that we need to remove from fragment mgr, but do it
189
                // after releasing _task_mutex to avoid ABBA deadlock with
190
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
191
                // first, then _task_mutex via debug_string()).
192
3.69k
                need_remove = true;
193
3.69k
            }
194
3.74k
            all_closed = true;
195
3.74k
        }
196
        // make fragment release by self after cancel
197
9.07k
        _need_notify_close = false;
198
9.07k
    }
199
9.07k
    if (need_remove) {
200
3.69k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
201
3.69k
    }
202
9.07k
    return all_closed;
203
9.07k
}
204
205
// Must not add lock in this method. Because it will call query ctx cancel. And
206
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
207
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
208
// There maybe dead lock.
209
5.36k
void PipelineFragmentContext::cancel(const Status reason) {
210
5.36k
    LOG_INFO("PipelineFragmentContext::cancel")
211
5.36k
            .tag("query_id", print_id(_query_id))
212
5.36k
            .tag("fragment_id", _fragment_id)
213
5.36k
            .tag("reason", reason.to_string());
214
5.36k
    if (notify_close()) {
215
61
        return;
216
61
    }
217
    // Timeout is a special error code, we need print current stack to debug timeout issue.
218
5.29k
    if (reason.is<ErrorCode::TIMEOUT>()) {
219
4
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
220
4
                                   debug_string());
221
4
        LOG_LONG_STRING(WARNING, dbg_str);
222
4
    }
223
224
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
225
5.29k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
226
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
227
0
                    debug_string());
228
0
    }
229
230
5.29k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
231
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
232
0
    }
233
234
5.29k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
235
23
        _query_ctx->set_load_error_url(error_url);
236
23
    }
237
238
5.29k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
239
23
        _query_ctx->set_first_error_msg(first_error_msg);
240
23
    }
241
242
5.29k
    _query_ctx->cancel(reason, _fragment_id);
243
5.29k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
244
211
        _is_report_on_cancel = false;
245
5.08k
    } else {
246
21.6k
        for (auto& id : _fragment_instance_ids) {
247
21.6k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
248
21.6k
        }
249
5.08k
    }
250
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
251
    // For stream load the fragment's query_id == load id, it is set in FE.
252
5.29k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
253
5.29k
    if (stream_load_ctx != nullptr) {
254
31
        stream_load_ctx->pipe->cancel(reason.to_string());
255
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
256
        // We need to set the error URL at this point to ensure error information is properly
257
        // propagated to the client.
258
31
        stream_load_ctx->error_url = get_load_error_url();
259
31
        stream_load_ctx->first_error_msg = get_first_error_msg();
260
31
    }
261
262
22.7k
    for (auto& tasks : _tasks) {
263
53.7k
        for (auto& task : tasks) {
264
53.7k
            task.first->unblock_all_dependencies();
265
53.7k
        }
266
22.7k
    }
267
5.29k
}
268
269
520k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
270
520k
    PipelineId id = _next_pipeline_id++;
271
520k
    auto pipeline = std::make_shared<Pipeline>(
272
520k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
273
520k
            parent ? parent->num_tasks() : _num_instances);
274
520k
    if (idx >= 0) {
275
394
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
276
519k
    } else {
277
519k
        _pipelines.emplace_back(pipeline);
278
519k
    }
279
520k
    if (parent) {
280
185k
        parent->set_children(pipeline);
281
185k
    }
282
520k
    return pipeline;
283
520k
}
284
285
333k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
286
333k
    {
287
333k
        SCOPED_TIMER(_build_pipelines_timer);
288
        // 2. Build pipelines with operators in this fragment.
289
333k
        auto root_pipeline = add_pipeline();
290
333k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
291
333k
                                         &_root_op, root_pipeline));
292
293
        // Propagate _num_instances from LOCAL_EXCHANGE pipelines to ancestor pipelines
294
        // that inherited reduced num_tasks from a serial operator.
295
333k
        _propagate_local_exchange_num_tasks();
296
297
        // Create deferred local exchangers now that all pipelines have final num_tasks.
298
333k
        RETURN_IF_ERROR(_create_deferred_local_exchangers());
299
300
        // Raise num_tasks for pipelines whose serial non-scan operators (e.g.,
301
        // UNPARTITIONED Exchange) reduced num_tasks below _num_instances.
302
        // Without this, fragment instances 1+ have no task for these pipelines
303
        // and downstream operators fail with "must set shared state".
304
        //
305
        // This applies to ALL pipelines (not just deferred exchanger upstreams):
306
        // fragments with UNION/INTERSECT/EXCEPT + serial Exchange in child
307
        // pipelines also need the raise, even without FE-planned local exchange.
308
        //
309
        // Exception: serial scan sources (pooling scan) keep num_tasks=1 — the
310
        // PassthroughExchanger(1, N) handles the fan-out correctly.
311
        // NOTE: Do NOT raise pipelines whose source is a serial operator
312
        // (Exchange or scan) — they legitimately have 1 task, and raising
313
        // them causes crashes (e.g., 4 Exchange tasks but only 1 receives
314
        // data).  The correct fix for shared state injection across
315
        // instances is handled by the FE: it inserts local exchange nodes
316
        // between serial operators and their downstream consumers, creating
317
        // proper pipeline boundaries with _num_instances tasks.
318
319
        // 3. Create sink operator
320
333k
        if (!_params.fragment.__isset.output_sink) {
321
0
            return Status::InternalError("No output sink in this fragment!");
322
0
        }
323
333k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
324
333k
                                          _params.fragment.output_exprs, _params,
325
333k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
326
333k
                                          *_desc_tbl, root_pipeline->id()));
327
333k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
328
333k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
329
330
519k
        for (PipelinePtr& pipeline : _pipelines) {
331
519k
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
332
519k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
333
519k
        }
334
333k
    }
335
    // 4. Build local exchanger
336
333k
    if (_runtime_state->plan_local_shuffle()) {
337
85.4k
        SCOPED_TIMER(_plan_local_exchanger_timer);
338
85.4k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
339
85.4k
                                             _params.bucket_seq_to_instance_idx,
340
85.4k
                                             _params.shuffle_idx_to_instance_idx));
341
85.4k
    }
342
343
    // 5. Initialize global states in pipelines.
344
520k
    for (PipelinePtr& pipeline : _pipelines) {
345
520k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
346
520k
        pipeline->children().clear();
347
520k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
348
520k
    }
349
350
331k
    {
351
331k
        SCOPED_TIMER(_build_tasks_timer);
352
        // 6. Build pipeline tasks and initialize local state.
353
331k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
354
331k
    }
355
356
331k
    return Status::OK();
357
331k
}
358
359
333k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
360
333k
    if (_prepared) {
361
0
        return Status::InternalError("Already prepared");
362
0
    }
363
333k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
364
333k
        _timeout = _params.query_options.execution_timeout;
365
333k
    }
366
367
333k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
368
333k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
369
333k
    SCOPED_TIMER(_prepare_timer);
370
333k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
371
333k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
372
333k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
373
333k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
374
333k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
375
333k
    {
376
333k
        SCOPED_TIMER(_init_context_timer);
377
333k
        cast_set(_num_instances, _params.local_params.size());
378
333k
        _total_instances =
379
333k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
380
381
333k
        auto* fragment_context = this;
382
383
333k
        if (_params.query_options.__isset.is_report_success) {
384
331k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
385
331k
        }
386
387
        // 1. Set up the global runtime state.
388
333k
        _runtime_state = RuntimeState::create_unique(
389
333k
                _params.query_id, _params.fragment_id, _params.query_options,
390
333k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
391
333k
        _runtime_state->set_task_execution_context(shared_from_this());
392
333k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
393
333k
        if (_params.__isset.backend_id) {
394
330k
            _runtime_state->set_backend_id(_params.backend_id);
395
330k
        }
396
333k
        if (_params.__isset.import_label) {
397
241
            _runtime_state->set_import_label(_params.import_label);
398
241
        }
399
333k
        if (_params.__isset.db_name) {
400
192
            _runtime_state->set_db_name(_params.db_name);
401
192
        }
402
333k
        if (_params.__isset.load_job_id) {
403
0
            _runtime_state->set_load_job_id(_params.load_job_id);
404
0
        }
405
406
333k
        if (_params.is_simplified_param) {
407
118k
            _desc_tbl = _query_ctx->desc_tbl;
408
215k
        } else {
409
215k
            DCHECK(_params.__isset.desc_tbl);
410
215k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
411
215k
                                                  &_desc_tbl));
412
215k
        }
413
333k
        _runtime_state->set_desc_tbl(_desc_tbl);
414
333k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
415
333k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
416
333k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
417
333k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
418
419
        // init fragment_instance_ids
420
333k
        const auto target_size = _params.local_params.size();
421
333k
        _fragment_instance_ids.resize(target_size);
422
1.24M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
423
915k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
424
915k
            _fragment_instance_ids[i] = fragment_instance_id;
425
915k
        }
426
333k
    }
427
428
333k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
429
430
332k
    _init_next_report_time();
431
432
332k
    _prepared = true;
433
332k
    return Status::OK();
434
333k
}
435
436
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
437
        int instance_idx,
438
914k
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
439
914k
    const auto& local_params = _params.local_params[instance_idx];
440
914k
    auto fragment_instance_id = local_params.fragment_instance_id;
441
914k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
442
914k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
443
914k
    auto get_shared_state = [&](PipelinePtr pipeline)
444
914k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
445
1.54M
                                       std::vector<std::shared_ptr<Dependency>>>> {
446
1.54M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
447
1.54M
                                std::vector<std::shared_ptr<Dependency>>>>
448
1.54M
                shared_state_map;
449
2.01M
        for (auto& op : pipeline->operators()) {
450
2.01M
            auto source_id = op->operator_id();
451
2.01M
            if (auto iter = _op_id_to_shared_state.find(source_id);
452
2.01M
                iter != _op_id_to_shared_state.end()) {
453
664k
                shared_state_map.insert({source_id, iter->second});
454
664k
            }
455
2.01M
        }
456
1.54M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
457
1.54M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
458
1.54M
                iter != _op_id_to_shared_state.end()) {
459
308k
                shared_state_map.insert({sink_to_source_id, iter->second});
460
308k
            }
461
1.54M
        }
462
1.54M
        return shared_state_map;
463
1.54M
    };
464
465
2.81M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
466
1.89M
        auto& pipeline = _pipelines[pip_idx];
467
1.89M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
468
1.54M
            auto task_runtime_state = RuntimeState::create_unique(
469
1.54M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
470
1.54M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
471
1.54M
            {
472
                // Initialize runtime state for this task
473
1.54M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
474
475
1.54M
                task_runtime_state->set_task_execution_context(shared_from_this());
476
1.54M
                task_runtime_state->set_be_number(local_params.backend_num);
477
478
1.54M
                if (_params.__isset.backend_id) {
479
1.54M
                    task_runtime_state->set_backend_id(_params.backend_id);
480
1.54M
                }
481
1.54M
                if (_params.__isset.import_label) {
482
242
                    task_runtime_state->set_import_label(_params.import_label);
483
242
                }
484
1.54M
                if (_params.__isset.db_name) {
485
193
                    task_runtime_state->set_db_name(_params.db_name);
486
193
                }
487
1.54M
                if (_params.__isset.load_job_id) {
488
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
489
0
                }
490
1.54M
                if (_params.__isset.wal_id) {
491
115
                    task_runtime_state->set_wal_id(_params.wal_id);
492
115
                }
493
1.54M
                if (_params.__isset.content_length) {
494
34
                    task_runtime_state->set_content_length(_params.content_length);
495
34
                }
496
497
1.54M
                task_runtime_state->set_desc_tbl(_desc_tbl);
498
1.54M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
499
1.54M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
500
1.54M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
501
1.54M
                task_runtime_state->set_max_operator_id(max_operator_id());
502
1.54M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
503
1.54M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
504
1.54M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
505
506
1.54M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
507
1.54M
            }
508
1.54M
            auto cur_task_id = _total_tasks++;
509
1.54M
            task_runtime_state->set_task_id(cur_task_id);
510
1.54M
            task_runtime_state->set_task_num(pipeline->num_tasks());
511
1.54M
            auto task = std::make_shared<PipelineTask>(
512
1.54M
                    pipeline, cur_task_id, task_runtime_state.get(),
513
1.54M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
514
1.54M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
515
1.54M
                    instance_idx);
516
1.54M
            pipeline->incr_created_tasks(instance_idx, task.get());
517
1.54M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
518
1.54M
            _tasks[instance_idx].emplace_back(
519
1.54M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
520
1.54M
                            std::move(task), std::move(task_runtime_state)});
521
1.54M
        }
522
1.89M
    }
523
524
    /**
525
         * Build DAG for pipeline tasks.
526
         * For example, we have
527
         *
528
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
529
         *            \                      /
530
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
531
         *                 \                          /
532
         *               JoinProbeOperator2 (Pipeline1)
533
         *
534
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
535
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
536
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
537
         *
538
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
539
         * and JoinProbeOperator2.
540
         */
541
1.89M
    for (auto& _pipeline : _pipelines) {
542
1.89M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
543
1.54M
            auto* task = pipeline_id_to_task[_pipeline->id()];
544
1.54M
            DCHECK(task != nullptr);
545
546
            // If this task has upstream dependency, then inject it into this task.
547
1.54M
            if (_dag.contains(_pipeline->id())) {
548
995k
                auto& deps = _dag[_pipeline->id()];
549
995k
                for (auto& dep : deps) {
550
988k
                    if (pipeline_id_to_task.contains(dep)) {
551
632k
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
552
632k
                        if (ss) {
553
313k
                            task->inject_shared_state(ss);
554
318k
                        } else {
555
318k
                            pipeline_id_to_task[dep]->inject_shared_state(
556
318k
                                    task->get_source_shared_state());
557
318k
                        }
558
632k
                    }
559
988k
                }
560
995k
            }
561
1.54M
        }
562
1.89M
    }
563
2.81M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
564
1.89M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
565
1.54M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
566
1.54M
            DCHECK(pipeline_id_to_profile[pip_idx]);
567
1.54M
            std::vector<TScanRangeParams> scan_ranges;
568
1.54M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
569
1.54M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
570
243k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
571
243k
            }
572
1.54M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
573
1.54M
                                                             _params.fragment.output_sink));
574
1.54M
        }
575
1.89M
    }
576
916k
    {
577
916k
        std::lock_guard<std::mutex> l(_state_map_lock);
578
916k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
579
916k
    }
580
916k
    return Status::OK();
581
914k
}
582
583
332k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
584
332k
    _total_tasks = 0;
585
332k
    _closed_tasks = 0;
586
332k
    const auto target_size = _params.local_params.size();
587
332k
    _tasks.resize(target_size);
588
332k
    _runtime_filter_mgr_map.resize(target_size);
589
852k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
590
519k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
591
519k
    }
592
332k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
593
594
332k
    if (target_size > 1 &&
595
332k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
596
120k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
597
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
598
19.5k
        std::vector<Status> prepare_status(target_size);
599
19.5k
        int submitted_tasks = 0;
600
19.5k
        Status submit_status;
601
19.5k
        CountDownLatch latch((int)target_size);
602
177k
        for (int i = 0; i < target_size; i++) {
603
157k
            submit_status = thread_pool->submit_func([&, i]() {
604
157k
                SCOPED_ATTACH_TASK(_query_ctx.get());
605
157k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
606
157k
                latch.count_down();
607
157k
            });
608
157k
            if (LIKELY(submit_status.ok())) {
609
157k
                submitted_tasks++;
610
18.4E
            } else {
611
18.4E
                break;
612
18.4E
            }
613
157k
        }
614
19.5k
        latch.arrive_and_wait(target_size - submitted_tasks);
615
19.5k
        if (UNLIKELY(!submit_status.ok())) {
616
0
            return submit_status;
617
0
        }
618
177k
        for (int i = 0; i < submitted_tasks; i++) {
619
157k
            if (!prepare_status[i].ok()) {
620
0
                return prepare_status[i];
621
0
            }
622
157k
        }
623
313k
    } else {
624
1.07M
        for (int i = 0; i < target_size; i++) {
625
758k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
626
758k
        }
627
313k
    }
628
332k
    _pipeline_parent_map.clear();
629
332k
    _op_id_to_shared_state.clear();
630
    // Record task cardinality once when this fragment context finishes task initialization.
631
332k
    _query_ctx->add_total_task_num(_total_tasks.load(std::memory_order_relaxed));
632
633
332k
    return Status::OK();
634
332k
}
635
636
331k
void PipelineFragmentContext::_init_next_report_time() {
637
331k
    auto interval_s = config::pipeline_status_report_interval;
638
331k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
639
32.3k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
640
32.3k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
641
        // We don't want to wait longer than it takes to run the entire fragment.
642
32.3k
        _previous_report_time =
643
32.3k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
644
32.3k
        _disable_period_report = false;
645
32.3k
    }
646
331k
}
647
648
3.70k
void PipelineFragmentContext::refresh_next_report_time() {
649
3.70k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
650
3.70k
    DCHECK(disable == true);
651
3.70k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
652
3.70k
    _disable_period_report.compare_exchange_strong(disable, false);
653
3.70k
}
654
655
5.49M
void PipelineFragmentContext::trigger_report_if_necessary() {
656
5.49M
    if (!_is_report_success) {
657
5.13M
        return;
658
5.13M
    }
659
353k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
660
353k
    if (disable) {
661
5.47k
        return;
662
5.47k
    }
663
347k
    int32_t interval_s = config::pipeline_status_report_interval;
664
347k
    if (interval_s <= 0) {
665
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
666
0
                        "trigger "
667
0
                        "report.";
668
0
    }
669
347k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
670
347k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
671
347k
    if (MonotonicNanos() > next_report_time) {
672
3.70k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
673
3.70k
                                                            std::memory_order_acq_rel)) {
674
1
            return;
675
1
        }
676
3.70k
        if (VLOG_FILE_IS_ON) {
677
0
            VLOG_FILE << "Reporting "
678
0
                      << "profile for query_id " << print_id(_query_id)
679
0
                      << ", fragment id: " << _fragment_id;
680
681
0
            std::stringstream ss;
682
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
683
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
684
0
            if (_runtime_state->load_channel_profile()) {
685
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
686
0
            }
687
688
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
689
0
                      << " profile:\n"
690
0
                      << ss.str();
691
0
        }
692
3.70k
        auto st = send_report(false);
693
3.70k
        if (!st.ok()) {
694
0
            disable = true;
695
0
            _disable_period_report.compare_exchange_strong(disable, false,
696
0
                                                           std::memory_order_acq_rel);
697
0
        }
698
3.70k
    }
699
347k
}
700
701
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
702
329k
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
703
329k
    if (_params.fragment.plan.nodes.empty()) {
704
0
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
705
0
    }
706
707
329k
    int node_idx = 0;
708
709
329k
    RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr,
710
329k
                                        &node_idx, root, cur_pipe, 0, false, false));
711
712
329k
    if (node_idx + 1 != _params.fragment.plan.nodes.size()) {
713
0
        return Status::InternalError(
714
0
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
715
0
    }
716
329k
    return Status::OK();
717
329k
}
718
719
332k
Status PipelineFragmentContext::_create_deferred_local_exchangers() {
720
332k
    for (auto& info : _deferred_exchangers) {
721
        // DANGER ZONE — do not "fix" this line without reading the history.
722
        //
723
        // sender_count seeds Exchanger::_running_sink_operators, which the source side
724
        // waits to reach 0 via sub_running_sink_operators on each sink LocalState close.
725
        // The correct value is THIS pipeline-instance's sink task count, which is exactly
726
        // info.upstream_pipe->num_tasks() — one PipelineTask per task, one close per task.
727
        //
728
        // Tempting wrong fix #1: `std::max(num_tasks, _num_instances)` to mirror the
729
        //   BE-planned path in _add_local_exchange_impl (~line 1023).  THIS BREAKS the
730
        //   common FE-planned shape of `serial scan → LE(PT) → ...`: upstream_pipe
731
        //   genuinely has num_tasks=1, only 1 close arrives, but seed becomes
732
        //   _num_instances so _running_sink_operators never reaches 0 — downstream
733
        //   sources hang on SHUFFLE_DATA_DEPENDENCY (e.g. MTMV refresh from
734
        //   mtmv_up_down_job_p0/load.groovy stays at Status=RUNNING and regressed
735
        //   exactly this way).  BE-planned mode uses max() because its
736
        //   `cur_pipe` is the source-side pipeline (always raised to _num_instances by
737
        //   add_pipeline) — not analogous to our `upstream_pipe` here, which is the
738
        //   sink-side pipeline that may legitimately stay at 1 for serial sources.
739
        //
740
        // Tempting wrong fix #2: multiply by _num_instances on the theory shared_state
741
        //   is shared across all instances.  Same hang — each fragment-instance
742
        //   PipelineFragmentContext has its OWN _op_id_to_shared_state map, so the
743
        //   exchanger is per-instance, not per-BE.  num_tasks() is already the right
744
        //   close-count for one instance.
745
        //
746
        // If a hang shows up with `_running_sink_operators < 0`, the bug is upstream:
747
        // _propagate_local_exchange_num_tasks left num_tasks too low (or too high) for
748
        // this fragment shape.  Fix THAT pass, not this seed value.
749
103k
        const int sender_count = info.upstream_pipe->num_tasks();
750
103k
        switch (info.partition_type) {
751
24.5k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
752
24.5k
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
753
24.5k
            info.shared_state->exchanger = ShuffleExchanger::create_unique(
754
24.5k
                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit,
755
24.5k
                    info.partition_type);
756
24.5k
            break;
757
71
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
758
71
            info.shared_state->exchanger = BucketShuffleExchanger::create_unique(
759
71
                    sender_count, _num_instances, info.num_partitions, info.free_blocks_limit);
760
71
            break;
761
75.6k
        case TLocalPartitionType::PASSTHROUGH:
762
75.6k
            info.shared_state->exchanger = PassthroughExchanger::create_unique(
763
75.6k
                    sender_count, _num_instances, info.free_blocks_limit);
764
75.6k
            break;
765
289
        case TLocalPartitionType::BROADCAST:
766
289
            info.shared_state->exchanger = BroadcastExchanger::create_unique(
767
289
                    sender_count, _num_instances, info.free_blocks_limit);
768
289
            break;
769
2.18k
        case TLocalPartitionType::PASS_TO_ONE:
770
2.18k
            if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
771
1.24k
                info.shared_state->exchanger = PassToOneExchanger::create_unique(
772
1.24k
                        sender_count, _num_instances, info.free_blocks_limit);
773
1.24k
            } else {
774
940
                info.shared_state->exchanger = BroadcastExchanger::create_unique(
775
940
                        sender_count, _num_instances, info.free_blocks_limit);
776
940
            }
777
2.18k
            break;
778
849
        case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
779
849
            info.shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
780
849
                    sender_count, _num_instances, info.free_blocks_limit);
781
849
            break;
782
0
        case TLocalPartitionType::NOOP:
783
0
        case TLocalPartitionType::LOCAL_MERGE_SORT:
784
            // FE-planned LocalExchangeNode currently never emits NOOP or LOCAL_MERGE_SORT
785
            // through the deferred-exchanger path.  NOOP means "no exchange needed" and
786
            // is filtered out before reaching here; LOCAL_MERGE_SORT is planned by the
787
            // legacy BE path only.  Crash in debug to surface the protocol violation if
788
            // that ever changes; return an error in release to avoid silently corrupting
789
            // execution.
790
0
            DCHECK(false) << "FE-planned local exchange should not emit partition_type="
791
0
                          << static_cast<int>(info.partition_type);
792
0
            return Status::InternalError("FE-planned local exchange emitted unsupported type: " +
793
0
                                         std::to_string(static_cast<int>(info.partition_type)));
794
0
        default:
795
            // New TLocalPartitionType added on FE side without a BE handler here.
796
0
            DCHECK(false) << "Unhandled TLocalPartitionType in deferred exchangers: "
797
0
                          << static_cast<int>(info.partition_type);
798
0
            return Status::InternalError("Unsupported FE-planned local exchange type: " +
799
0
                                         std::to_string(static_cast<int>(info.partition_type)));
800
103k
        }
801
103k
    }
802
332k
    _deferred_exchangers.clear();
803
332k
    return Status::OK();
804
332k
}
805
806
332k
void PipelineFragmentContext::_propagate_local_exchange_num_tasks() {
807
    // Only runs when FE has planned local exchanges and BE deferred their construction.
808
    // In legacy mode (enable_local_shuffle_planner=false) BE plans LE itself via
809
    // _plan_local_exchange and _deferred_exchangers stays empty — the legacy path
810
    // already gets its num_tasks right at construction time, so the propagate passes
811
    // would be no-ops and are skipped.  This is a transitional design: once the FE
812
    // planner is the only planner, the propagation logic itself should degrade into
813
    // a pure assertion that the FE plan already wired the right num_tasks everywhere.
814
332k
    if (_deferred_exchangers.empty()) {
815
257k
        return;
816
257k
    }
817
    // Reconcile num_tasks across paired pipelines created by pipeline-splitting operators
818
    // (AGG, SORT, JOIN): they share state via inject_shared_state and must agree, or
819
    // instance 1+ tasks access null shared_state.  A pipeline's num_tasks is fully
820
    // determined by its source operator plus its upstreams:
821
    //   - LocalExchangeSource  -> _num_instances (the LE re-parallelizes)
822
    //   - serial source        -> its reduced count (kept as-is, typically 1)
823
    //   - otherwise (splitter) -> inherit from upstreams: raise to _num_instances if any
824
    //                             upstream was raised by an LE, then lower to a serial
825
    //                             upstream's count (lower wins).
826
    // Visiting each pipeline only after all its upstreams (topological order over _dag) lets
827
    // a single sweep reach the same fixpoint the previous two while-loops iterated to — those
828
    // only existed to reconcile the top-down build's parent-inherited num_tasks guesses.
829
74.5k
    std::map<PipelineId, PipelinePtr> id_to_pipe;
830
74.5k
    std::map<PipelineId, std::vector<PipelineId>> downstreams_of;
831
74.5k
    std::map<PipelineId, int> in_degree;
832
222k
    for (auto& p : _pipelines) {
833
222k
        id_to_pipe[p->id()] = p;
834
222k
        in_degree.try_emplace(p->id(), 0);
835
222k
    }
836
143k
    for (const auto& [downstream_id, upstream_ids] : _dag) {
837
148k
        for (auto upstream_id : upstream_ids) {
838
148k
            downstreams_of[upstream_id].push_back(downstream_id);
839
148k
            in_degree[downstream_id]++;
840
148k
        }
841
143k
    }
842
74.5k
    std::vector<PipelineId> ready;
843
222k
    for (const auto& [id, deg] : in_degree) {
844
222k
        if (deg == 0) {
845
79.8k
            ready.push_back(id);
846
79.8k
        }
847
222k
    }
848
74.5k
    size_t visited = 0;
849
297k
    while (!ready.empty()) {
850
222k
        const auto id = ready.back();
851
222k
        ready.pop_back();
852
222k
        visited++;
853
222k
        auto pit = id_to_pipe.find(id);
854
222k
        if (pit != id_to_pipe.end()) {
855
222k
            auto& pipe = pit->second;
856
222k
            const auto& ops = pipe->operators();
857
222k
            const bool le_source =
858
222k
                    !ops.empty() && dynamic_cast<LocalExchangeSourceOperatorX*>(ops.front().get());
859
222k
            const bool serial_source = !ops.empty() && ops.front()->is_serial_operator();
860
222k
            if (le_source) {
861
103k
                pipe->set_num_tasks(_num_instances);
862
119k
            } else if (!serial_source) {
863
56.3k
                int target = pipe->num_tasks();
864
56.3k
                const auto up_it = _dag.find(id);
865
56.3k
                if (up_it != _dag.end()) {
866
                    // raise: any upstream already at _num_instances (e.g. an LE source)
867
39.3k
                    for (auto upstream_id : up_it->second) {
868
39.3k
                        auto uit = id_to_pipe.find(upstream_id);
869
39.3k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() >= _num_instances) {
870
39.3k
                            target = _num_instances;
871
39.3k
                            break;
872
39.3k
                        }
873
39.3k
                    }
874
                    // lower: a serial upstream with fewer tasks (wins over the raise above)
875
40.1k
                    for (auto upstream_id : up_it->second) {
876
40.1k
                        auto uit = id_to_pipe.find(upstream_id);
877
40.1k
                        if (uit != id_to_pipe.end() && uit->second->num_tasks() < target &&
878
40.1k
                            !uit->second->operators().empty() &&
879
40.1k
                            uit->second->operators().front()->is_serial_operator()) {
880
0
                            target = uit->second->num_tasks();
881
0
                        }
882
40.1k
                    }
883
39.3k
                }
884
56.3k
                pipe->set_num_tasks(target);
885
56.3k
            }
886
222k
        }
887
222k
        for (auto down : downstreams_of[id]) {
888
148k
            if (--in_degree[down] == 0) {
889
143k
                ready.push_back(down);
890
143k
            }
891
148k
        }
892
222k
    }
893
    // The pipeline DAG is acyclic; if a future change introduces a back-edge, some pipelines
894
    // stay unvisited (in_degree never reaches 0) — fail loudly rather than silently leaving
895
    // their num_tasks unreconciled.
896
74.5k
    DCHECK_EQ(visited, in_degree.size())
897
0
            << "pipeline num_tasks topological sweep visited " << visited << " of "
898
0
            << in_degree.size() << " pipelines (cycle in _dag?)";
899
74.5k
}
900
901
Status PipelineFragmentContext::_create_tree_helper(
902
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
903
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
904
613k
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
905
    // propagate error case
906
613k
    if (*node_idx >= tnodes.size()) {
907
0
        return Status::InternalError(
908
0
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
909
0
                *node_idx, tnodes.size());
910
0
    }
911
613k
    const TPlanNode& tnode = tnodes[*node_idx];
912
913
613k
    int num_children = tnodes[*node_idx].num_children;
914
613k
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
915
613k
    bool current_require_bucket_distribution = require_bucket_distribution;
916
    // TODO: Create CacheOperator is confused now
917
613k
    OperatorPtr op = nullptr;
918
613k
    OperatorPtr cache_op = nullptr;
919
613k
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
920
613k
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
921
613k
                                     followed_by_shuffled_operator,
922
613k
                                     current_require_bucket_distribution, cache_op));
923
    // Initialization must be done here. For example, group by expressions in agg will be used to
924
    // decide if a local shuffle should be planed, so it must be initialized here.
925
613k
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
926
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
927
613k
    if (parent != nullptr) {
928
        // add to parent's child(s)
929
282k
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
930
331k
    } else {
931
331k
        *root = op;
932
331k
    }
933
    /**
934
     * `TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
935
     *
936
     * For plan:
937
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
938
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
939
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
940
     *
941
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
942
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
943
     */
944
613k
    auto required_data_distribution =
945
613k
            cur_pipe->operators().empty()
946
613k
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
947
613k
                    : op->required_data_distribution(_runtime_state.get());
948
613k
    current_followed_by_shuffled_operator =
949
613k
            ((followed_by_shuffled_operator ||
950
613k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
951
549k
                                             : op->is_shuffled_operator())) &&
952
613k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
953
613k
            (followed_by_shuffled_operator &&
954
501k
             required_data_distribution.distribution_type == TLocalPartitionType::NOOP);
955
956
613k
    current_require_bucket_distribution =
957
613k
            ((require_bucket_distribution ||
958
613k
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
959
555k
                                             : op->is_colocated_operator())) &&
960
613k
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
961
613k
            (require_bucket_distribution &&
962
508k
             required_data_distribution.distribution_type == TLocalPartitionType::NOOP);
963
964
613k
    if (num_children == 0) {
965
345k
        _use_serial_source = op->is_serial_operator();
966
345k
    }
967
    // rely on that tnodes is preorder of the plan
968
896k
    for (int i = 0; i < num_children; i++) {
969
282k
        ++*node_idx;
970
282k
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
971
282k
                                            current_followed_by_shuffled_operator,
972
282k
                                            current_require_bucket_distribution));
973
974
        // we are expecting a child, but have used all nodes
975
        // this means we have been given a bad tree and must fail
976
282k
        if (*node_idx >= tnodes.size()) {
977
0
            return Status::InternalError(
978
0
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
979
0
                    "nodes: {}",
980
0
                    *node_idx, tnodes.size());
981
0
        }
982
282k
    }
983
984
613k
    return Status::OK();
985
613k
}
986
987
void PipelineFragmentContext::_inherit_pipeline_properties(
988
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
989
394
        PipelinePtr pipe_with_sink) {
990
394
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
991
394
    pipe_with_source->set_num_tasks(_num_instances);
992
394
    pipe_with_source->set_data_distribution(data_distribution);
993
394
}
994
995
Status PipelineFragmentContext::_add_local_exchange_impl(
996
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
997
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
998
        const std::map<int, int>& bucket_seq_to_instance_idx,
999
394
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1000
394
    auto& operators = cur_pipe->operators();
1001
394
    const auto downstream_pipeline_id = cur_pipe->id();
1002
394
    auto local_exchange_id = next_operator_id();
1003
    // 1. Create a new pipeline with local exchange sink.
1004
394
    DataSinkOperatorPtr sink;
1005
394
    auto sink_id = next_sink_operator_id();
1006
1007
    /**
1008
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
1009
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
1010
     */
1011
394
    const bool followed_by_shuffled_operator =
1012
394
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
1013
394
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
1014
394
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
1015
394
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
1016
394
                                         followed_by_shuffled_operator && !_use_serial_source;
1017
394
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
1018
394
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
1019
394
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
1020
394
    if (bucket_seq_to_instance_idx.empty() &&
1021
394
        data_distribution.distribution_type == TLocalPartitionType::BUCKET_HASH_SHUFFLE) {
1022
0
        data_distribution.distribution_type =
1023
0
                use_global_hash_shuffle ? TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE
1024
0
                                        : TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
1025
0
    }
1026
394
    if (!use_global_hash_shuffle &&
1027
394
        data_distribution.distribution_type == TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE) {
1028
43
        data_distribution.distribution_type = TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE;
1029
43
    }
1030
394
    RETURN_IF_ERROR(new_pip->set_sink(sink));
1031
394
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
1032
394
                                          num_buckets, shuffle_idx_to_instance_idx));
1033
1034
    // 2. Create and initialize LocalExchangeSharedState.
1035
394
    std::shared_ptr<LocalExchangeSharedState> shared_state =
1036
394
            LocalExchangeSharedState::create_shared(_num_instances);
1037
394
    switch (data_distribution.distribution_type) {
1038
43
    case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
1039
43
    case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
1040
43
        shared_state->exchanger = ShuffleExchanger::create_unique(
1041
43
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
1042
43
                use_global_hash_shuffle ? _total_instances : _num_instances,
1043
43
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1044
43
                        ? cast_set<int>(
1045
43
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1046
43
                        : 0,
1047
43
                data_distribution.distribution_type);
1048
43
        break;
1049
3
    case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
1050
3
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
1051
3
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
1052
3
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1053
3
                        ? cast_set<int>(
1054
3
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1055
3
                        : 0);
1056
3
        break;
1057
255
    case TLocalPartitionType::PASSTHROUGH:
1058
255
        shared_state->exchanger = PassthroughExchanger::create_unique(
1059
255
                cur_pipe->num_tasks(), _num_instances,
1060
255
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1061
255
                        ? cast_set<int>(
1062
255
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1063
255
                        : 0);
1064
255
        break;
1065
9
    case TLocalPartitionType::BROADCAST:
1066
9
        shared_state->exchanger = BroadcastExchanger::create_unique(
1067
9
                cur_pipe->num_tasks(), _num_instances,
1068
9
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1069
9
                        ? cast_set<int>(
1070
9
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1071
9
                        : 0);
1072
9
        break;
1073
2
    case TLocalPartitionType::PASS_TO_ONE:
1074
2
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
1075
            // If shared hash table is enabled for BJ, hash table will be built by only one task
1076
0
            shared_state->exchanger = PassToOneExchanger::create_unique(
1077
0
                    cur_pipe->num_tasks(), _num_instances,
1078
0
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1079
0
                            ? cast_set<int>(_runtime_state->query_options()
1080
0
                                                    .local_exchange_free_blocks_limit)
1081
0
                            : 0);
1082
2
        } else {
1083
2
            shared_state->exchanger = BroadcastExchanger::create_unique(
1084
2
                    cur_pipe->num_tasks(), _num_instances,
1085
2
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1086
2
                            ? cast_set<int>(_runtime_state->query_options()
1087
2
                                                    .local_exchange_free_blocks_limit)
1088
2
                            : 0);
1089
2
        }
1090
2
        break;
1091
82
    case TLocalPartitionType::ADAPTIVE_PASSTHROUGH:
1092
82
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
1093
82
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
1094
82
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
1095
82
                        ? cast_set<int>(
1096
82
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
1097
82
                        : 0);
1098
82
        break;
1099
0
    default:
1100
0
        return Status::InternalError("Unsupported local exchange type : " +
1101
0
                                     std::to_string((int)data_distribution.distribution_type));
1102
394
    }
1103
394
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
1104
394
                                             "LOCAL_EXCHANGE_OPERATOR");
1105
394
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
1106
394
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
1107
1108
    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
1109
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].
1110
1111
    // 3.1 Initialize new pipeline's operator list.
1112
394
    std::copy(operators.begin(), operators.begin() + idx,
1113
394
              std::inserter(new_pip->operators(), new_pip->operators().end()));
1114
1115
    // 3.2 Erase unused operators in previous pipeline.
1116
394
    operators.erase(operators.begin(), operators.begin() + idx);
1117
1118
    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
1119
394
    OperatorPtr source_op;
1120
394
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
1121
394
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
1122
394
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
1123
394
    if (!operators.empty()) {
1124
126
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
1125
126
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
1126
126
    }
1127
394
    operators.insert(operators.begin(), source_op);
1128
1129
    // 5. Set children for two pipelines separately.
1130
394
    std::vector<std::shared_ptr<Pipeline>> new_children;
1131
394
    std::vector<PipelineId> edges_with_source;
1132
860
    for (auto child : cur_pipe->children()) {
1133
860
        bool found = false;
1134
1.18k
        for (auto op : new_pip->operators()) {
1135
1.18k
            if (child->sink()->node_id() == op->node_id()) {
1136
318
                new_pip->set_children(child);
1137
318
                found = true;
1138
318
            };
1139
1.18k
        }
1140
860
        if (!found) {
1141
542
            new_children.push_back(child);
1142
542
            edges_with_source.push_back(child->id());
1143
542
        }
1144
860
    }
1145
394
    new_children.push_back(new_pip);
1146
394
    edges_with_source.push_back(new_pip->id());
1147
1148
    // 6. Set DAG for new pipelines.
1149
394
    if (!new_pip->children().empty()) {
1150
225
        std::vector<PipelineId> edges_with_sink;
1151
318
        for (auto child : new_pip->children()) {
1152
318
            edges_with_sink.push_back(child->id());
1153
318
        }
1154
225
        _dag.insert({new_pip->id(), edges_with_sink});
1155
225
    }
1156
394
    cur_pipe->set_children(new_children);
1157
394
    _dag[downstream_pipeline_id] = edges_with_source;
1158
394
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
1159
394
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
1160
394
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));
1161
1162
    // 7. Inherit properties from current pipeline.
1163
394
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
1164
394
    return Status::OK();
1165
394
}
1166
1167
Status PipelineFragmentContext::_add_local_exchange(
1168
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
1169
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
1170
        const std::map<int, int>& bucket_seq_to_instance_idx,
1171
4.88k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1172
4.88k
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
1173
3.80k
        return Status::OK();
1174
3.80k
    }
1175
1176
1.07k
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
1177
708
        return Status::OK();
1178
708
    }
1179
368
    *do_local_exchange = true;
1180
1181
368
    auto& operators = cur_pipe->operators();
1182
368
    auto total_op_num = operators.size();
1183
368
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
1184
368
    RETURN_IF_ERROR(_add_local_exchange_impl(
1185
368
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
1186
368
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1187
1188
368
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
1189
0
            << "total_op_num: " << total_op_num
1190
0
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
1191
0
            << " new_pip->operators().size(): " << new_pip->operators().size();
1192
1193
    // There are some local shuffles with relatively heavy operations on the sink.
1194
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
1195
    // Therefore, local passthrough is used to increase the concurrency of the sink.
1196
    // op -> local sink(1) -> local source (n)
1197
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
1198
368
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
1199
368
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
1200
26
        RETURN_IF_ERROR(_add_local_exchange_impl(
1201
26
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
1202
26
                add_pipeline(new_pip, pip_idx + 2),
1203
26
                DataDistribution(TLocalPartitionType::PASSTHROUGH), do_local_exchange, num_buckets,
1204
26
                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1205
26
    }
1206
368
    return Status::OK();
1207
368
}
1208
1209
Status PipelineFragmentContext::_plan_local_exchange(
1210
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
1211
85.3k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1212
189k
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
1213
103k
        _pipelines[pip_idx]->init_data_distribution(_runtime_state.get());
1214
        // Set property if child pipeline is not join operator's child.
1215
103k
        if (!_pipelines[pip_idx]->children().empty()) {
1216
17.4k
            for (auto& child : _pipelines[pip_idx]->children()) {
1217
17.4k
                if (child->sink()->node_id() ==
1218
17.4k
                    _pipelines[pip_idx]->operators().front()->node_id()) {
1219
14.1k
                    _pipelines[pip_idx]->set_data_distribution(child->data_distribution());
1220
14.1k
                }
1221
17.4k
            }
1222
16.0k
        }
1223
1224
        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
1225
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
1226
        // still keep colocate plan after local shuffle
1227
103k
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
1228
103k
                                             bucket_seq_to_instance_idx,
1229
103k
                                             shuffle_idx_to_instance_idx));
1230
103k
    }
1231
85.3k
    return Status::OK();
1232
85.3k
}
1233
1234
Status PipelineFragmentContext::_plan_local_exchange(
1235
        int num_buckets, int pip_idx, PipelinePtr pip,
1236
        const std::map<int, int>& bucket_seq_to_instance_idx,
1237
103k
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
1238
103k
    int idx = 1;
1239
103k
    bool do_local_exchange = false;
1240
103k
    do {
1241
103k
        auto& ops = pip->operators();
1242
103k
        do_local_exchange = false;
1243
        // Plan local exchange for each operator.
1244
107k
        for (; idx < ops.size();) {
1245
3.80k
            auto _le_req = ops[idx]->required_data_distribution(_runtime_state.get());
1246
3.80k
            if (_le_req.need_local_exchange()) {
1247
3.13k
                RETURN_IF_ERROR(_add_local_exchange(
1248
3.13k
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, _le_req,
1249
3.13k
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
1250
3.13k
                        shuffle_idx_to_instance_idx));
1251
3.13k
            }
1252
3.80k
            if (do_local_exchange) {
1253
                // If local exchange is needed for current operator, we will split this pipeline to
1254
                // two pipelines by local exchange sink/source. And then we need to process remaining
1255
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
1256
                // is current operator was already processed) and continue to plan local exchange.
1257
126
                idx = 2;
1258
126
                break;
1259
126
            }
1260
3.67k
            idx++;
1261
3.67k
        }
1262
103k
    } while (do_local_exchange);
1263
103k
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
1264
1.75k
        RETURN_IF_ERROR(_add_local_exchange(
1265
1.75k
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
1266
1.75k
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
1267
1.75k
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
1268
1.75k
    }
1269
103k
    return Status::OK();
1270
103k
}
1271
1272
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1273
                                                  const std::vector<TExpr>& output_exprs,
1274
                                                  const TPipelineFragmentParams& params,
1275
                                                  const RowDescriptor& row_desc,
1276
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1277
333k
                                                  PipelineId cur_pipeline_id) {
1278
333k
    switch (thrift_sink.type) {
1279
117k
    case TDataSinkType::DATA_STREAM_SINK: {
1280
117k
        if (!thrift_sink.__isset.stream_sink) {
1281
0
            return Status::InternalError("Missing data stream sink.");
1282
0
        }
1283
117k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1284
117k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1285
117k
                params.destinations, _fragment_instance_ids);
1286
117k
        break;
1287
117k
    }
1288
185k
    case TDataSinkType::RESULT_SINK: {
1289
185k
        if (!thrift_sink.__isset.result_sink) {
1290
0
            return Status::InternalError("Missing data buffer sink.");
1291
0
        }
1292
1293
185k
        auto& pipeline = _pipelines[cur_pipeline_id];
1294
185k
        int child_node_id = pipeline->operators().back()->node_id();
1295
185k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), child_node_id + 1,
1296
185k
                                                      row_desc, output_exprs,
1297
185k
                                                      thrift_sink.result_sink);
1298
185k
        break;
1299
185k
    }
1300
104
    case TDataSinkType::DICTIONARY_SINK: {
1301
104
        if (!thrift_sink.__isset.dictionary_sink) {
1302
0
            return Status::InternalError("Missing dict sink.");
1303
0
        }
1304
1305
104
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1306
104
                                                    thrift_sink.dictionary_sink);
1307
104
        break;
1308
104
    }
1309
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1310
29.3k
    case TDataSinkType::OLAP_TABLE_SINK: {
1311
29.3k
        auto& pipeline = _pipelines[cur_pipeline_id];
1312
29.3k
        int child_node_id = pipeline->operators().back()->node_id();
1313
29.3k
        if (state->query_options().enable_memtable_on_sink_node &&
1314
29.3k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1315
29.3k
            !_has_row_binlog(thrift_sink.olap_table_sink) && !config::is_cloud_mode()) {
1316
58
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(
1317
58
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1318
29.2k
        } else {
1319
29.2k
            _sink = std::make_shared<OlapTableSinkOperatorX>(
1320
29.2k
                    pool, next_sink_operator_id(), child_node_id + 1, row_desc, output_exprs);
1321
29.2k
        }
1322
29.3k
        break;
1323
0
    }
1324
168
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1325
168
        DCHECK(thrift_sink.__isset.olap_table_sink);
1326
168
        DCHECK(state->get_query_ctx() != nullptr);
1327
168
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1328
168
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1329
168
                                                                output_exprs);
1330
168
        break;
1331
0
    }
1332
0
    case TDataSinkType::HIVE_TABLE_SINK: {
1333
0
        if (!thrift_sink.__isset.hive_table_sink) {
1334
0
            return Status::InternalError("Missing hive table sink.");
1335
0
        }
1336
0
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1337
0
                                                         output_exprs);
1338
0
        break;
1339
0
    }
1340
0
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1341
0
        if (!thrift_sink.__isset.iceberg_table_sink) {
1342
0
            return Status::InternalError("Missing iceberg table sink.");
1343
0
        }
1344
0
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1345
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1346
0
                                                                     row_desc, output_exprs);
1347
0
        } else {
1348
0
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1349
0
                                                                row_desc, output_exprs);
1350
0
        }
1351
0
        break;
1352
0
    }
1353
0
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1354
0
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1355
0
            return Status::InternalError("Missing iceberg delete sink.");
1356
0
        }
1357
0
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1358
0
                                                             row_desc, output_exprs);
1359
0
        break;
1360
0
    }
1361
0
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1362
0
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1363
0
            return Status::InternalError("Missing iceberg merge sink.");
1364
0
        }
1365
0
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1366
0
                                                            output_exprs);
1367
0
        break;
1368
0
    }
1369
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1370
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1371
0
            return Status::InternalError("Missing max compute table sink.");
1372
0
        }
1373
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1374
0
                                                       output_exprs);
1375
0
        break;
1376
0
    }
1377
0
    case TDataSinkType::JDBC_TABLE_SINK: {
1378
0
        if (!thrift_sink.__isset.jdbc_table_sink) {
1379
0
            return Status::InternalError("Missing data jdbc sink.");
1380
0
        }
1381
0
        if (config::enable_java_support) {
1382
0
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1383
0
                                                             output_exprs);
1384
0
        } else {
1385
0
            return Status::InternalError(
1386
0
                    "Jdbc table sink is not enabled, you can change be config "
1387
0
                    "enable_java_support to true and restart be.");
1388
0
        }
1389
0
        break;
1390
0
    }
1391
3
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1392
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1393
0
            return Status::InternalError("Missing data buffer sink.");
1394
0
        }
1395
1396
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1397
3
                                                             output_exprs);
1398
3
        break;
1399
3
    }
1400
339
    case TDataSinkType::RESULT_FILE_SINK: {
1401
339
        if (!thrift_sink.__isset.result_file_sink) {
1402
0
            return Status::InternalError("Missing result file sink.");
1403
0
        }
1404
1405
        // Result file sink is not the top sink
1406
339
        if (params.__isset.destinations && !params.destinations.empty()) {
1407
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1408
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1409
0
                    params.destinations, output_exprs, desc_tbl);
1410
339
        } else {
1411
339
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1412
339
                                                              output_exprs);
1413
339
        }
1414
339
        break;
1415
339
    }
1416
642
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1417
642
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1418
642
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1419
642
        auto sink_id = next_sink_operator_id();
1420
642
        const int multi_cast_node_id = sink_id;
1421
642
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1422
        // one sink has multiple sources.
1423
642
        std::vector<int> sources;
1424
2.36k
        for (int i = 0; i < sender_size; ++i) {
1425
1.72k
            auto source_id = next_operator_id();
1426
1.72k
            sources.push_back(source_id);
1427
1.72k
        }
1428
1429
642
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1430
642
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1431
2.36k
        for (int i = 0; i < sender_size; ++i) {
1432
1.72k
            auto new_pipeline = add_pipeline();
1433
            // use to exchange sink
1434
1.72k
            RowDescriptor* exchange_row_desc = nullptr;
1435
1.72k
            {
1436
1.72k
                const auto& tmp_row_desc =
1437
1.72k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1438
1.72k
                                ? RowDescriptor(state->desc_tbl(),
1439
1.72k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1440
1.72k
                                                         .output_tuple_id})
1441
1.72k
                                : row_desc;
1442
1.72k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1443
1.72k
            }
1444
1.72k
            auto source_id = sources[i];
1445
1.72k
            OperatorPtr source_op;
1446
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1447
1.72k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1448
1.72k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1449
1.72k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1450
1.72k
                    /*operator_id=*/source_id);
1451
1.72k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1452
1.72k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1453
            // 2. create and set sink operator of data stream sender for new pipeline
1454
1455
1.72k
            DataSinkOperatorPtr sink_op;
1456
1.72k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1457
1.72k
                    state, *exchange_row_desc, next_sink_operator_id(),
1458
1.72k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1459
1.72k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1460
1461
1.72k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1462
1.72k
            {
1463
1.72k
                TDataSink* t = pool->add(new TDataSink());
1464
1.72k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1465
1.72k
                RETURN_IF_ERROR(sink_op->init(*t));
1466
1.72k
            }
1467
1468
            // 3. set dependency dag
1469
1.72k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1470
1.72k
        }
1471
642
        if (sources.empty()) {
1472
0
            return Status::InternalError("size of sources must be greater than 0");
1473
0
        }
1474
642
        break;
1475
642
    }
1476
642
    case TDataSinkType::BLACKHOLE_SINK: {
1477
3
        if (!thrift_sink.__isset.blackhole_sink) {
1478
0
            return Status::InternalError("Missing blackhole sink.");
1479
0
        }
1480
1481
3
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1482
3
        break;
1483
3
    }
1484
0
    case TDataSinkType::TVF_TABLE_SINK: {
1485
0
        if (!thrift_sink.__isset.tvf_table_sink) {
1486
0
            return Status::InternalError("Missing TVF table sink.");
1487
0
        }
1488
0
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1489
0
                                                        output_exprs);
1490
0
        break;
1491
0
    }
1492
0
    default:
1493
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1494
333k
    }
1495
332k
    return Status::OK();
1496
333k
}
1497
1498
// NOLINTBEGIN(readability-function-size)
1499
// NOLINTBEGIN(readability-function-cognitive-complexity)
1500
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1501
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1502
                                                 PipelinePtr& cur_pipe, int parent_idx,
1503
                                                 int child_idx,
1504
                                                 const bool followed_by_shuffled_operator,
1505
                                                 const bool require_bucket_distribution,
1506
615k
                                                 OperatorPtr& cache_op) {
1507
615k
    std::vector<DataSinkOperatorPtr> sink_ops;
1508
615k
    Defer defer = Defer([&]() {
1509
615k
        if (op) {
1510
615k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1511
615k
        }
1512
614k
        for (auto& s : sink_ops) {
1513
184k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1514
184k
        }
1515
614k
    });
1516
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1517
    // Therefore, here we need to use a stack-like structure.
1518
615k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1519
615k
    std::stringstream error_msg;
1520
615k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1521
1522
615k
    bool fe_with_old_version = false;
1523
615k
    switch (tnode.node_type) {
1524
165k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1525
165k
        op = std::make_shared<OlapScanOperatorX>(
1526
165k
                pool, tnode, next_operator_id(), descs, _num_instances,
1527
165k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1528
165k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1529
165k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1530
165k
        break;
1531
165k
    }
1532
78
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1533
78
        DCHECK(_query_ctx != nullptr);
1534
78
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1535
78
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1536
78
                                                    _num_instances);
1537
78
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1538
78
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1539
78
        break;
1540
78
    }
1541
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1542
0
        if (config::enable_java_support) {
1543
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1544
0
                                                     _num_instances);
1545
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1546
0
        } else {
1547
0
            return Status::InternalError(
1548
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1549
0
                    "to true and restart be.");
1550
0
        }
1551
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1552
0
        break;
1553
0
    }
1554
2.69k
    case TPlanNodeType::FILE_SCAN_NODE: {
1555
2.69k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1556
2.69k
                                                 _num_instances);
1557
2.69k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1558
2.69k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1559
2.69k
        break;
1560
2.69k
    }
1561
116k
    case TPlanNodeType::EXCHANGE_NODE: {
1562
116k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1563
116k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1564
18.4E
                                  : 0;
1565
116k
        DCHECK_GT(num_senders, 0);
1566
116k
        auto exchange_op = std::make_shared<ExchangeSourceOperatorX>(
1567
116k
                pool, tnode, next_operator_id(), descs, num_senders);
1568
116k
        if (!_params.bucket_seq_to_instance_idx.empty()) {
1569
            // Lets bucket-routed exchanges detect orphan instances (owning no bucket) that
1570
            // no sender channel will ever address — their receivers must start at EOS.
1571
1.98k
            exchange_op->set_bucket_dest_instances(_params.bucket_seq_to_instance_idx);
1572
1.98k
        }
1573
116k
        op = exchange_op;
1574
116k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1575
116k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1576
116k
        break;
1577
116k
    }
1578
115k
    case TPlanNodeType::AGGREGATION_NODE: {
1579
115k
        if (tnode.agg_node.grouping_exprs.empty() &&
1580
115k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1581
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1582
0
                                         ": group by and output is empty");
1583
0
        }
1584
115k
        bool need_create_cache_op =
1585
115k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1586
115k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1587
10
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1588
10
            auto cache_source_id = next_operator_id();
1589
10
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1590
10
                                                        _params.fragment.query_cache_param);
1591
10
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1592
1593
10
            const auto downstream_pipeline_id = cur_pipe->id();
1594
10
            if (!_dag.contains(downstream_pipeline_id)) {
1595
10
                _dag.insert({downstream_pipeline_id, {}});
1596
10
            }
1597
10
            new_pipe = add_pipeline(cur_pipe);
1598
10
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1599
1600
10
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1601
10
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1602
10
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1603
10
            return Status::OK();
1604
10
        };
1605
115k
        const bool group_by_limit_opt =
1606
115k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1607
1608
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1609
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1610
115k
        const bool enable_spill = _runtime_state->enable_spill() &&
1611
115k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1612
115k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1613
115k
                                      tnode.agg_node.use_streaming_preaggregation &&
1614
115k
                                      !tnode.agg_node.grouping_exprs.empty();
1615
        // TODO: distinct streaming agg does not support spill.
1616
115k
        const bool can_use_distinct_streaming_agg =
1617
115k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1618
115k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1619
115k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1620
115k
                _params.query_options.enable_distinct_streaming_aggregation;
1621
1622
115k
        if (can_use_distinct_streaming_agg) {
1623
82.7k
            if (need_create_cache_op) {
1624
8
                PipelinePtr new_pipe;
1625
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1626
1627
8
                cache_op = op;
1628
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1629
8
                                                                     tnode, descs);
1630
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1631
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1632
8
                cur_pipe = new_pipe;
1633
82.7k
            } else {
1634
82.7k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1635
82.7k
                                                                     tnode, descs);
1636
82.7k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1637
82.7k
            }
1638
82.7k
        } else if (is_streaming_agg) {
1639
1.06k
            if (need_create_cache_op) {
1640
0
                PipelinePtr new_pipe;
1641
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1642
0
                cache_op = op;
1643
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1644
0
                                                             descs);
1645
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1646
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1647
0
                cur_pipe = new_pipe;
1648
1.06k
            } else {
1649
1.06k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1650
1.06k
                                                             descs);
1651
1.06k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1652
1.06k
            }
1653
31.8k
        } else {
1654
            // create new pipeline to add query cache operator
1655
31.8k
            PipelinePtr new_pipe;
1656
31.8k
            if (need_create_cache_op) {
1657
2
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1658
2
                cache_op = op;
1659
2
            }
1660
1661
31.8k
            if (enable_spill) {
1662
117
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1663
117
                                                                     next_operator_id(), descs);
1664
31.7k
            } else {
1665
31.7k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1666
31.7k
            }
1667
31.8k
            if (need_create_cache_op) {
1668
2
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1669
2
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1670
2
                cur_pipe = new_pipe;
1671
31.8k
            } else {
1672
31.8k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1673
31.8k
            }
1674
1675
31.8k
            const auto downstream_pipeline_id = cur_pipe->id();
1676
31.8k
            if (!_dag.contains(downstream_pipeline_id)) {
1677
31.0k
                _dag.insert({downstream_pipeline_id, {}});
1678
31.0k
            }
1679
31.8k
            cur_pipe = add_pipeline(cur_pipe);
1680
31.8k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1681
1682
31.8k
            if (enable_spill) {
1683
117
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1684
117
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1685
31.7k
            } else {
1686
31.7k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1687
31.7k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1688
31.7k
            }
1689
31.8k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1690
31.8k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1691
31.8k
        }
1692
115k
        break;
1693
115k
    }
1694
115k
    case TPlanNodeType::BUCKETED_AGGREGATION_NODE: {
1695
98
        if (tnode.bucketed_agg_node.grouping_exprs.empty()) {
1696
0
            return Status::InternalError(
1697
0
                    "Bucketed aggregation node {} should not be used without group by keys",
1698
0
                    tnode.node_id);
1699
0
        }
1700
1701
        // Create source operator (goes on the current / downstream pipeline).
1702
98
        op = std::make_shared<BucketedAggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1703
98
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1704
1705
        // Create a new pipeline for the sink side.
1706
98
        const auto downstream_pipeline_id = cur_pipe->id();
1707
98
        if (!_dag.contains(downstream_pipeline_id)) {
1708
98
            _dag.insert({downstream_pipeline_id, {}});
1709
98
        }
1710
98
        cur_pipe = add_pipeline(cur_pipe);
1711
98
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1712
1713
        // Create sink operator.
1714
98
        sink_ops.push_back(std::make_shared<BucketedAggSinkOperatorX>(
1715
98
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1716
98
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1717
98
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1718
1719
        // Pre-register a single shared state for ALL instances so that every
1720
        // sink instance writes its per-instance hash table into the same
1721
        // BucketedAggSharedState and every source instance can merge across
1722
        // all of them.
1723
98
        {
1724
98
            auto shared_state = BucketedAggSharedState::create_shared();
1725
98
            shared_state->id = op->operator_id();
1726
98
            shared_state->related_op_ids.insert(op->operator_id());
1727
1728
657
            for (int i = 0; i < _num_instances; i++) {
1729
559
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1730
559
                                                             "BUCKETED_AGG_SINK_DEPENDENCY");
1731
559
                sink_dep->set_shared_state(shared_state.get());
1732
559
                shared_state->sink_deps.push_back(sink_dep);
1733
559
            }
1734
98
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1735
98
                                                     op->node_id(), "BUCKETED_AGG_SOURCE");
1736
98
            _op_id_to_shared_state.insert(
1737
98
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1738
98
        }
1739
98
        break;
1740
98
    }
1741
9.76k
    case TPlanNodeType::HASH_JOIN_NODE: {
1742
9.76k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1743
9.76k
                                       tnode.hash_join_node.is_broadcast_join;
1744
9.76k
        const auto enable_spill = _runtime_state->enable_spill();
1745
9.76k
        if (enable_spill && !is_broadcast_join) {
1746
0
            auto tnode_ = tnode;
1747
0
            tnode_.runtime_filters.clear();
1748
0
            auto inner_probe_operator =
1749
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1750
1751
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1752
            // So here use `tnode_` which has no runtime filters.
1753
0
            auto probe_side_inner_sink_operator =
1754
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1755
1756
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1757
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1758
1759
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1760
0
                    pool, tnode_, next_operator_id(), descs);
1761
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1762
0
                                                inner_probe_operator);
1763
0
            op = std::move(probe_operator);
1764
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1765
1766
0
            const auto downstream_pipeline_id = cur_pipe->id();
1767
0
            if (!_dag.contains(downstream_pipeline_id)) {
1768
0
                _dag.insert({downstream_pipeline_id, {}});
1769
0
            }
1770
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1771
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1772
1773
0
            auto inner_sink_operator =
1774
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1775
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1776
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1777
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1778
1779
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1780
0
            sink_ops.push_back(std::move(sink_operator));
1781
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1782
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1783
1784
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1785
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1786
9.76k
        } else {
1787
9.76k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1788
9.76k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1789
1790
9.76k
            const auto downstream_pipeline_id = cur_pipe->id();
1791
9.76k
            if (!_dag.contains(downstream_pipeline_id)) {
1792
8.09k
                _dag.insert({downstream_pipeline_id, {}});
1793
8.09k
            }
1794
9.76k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1795
9.76k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1796
1797
9.76k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1798
9.76k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1799
9.76k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1800
9.76k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1801
1802
9.76k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1803
9.76k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1804
9.76k
        }
1805
9.76k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1806
4.51k
            std::shared_ptr<HashJoinSharedState> shared_state =
1807
4.51k
                    HashJoinSharedState::create_shared(_num_instances);
1808
20.6k
            for (int i = 0; i < _num_instances; i++) {
1809
16.1k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1810
16.1k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1811
16.1k
                sink_dep->set_shared_state(shared_state.get());
1812
16.1k
                shared_state->sink_deps.push_back(sink_dep);
1813
16.1k
            }
1814
4.51k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1815
4.51k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1816
4.51k
            _op_id_to_shared_state.insert(
1817
4.51k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1818
4.51k
        }
1819
9.76k
        break;
1820
9.76k
    }
1821
2.23k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1822
2.23k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1823
2.23k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1824
1825
2.23k
        const auto downstream_pipeline_id = cur_pipe->id();
1826
2.23k
        if (!_dag.contains(downstream_pipeline_id)) {
1827
1.99k
            _dag.insert({downstream_pipeline_id, {}});
1828
1.99k
        }
1829
2.23k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1830
2.23k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1831
1832
2.23k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1833
2.23k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1834
2.23k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1835
2.23k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1836
2.23k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1837
2.23k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1838
2.23k
        break;
1839
2.23k
    }
1840
50.3k
    case TPlanNodeType::UNION_NODE: {
1841
50.3k
        int child_count = tnode.num_children;
1842
50.3k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1843
50.3k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1844
1845
50.3k
        const auto downstream_pipeline_id = cur_pipe->id();
1846
50.3k
        if (!_dag.contains(downstream_pipeline_id)) {
1847
49.8k
            _dag.insert({downstream_pipeline_id, {}});
1848
49.8k
        }
1849
51.4k
        for (int i = 0; i < child_count; i++) {
1850
1.09k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1851
1.09k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1852
1.09k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1853
1.09k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1854
1.09k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1855
1.09k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1856
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1857
1.09k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1858
1.09k
        }
1859
50.3k
        break;
1860
50.3k
    }
1861
50.3k
    case TPlanNodeType::SORT_NODE: {
1862
33.7k
        const auto should_spill = _runtime_state->enable_spill() &&
1863
33.7k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1864
33.7k
        const bool use_local_merge =
1865
33.7k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1866
33.7k
        if (should_spill) {
1867
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1868
33.7k
        } else if (use_local_merge) {
1869
31.7k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1870
31.7k
                                                                 descs);
1871
31.7k
        } else {
1872
1.99k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1873
1.99k
        }
1874
33.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1875
1876
33.7k
        const auto downstream_pipeline_id = cur_pipe->id();
1877
33.7k
        if (!_dag.contains(downstream_pipeline_id)) {
1878
33.7k
            _dag.insert({downstream_pipeline_id, {}});
1879
33.7k
        }
1880
33.7k
        cur_pipe = add_pipeline(cur_pipe);
1881
33.7k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1882
1883
33.7k
        if (should_spill) {
1884
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1885
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1886
33.7k
        } else {
1887
33.7k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1888
33.7k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1889
33.7k
        }
1890
33.7k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1891
33.7k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1892
33.7k
        break;
1893
33.7k
    }
1894
33.7k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1895
71
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1896
71
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1897
1898
71
        const auto downstream_pipeline_id = cur_pipe->id();
1899
71
        if (!_dag.contains(downstream_pipeline_id)) {
1900
71
            _dag.insert({downstream_pipeline_id, {}});
1901
71
        }
1902
71
        cur_pipe = add_pipeline(cur_pipe);
1903
71
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1904
1905
71
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1906
71
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1907
71
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1908
71
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1909
71
        break;
1910
71
    }
1911
1.79k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1912
1.79k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1913
1.79k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1914
1915
1.79k
        const auto downstream_pipeline_id = cur_pipe->id();
1916
1.79k
        if (!_dag.contains(downstream_pipeline_id)) {
1917
1.78k
            _dag.insert({downstream_pipeline_id, {}});
1918
1.78k
        }
1919
1.79k
        cur_pipe = add_pipeline(cur_pipe);
1920
1.79k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1921
1922
1.79k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1923
1.79k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1924
1.79k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1925
1.79k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1926
1.79k
        break;
1927
1.79k
    }
1928
1.79k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1929
797
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1930
797
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1931
797
        break;
1932
797
    }
1933
797
    case TPlanNodeType::INTERSECT_NODE: {
1934
168
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1935
168
                                                                      cur_pipe, sink_ops));
1936
168
        break;
1937
168
    }
1938
168
    case TPlanNodeType::EXCEPT_NODE: {
1939
159
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1940
159
                                                                       cur_pipe, sink_ops));
1941
159
        break;
1942
159
    }
1943
333
    case TPlanNodeType::REPEAT_NODE: {
1944
333
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1945
333
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1946
333
        break;
1947
333
    }
1948
914
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1949
914
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1950
914
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1951
914
        break;
1952
914
    }
1953
914
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1954
18
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1955
18
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1956
18
        break;
1957
18
    }
1958
1.49k
    case TPlanNodeType::EMPTY_SET_NODE: {
1959
1.49k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1960
1.49k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1961
1.49k
        break;
1962
1.49k
    }
1963
1.49k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1964
296
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1965
296
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1966
296
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1967
296
        break;
1968
296
    }
1969
1.54k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1970
1.54k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1971
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1972
1.54k
        break;
1973
1.54k
    }
1974
5.24k
    case TPlanNodeType::META_SCAN_NODE: {
1975
5.24k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1976
5.24k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1977
5.24k
        break;
1978
5.24k
    }
1979
5.24k
    case TPlanNodeType::SELECT_NODE: {
1980
616
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1981
616
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1982
616
        break;
1983
616
    }
1984
616
    case TPlanNodeType::REC_CTE_NODE: {
1985
164
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1986
164
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1987
1988
164
        const auto downstream_pipeline_id = cur_pipe->id();
1989
164
        if (!_dag.contains(downstream_pipeline_id)) {
1990
160
            _dag.insert({downstream_pipeline_id, {}});
1991
160
        }
1992
1993
164
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1994
164
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1995
1996
164
        DataSinkOperatorPtr anchor_sink;
1997
164
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1998
164
                                                                  op->operator_id(), tnode, descs);
1999
164
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
2000
164
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
2001
164
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
2002
2003
164
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
2004
164
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
2005
2006
164
        DataSinkOperatorPtr rec_sink;
2007
164
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
2008
164
                                                         tnode, descs);
2009
164
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
2010
164
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
2011
164
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
2012
2013
164
        break;
2014
164
    }
2015
2.18k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
2016
2.18k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
2017
2.18k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2018
2.18k
        break;
2019
2.18k
    }
2020
103k
    case TPlanNodeType::LOCAL_EXCHANGE_NODE: {
2021
103k
        op = std::make_shared<LocalExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs);
2022
        // The downstream pipeline (containing LocalExchangeSource) must have
2023
        // _num_instances tasks — matching BE-native _inherit_pipeline_properties
2024
        // which sets pipe_with_source.set_num_tasks(_num_instances).
2025
        // Without this, when the parent pipeline was reduced by a serial operator
2026
        // (e.g., serial Exchange with use_serial_exchange=true, or UNPARTITIONED
2027
        // Exchange), the downstream inherits the reduced num_tasks via
2028
        // add_pipeline(parent).  The deferred exchanger creates _num_instances
2029
        // channels but only fewer source tasks initialize mem_counters — the
2030
        // sink round-robins to all channels and crashes on uninitialized ones.
2031
103k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2032
        // Restore downstream pipeline's num_tasks (mirroring _inherit_pipeline_properties:
2033
        // downstream keeps _num_instances, upstream gets the serial/reduced count)
2034
103k
        cur_pipe->set_num_tasks(_num_instances);
2035
2036
103k
        const auto downstream_pipeline_id = cur_pipe->id();
2037
103k
        if (!_dag.contains(downstream_pipeline_id)) {
2038
99.4k
            _dag.insert({downstream_pipeline_id, {}});
2039
99.4k
        }
2040
103k
        cur_pipe = add_pipeline(cur_pipe);
2041
        // If this local exchange was inserted because of a serial scan (is_serial_operator),
2042
        // the upstream pipeline (cur_pipe) should have num_tasks=1 (only 1 scan task).
2043
        // We set this now so the exchanger is created with the correct sender count.
2044
        // Child operators added later (serial scan) will also set num_tasks=1, which is
2045
        // consistent with this.
2046
103k
        if (op->is_serial_operator() && _parallel_instances > 0) {
2047
0
            cur_pipe->set_num_tasks(_parallel_instances);
2048
0
        }
2049
103k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
2050
103k
        int num_partitions = 0;
2051
103k
        std::map<int, int> shuffle_id_to_instance_idx;
2052
103k
        auto partition_type = tnode.local_exchange_node.partition_type;
2053
103k
        switch (partition_type) {
2054
71
        case TLocalPartitionType::BUCKET_HASH_SHUFFLE:
2055
71
            num_partitions = _params.num_buckets;
2056
71
            shuffle_id_to_instance_idx = _params.bucket_seq_to_instance_idx;
2057
71
            break;
2058
24.5k
        case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
2059
177k
            for (int i = 0; i < _num_instances; i++) {
2060
153k
                shuffle_id_to_instance_idx[i] = i;
2061
153k
            }
2062
24.5k
            num_partitions = _num_instances;
2063
24.5k
            break;
2064
7
        case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
2065
7
            num_partitions = _total_instances;
2066
7
            shuffle_id_to_instance_idx = _params.shuffle_idx_to_instance_idx;
2067
7
            break;
2068
78.8k
        default:
2069
78.8k
            break;
2070
103k
        }
2071
103k
        auto local_exchange_id = op->operator_id();
2072
103k
        auto sink_id = next_sink_operator_id();
2073
103k
        DataSinkOperatorPtr sink = std::make_shared<LocalExchangeSinkOperatorX>(
2074
103k
                sink_id, local_exchange_id, tnode, num_partitions, shuffle_id_to_instance_idx);
2075
103k
        sink_ops.push_back(sink);
2076
103k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
2077
103k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
2078
2079
        // For FE-planned local exchange, we need to:
2080
        // 1. Initialize the partitioner for hash shuffle types
2081
        // 2. Defer exchanger creation until after the full plan tree is built
2082
        //    (child operators like serial ExchangeNode may change cur_pipe->num_tasks())
2083
        // 3. Register shared state so pipeline tasks can find it
2084
103k
        RETURN_IF_ERROR(static_cast<LocalExchangeSinkOperatorX*>(cur_pipe->sink())
2085
103k
                                ->init_partitioner(_runtime_state.get()));
2086
2087
103k
        int free_blocks_limit =
2088
103k
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
2089
103k
                        ? cast_set<int>(
2090
103k
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
2091
103k
                        : 0;
2092
103k
        auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
2093
103k
        shared_state->create_source_dependencies(_num_instances, local_exchange_id,
2094
103k
                                                 local_exchange_id, "LOCAL_EXCHANGE_OPERATOR");
2095
103k
        shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
2096
103k
        _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});
2097
        // Defer exchanger creation: sender count depends on final upstream num_tasks
2098
103k
        _deferred_exchangers.push_back({shared_state, cur_pipe, partition_type, num_partitions,
2099
103k
                                        free_blocks_limit, local_exchange_id, sink_id});
2100
103k
        break;
2101
103k
    }
2102
0
    default:
2103
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
2104
0
                                     print_plan_node_type(tnode.node_type));
2105
615k
    }
2106
615k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
2107
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
2108
0
        op->set_serial_operator();
2109
0
    }
2110
2111
615k
    return Status::OK();
2112
615k
}
2113
// NOLINTEND(readability-function-cognitive-complexity)
2114
// NOLINTEND(readability-function-size)
2115
2116
template <bool is_intersect>
2117
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
2118
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
2119
327
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2120
327
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2121
327
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2122
2123
327
    const auto downstream_pipeline_id = cur_pipe->id();
2124
327
    if (!_dag.contains(downstream_pipeline_id)) {
2125
302
        _dag.insert({downstream_pipeline_id, {}});
2126
302
    }
2127
2128
1.07k
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2129
750
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2130
750
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2131
2132
750
        if (child_id == 0) {
2133
327
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2134
327
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2135
423
        } else {
2136
423
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2137
423
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2138
423
        }
2139
750
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2140
750
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2141
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2142
750
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2143
750
    }
2144
2145
327
    return Status::OK();
2146
327
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2119
168
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2120
168
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2121
168
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2122
2123
168
    const auto downstream_pipeline_id = cur_pipe->id();
2124
168
    if (!_dag.contains(downstream_pipeline_id)) {
2125
155
        _dag.insert({downstream_pipeline_id, {}});
2126
155
    }
2127
2128
585
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2129
417
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2130
417
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2131
2132
417
        if (child_id == 0) {
2133
168
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2134
168
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2135
249
        } else {
2136
249
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2137
249
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2138
249
        }
2139
417
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2140
417
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2141
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2142
417
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2143
417
    }
2144
2145
168
    return Status::OK();
2146
168
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
2119
159
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
2120
159
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
2121
159
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
2122
2123
159
    const auto downstream_pipeline_id = cur_pipe->id();
2124
159
    if (!_dag.contains(downstream_pipeline_id)) {
2125
147
        _dag.insert({downstream_pipeline_id, {}});
2126
147
    }
2127
2128
492
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
2129
333
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
2130
333
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
2131
2132
333
        if (child_id == 0) {
2133
159
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
2134
159
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2135
174
        } else {
2136
174
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
2137
174
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
2138
174
        }
2139
333
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
2140
333
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
2141
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
2142
333
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
2143
333
    }
2144
2145
159
    return Status::OK();
2146
159
}
2147
2148
330k
Status PipelineFragmentContext::submit() {
2149
330k
    if (_submitted) {
2150
0
        return Status::InternalError("submitted");
2151
0
    }
2152
330k
    _submitted = true;
2153
2154
330k
    int submit_tasks = 0;
2155
330k
    Status st;
2156
330k
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
2157
915k
    for (auto& task : _tasks) {
2158
1.54M
        for (auto& t : task) {
2159
1.54M
            st = scheduler->submit(t.first);
2160
1.54M
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
2161
1.54M
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
2162
1.54M
            if (!st) {
2163
0
                cancel(Status::InternalError("submit context to executor fail"));
2164
0
                std::lock_guard<std::mutex> l(_task_mutex);
2165
0
                _total_tasks = submit_tasks;
2166
0
                break;
2167
0
            }
2168
1.54M
            submit_tasks++;
2169
1.54M
        }
2170
915k
    }
2171
330k
    if (!st.ok()) {
2172
0
        bool need_remove = false;
2173
0
        {
2174
0
            std::lock_guard<std::mutex> l(_task_mutex);
2175
0
            if (_closed_tasks >= _total_tasks) {
2176
0
                need_remove = _close_fragment_instance();
2177
0
            }
2178
0
        }
2179
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2180
0
        if (need_remove) {
2181
0
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2182
0
        }
2183
0
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
2184
0
                                     BackendOptions::get_localhost());
2185
330k
    } else {
2186
330k
        return st;
2187
330k
    }
2188
330k
}
2189
2190
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
2191
0
    if (_runtime_state->enable_profile()) {
2192
0
        std::stringstream ss;
2193
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2194
0
            runtime_profile_ptr->pretty_print(&ss);
2195
0
        }
2196
2197
0
        if (_runtime_state->load_channel_profile()) {
2198
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2199
0
        }
2200
2201
0
        auto profile_str =
2202
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
2203
0
                            this->_fragment_id, extra_info, ss.str());
2204
0
        LOG_LONG_STRING(INFO, profile_str);
2205
0
    }
2206
0
}
2207
// If all pipeline tasks binded to the fragment instance are finished, then we could
2208
// close the fragment instance.
2209
// Returns true if the caller should call remove_pipeline_context() **after** releasing
2210
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
2211
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
2212
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
2213
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
2214
// (via debug_string()).
2215
332k
bool PipelineFragmentContext::_close_fragment_instance() {
2216
332k
    if (_is_fragment_instance_closed) {
2217
0
        return false;
2218
0
    }
2219
332k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
2220
332k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
2221
332k
    if (!_need_notify_close) {
2222
329k
        auto st = send_report(true);
2223
329k
        if (!st) {
2224
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
2225
0
                                        print_id(_query_id), _fragment_id, st.to_string());
2226
0
        }
2227
329k
    }
2228
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
2229
    // Since stream load does not have someting like coordinator on FE, so
2230
    // backend can not report profile to FE, ant its profile can not be shown
2231
    // in the same way with other query. So we print the profile content to info log.
2232
2233
332k
    if (_runtime_state->enable_profile() &&
2234
332k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
2235
2.45k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
2236
2.45k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
2237
0
        std::stringstream ss;
2238
        // Compute the _local_time_percent before pretty_print the runtime_profile
2239
        // Before add this operation, the print out like that:
2240
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
2241
        // After add the operation, the print out like that:
2242
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
2243
        // We can easily know the exec node execute time without child time consumed.
2244
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
2245
0
            runtime_profile_ptr->pretty_print(&ss);
2246
0
        }
2247
2248
0
        if (_runtime_state->load_channel_profile()) {
2249
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
2250
0
        }
2251
2252
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
2253
0
    }
2254
2255
332k
    if (_query_ctx->enable_profile()) {
2256
2.45k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
2257
2.45k
                                         collect_realtime_load_channel_profile());
2258
2.45k
    }
2259
2260
    // Return whether the caller needs to remove from the pipeline map.
2261
    // The caller must do this after releasing _task_mutex.
2262
332k
    return !_need_notify_close;
2263
332k
}
2264
2265
1.54M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
2266
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
2267
1.54M
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
2268
1.54M
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
2269
520k
        if (_dag.contains(pipeline_id)) {
2270
228k
            for (auto dep : _dag[pipeline_id]) {
2271
187k
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
2272
187k
            }
2273
228k
        }
2274
520k
    }
2275
1.54M
    bool need_remove = false;
2276
1.54M
    {
2277
1.54M
        std::lock_guard<std::mutex> l(_task_mutex);
2278
1.54M
        ++_closed_tasks;
2279
        // Update query-level finished task progress in real time.
2280
1.54M
        _query_ctx->inc_finished_task_num();
2281
1.54M
        if (_closed_tasks >= _total_tasks) {
2282
332k
            need_remove = _close_fragment_instance();
2283
332k
        }
2284
1.54M
    }
2285
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
2286
1.54M
    if (need_remove) {
2287
329k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
2288
329k
    }
2289
1.54M
}
2290
2291
42.7k
std::string PipelineFragmentContext::get_load_error_url() {
2292
42.7k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
2293
0
        return to_load_error_http_path(str);
2294
0
    }
2295
97.6k
    for (auto& tasks : _tasks) {
2296
167k
        for (auto& task : tasks) {
2297
167k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
2298
184
                return to_load_error_http_path(str);
2299
184
            }
2300
167k
        }
2301
97.6k
    }
2302
42.5k
    return "";
2303
42.7k
}
2304
2305
42.6k
std::string PipelineFragmentContext::get_first_error_msg() {
2306
42.6k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
2307
0
        return str;
2308
0
    }
2309
97.6k
    for (auto& tasks : _tasks) {
2310
167k
        for (auto& task : tasks) {
2311
167k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
2312
184
                return str;
2313
184
            }
2314
167k
        }
2315
97.6k
    }
2316
42.4k
    return "";
2317
42.6k
}
2318
2319
0
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
2320
0
    std::stringstream url;
2321
0
    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
2322
0
        << "/api/_download_load?"
2323
0
        << "token=" << _exec_env->token() << "&file=" << file_name;
2324
0
    return url.str();
2325
0
}
2326
2327
37.4k
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
2328
37.4k
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
2329
37.4k
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
2330
37.4k
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
2331
37.4k
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
2332
37.4k
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
2333
37.4k
    });
2334
2335
37.4k
    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
2336
37.4k
    if (req.coord_addr.hostname == "external") {
2337
        // External query (flink/spark read tablets) not need to report to FE.
2338
0
        return;
2339
0
    }
2340
37.4k
    int callback_retries = 10;
2341
37.4k
    const int sleep_ms = 1000;
2342
37.4k
    Status exec_status = req.status;
2343
37.4k
    Status coord_status;
2344
37.4k
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
2345
37.4k
    do {
2346
37.4k
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
2347
37.4k
                                                            req.coord_addr, &coord_status);
2348
37.4k
        if (!coord_status.ok()) {
2349
0
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
2350
0
        }
2351
37.4k
    } while (!coord_status.ok() && callback_retries-- > 0);
2352
2353
37.4k
    if (!coord_status.ok()) {
2354
0
        UniqueId uid(req.query_id.hi, req.query_id.lo);
2355
0
        static_cast<void>(req.cancel_fn(Status::InternalError(
2356
0
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
2357
0
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
2358
0
        return;
2359
0
    }
2360
2361
37.4k
    TReportExecStatusParams params;
2362
37.4k
    params.protocol_version = FrontendServiceVersion::V1;
2363
37.4k
    params.__set_query_id(req.query_id);
2364
37.4k
    params.__set_backend_num(req.backend_num);
2365
37.4k
    params.__set_fragment_instance_id(req.fragment_instance_id);
2366
37.4k
    params.__set_fragment_id(req.fragment_id);
2367
37.4k
    params.__set_status(exec_status.to_thrift());
2368
37.4k
    params.__set_done(req.done);
2369
37.4k
    params.__set_query_type(req.runtime_state->query_type());
2370
37.4k
    params.__isset.profile = false;
2371
2372
37.4k
    DCHECK(req.runtime_state != nullptr);
2373
2374
37.4k
    if (req.runtime_state->query_type() == TQueryType::LOAD) {
2375
33.3k
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2376
33.3k
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2377
33.3k
    } else {
2378
4.05k
        DCHECK(!req.runtime_states.empty());
2379
4.05k
        if (!req.runtime_state->output_files().empty()) {
2380
0
            params.__isset.delta_urls = true;
2381
0
            for (auto& it : req.runtime_state->output_files()) {
2382
0
                params.delta_urls.push_back(_to_http_path(it));
2383
0
            }
2384
0
        }
2385
4.05k
        if (!params.delta_urls.empty()) {
2386
0
            params.__isset.delta_urls = true;
2387
0
        }
2388
4.05k
    }
2389
2390
37.4k
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
2391
37.4k
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
2392
37.4k
    static std::string s_unselected_rows = "unselected.rows";
2393
37.4k
    int64_t num_rows_load_success = 0;
2394
37.4k
    int64_t num_rows_load_filtered = 0;
2395
37.4k
    int64_t num_rows_load_unselected = 0;
2396
37.4k
    if (req.runtime_state->num_rows_load_total() > 0 ||
2397
37.4k
        req.runtime_state->num_rows_load_filtered() > 0 ||
2398
37.4k
        req.runtime_state->num_finished_range() > 0) {
2399
0
        params.__isset.load_counters = true;
2400
2401
0
        num_rows_load_success = req.runtime_state->num_rows_load_success();
2402
0
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
2403
0
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
2404
0
        params.__isset.fragment_instance_reports = true;
2405
0
        TFragmentInstanceReport t;
2406
0
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
2407
0
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
2408
0
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
2409
0
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
2410
0
        params.fragment_instance_reports.push_back(t);
2411
37.4k
    } else if (!req.runtime_states.empty()) {
2412
114k
        for (auto* rs : req.runtime_states) {
2413
114k
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
2414
114k
                rs->num_finished_range() > 0) {
2415
31.4k
                params.__isset.load_counters = true;
2416
31.4k
                num_rows_load_success += rs->num_rows_load_success();
2417
31.4k
                num_rows_load_filtered += rs->num_rows_load_filtered();
2418
31.4k
                num_rows_load_unselected += rs->num_rows_load_unselected();
2419
31.4k
                params.__isset.fragment_instance_reports = true;
2420
31.4k
                TFragmentInstanceReport t;
2421
31.4k
                t.__set_fragment_instance_id(rs->fragment_instance_id());
2422
31.4k
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
2423
31.4k
                t.__set_loaded_rows(rs->num_rows_load_total());
2424
31.4k
                t.__set_loaded_bytes(rs->num_bytes_load_total());
2425
31.4k
                params.fragment_instance_reports.push_back(t);
2426
31.4k
            }
2427
114k
        }
2428
37.4k
    }
2429
37.4k
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
2430
37.4k
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
2431
37.4k
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
2432
2433
37.4k
    if (!req.load_error_url.empty()) {
2434
169
        params.__set_tracking_url(req.load_error_url);
2435
169
    }
2436
37.4k
    if (!req.first_error_msg.empty()) {
2437
169
        params.__set_first_error_msg(req.first_error_msg);
2438
169
    }
2439
114k
    for (auto* rs : req.runtime_states) {
2440
114k
        if (rs->wal_id() > 0) {
2441
108
            params.__set_txn_id(rs->wal_id());
2442
108
            params.__set_label(rs->import_label());
2443
108
        }
2444
114k
    }
2445
37.4k
    if (!req.runtime_state->export_output_files().empty()) {
2446
0
        params.__isset.export_files = true;
2447
0
        params.export_files = req.runtime_state->export_output_files();
2448
37.4k
    } else if (!req.runtime_states.empty()) {
2449
114k
        for (auto* rs : req.runtime_states) {
2450
114k
            if (!rs->export_output_files().empty()) {
2451
0
                params.__isset.export_files = true;
2452
0
                params.export_files.insert(params.export_files.end(),
2453
0
                                           rs->export_output_files().begin(),
2454
0
                                           rs->export_output_files().end());
2455
0
            }
2456
114k
        }
2457
37.3k
    }
2458
37.4k
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
2459
0
        params.__isset.commitInfos = true;
2460
0
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
2461
37.4k
    } else if (!req.runtime_states.empty()) {
2462
114k
        for (auto* rs : req.runtime_states) {
2463
114k
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
2464
25.3k
                params.__isset.commitInfos = true;
2465
25.3k
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
2466
25.3k
            }
2467
114k
        }
2468
37.4k
    }
2469
37.4k
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
2470
0
        params.__isset.errorTabletInfos = true;
2471
0
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
2472
37.4k
    } else if (!req.runtime_states.empty()) {
2473
114k
        for (auto* rs : req.runtime_states) {
2474
114k
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
2475
0
                params.__isset.errorTabletInfos = true;
2476
0
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
2477
0
                                               rs_eti.end());
2478
0
            }
2479
114k
        }
2480
37.3k
    }
2481
37.4k
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
2482
0
        params.__isset.hive_partition_updates = true;
2483
0
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
2484
0
                                             hpu.end());
2485
37.4k
    } else if (!req.runtime_states.empty()) {
2486
114k
        for (auto* rs : req.runtime_states) {
2487
114k
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
2488
0
                params.__isset.hive_partition_updates = true;
2489
0
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
2490
0
                                                     rs_hpu.begin(), rs_hpu.end());
2491
0
            }
2492
114k
        }
2493
37.3k
    }
2494
37.4k
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
2495
0
        params.__isset.iceberg_commit_datas = true;
2496
0
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
2497
0
                                           icd.end());
2498
37.4k
    } else if (!req.runtime_states.empty()) {
2499
114k
        for (auto* rs : req.runtime_states) {
2500
114k
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
2501
0
                params.__isset.iceberg_commit_datas = true;
2502
0
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
2503
0
                                                   rs_icd.begin(), rs_icd.end());
2504
0
            }
2505
114k
        }
2506
37.3k
    }
2507
2508
37.4k
    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
2509
0
        params.__isset.mc_commit_datas = true;
2510
0
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
2511
37.4k
    } else if (!req.runtime_states.empty()) {
2512
114k
        for (auto* rs : req.runtime_states) {
2513
114k
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
2514
0
                params.__isset.mc_commit_datas = true;
2515
0
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
2516
0
                                              rs_mcd.end());
2517
0
            }
2518
114k
        }
2519
37.3k
    }
2520
2521
37.4k
    req.runtime_state->get_unreported_errors(&(params.error_log));
2522
37.4k
    params.__isset.error_log = (!params.error_log.empty());
2523
2524
37.4k
    if (_exec_env->cluster_info()->backend_id != 0) {
2525
37.3k
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
2526
37.3k
    }
2527
2528
37.4k
    TReportExecStatusResult res;
2529
37.4k
    Status rpc_status;
2530
2531
37.4k
    VLOG_DEBUG << "reportExecStatus params is "
2532
71
               << apache::thrift::ThriftDebugString(params).c_str();
2533
37.4k
    if (!exec_status.ok()) {
2534
1.53k
        LOG(WARNING) << "report error status: " << exec_status.msg()
2535
1.53k
                     << " to coordinator: " << req.coord_addr
2536
1.53k
                     << ", query id: " << print_id(req.query_id);
2537
1.53k
    }
2538
37.4k
    try {
2539
37.4k
        try {
2540
37.4k
            (*coord)->reportExecStatus(res, params);
2541
37.4k
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
2542
#ifndef ADDRESS_SANITIZER
2543
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
2544
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
2545
                         << req.coord_addr << ", err: " << e.what();
2546
#endif
2547
0
            rpc_status = coord->reopen();
2548
2549
0
            if (!rpc_status.ok()) {
2550
0
                req.cancel_fn(rpc_status);
2551
0
                return;
2552
0
            }
2553
0
            (*coord)->reportExecStatus(res, params);
2554
0
        }
2555
2556
37.3k
        rpc_status = Status::create<false>(res.status);
2557
37.3k
    } catch (apache::thrift::TException& e) {
2558
0
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
2559
0
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
2560
0
    }
2561
2562
37.3k
    if (!rpc_status.ok()) {
2563
0
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
2564
0
                 print_id(req.query_id), rpc_status.to_string());
2565
0
        req.cancel_fn(rpc_status);
2566
0
    }
2567
37.3k
}
2568
2569
332k
Status PipelineFragmentContext::send_report(bool done) {
2570
332k
    Status exec_status = _query_ctx->exec_status();
2571
    // If plan is done successfully, but _is_report_success is false,
2572
    // no need to send report.
2573
    // Load will set _is_report_success to true because load wants to know
2574
    // the process.
2575
332k
    if (!_is_report_success && done && exec_status.ok()) {
2576
295k
        return Status::OK();
2577
295k
    }
2578
2579
    // If both _is_report_success and _is_report_on_cancel are false,
2580
    // which means no matter query is success or failed, no report is needed.
2581
    // This may happen when the query limit reached and
2582
    // a internal cancellation being processed
2583
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
2584
    // be set to false, to avoid sending fault report to FE.
2585
37.5k
    if (!_is_report_success && !_is_report_on_cancel) {
2586
108
        if (done) {
2587
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
2588
108
            return Status::OK();
2589
108
        }
2590
0
        return Status::NeedSendAgain("");
2591
108
    }
2592
2593
37.4k
    std::vector<RuntimeState*> runtime_states;
2594
2595
75.2k
    for (auto& tasks : _tasks) {
2596
114k
        for (auto& task : tasks) {
2597
114k
            runtime_states.push_back(task.second.get());
2598
114k
        }
2599
75.2k
    }
2600
2601
37.4k
    std::string load_eror_url = _query_ctx->get_load_error_url().empty()
2602
37.4k
                                        ? get_load_error_url()
2603
37.4k
                                        : _query_ctx->get_load_error_url();
2604
37.4k
    std::string first_error_msg = _query_ctx->get_first_error_msg().empty()
2605
37.4k
                                          ? get_first_error_msg()
2606
37.4k
                                          : _query_ctx->get_first_error_msg();
2607
2608
37.4k
    ReportStatusRequest req {.status = exec_status,
2609
37.4k
                             .runtime_states = runtime_states,
2610
37.4k
                             .done = done || !exec_status.ok(),
2611
37.4k
                             .coord_addr = _query_ctx->coord_addr,
2612
37.4k
                             .query_id = _query_id,
2613
37.4k
                             .fragment_id = _fragment_id,
2614
37.4k
                             .fragment_instance_id = TUniqueId(),
2615
37.4k
                             .backend_num = -1,
2616
37.4k
                             .runtime_state = _runtime_state.get(),
2617
37.4k
                             .load_error_url = load_eror_url,
2618
37.4k
                             .first_error_msg = first_error_msg,
2619
37.4k
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};
2620
37.4k
    auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
2621
37.4k
    return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
2622
37.4k
        SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
2623
37.4k
        _coordinator_callback(req);
2624
37.4k
        if (!req.done) {
2625
3.70k
            ctx->refresh_next_report_time();
2626
3.70k
        }
2627
37.4k
    });
2628
37.5k
}
2629
2630
0
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2631
0
    size_t res = 0;
2632
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2633
    // here to traverse the vector.
2634
0
    for (const auto& task_instances : _tasks) {
2635
0
        for (const auto& task : task_instances) {
2636
0
            if (task.first->is_running()) {
2637
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2638
0
                                      << " is running, task: " << (void*)task.first.get()
2639
0
                                      << ", is_running: " << task.first->is_running();
2640
0
                *has_running_task = true;
2641
0
                return 0;
2642
0
            }
2643
2644
0
            size_t revocable_size = task.first->get_revocable_size();
2645
0
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2646
0
                res += revocable_size;
2647
0
            }
2648
0
        }
2649
0
    }
2650
0
    return res;
2651
0
}
2652
2653
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2654
0
    std::vector<PipelineTask*> revocable_tasks;
2655
0
    for (const auto& task_instances : _tasks) {
2656
0
        for (const auto& task : task_instances) {
2657
0
            size_t revocable_size_ = task.first->get_revocable_size();
2658
2659
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2660
0
                revocable_tasks.emplace_back(task.first.get());
2661
0
            }
2662
0
        }
2663
0
    }
2664
0
    return revocable_tasks;
2665
0
}
2666
2667
140
std::string PipelineFragmentContext::debug_string() {
2668
140
    std::lock_guard<std::mutex> l(_task_mutex);
2669
140
    fmt::memory_buffer debug_string_buffer;
2670
140
    fmt::format_to(debug_string_buffer,
2671
140
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2672
140
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2673
140
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2674
560
    for (size_t j = 0; j < _tasks.size(); j++) {
2675
420
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2676
1.23k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2677
813
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2678
813
                           _tasks[j][i].first->debug_string());
2679
813
        }
2680
420
    }
2681
2682
140
    return fmt::to_string(debug_string_buffer);
2683
140
}
2684
2685
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2686
2.45k
PipelineFragmentContext::collect_realtime_profile() const {
2687
2.45k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2688
2689
    // we do not have mutex to protect pipeline_id_to_profile
2690
    // so we need to make sure this funciton is invoked after fragment context
2691
    // has already been prepared.
2692
2.45k
    if (!_prepared) {
2693
0
        std::string msg =
2694
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2695
0
        DCHECK(false) << msg;
2696
0
        LOG_ERROR(msg);
2697
0
        return res;
2698
0
    }
2699
2700
    // Make sure first profile is fragment level profile
2701
2.45k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2702
2.45k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2703
2.45k
    res.push_back(fragment_profile);
2704
2705
    // pipeline_id_to_profile is initialized in prepare stage
2706
5.04k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2707
5.04k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2708
5.04k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2709
5.04k
        res.push_back(profile_ptr);
2710
5.04k
    }
2711
2712
2.45k
    return res;
2713
2.45k
}
2714
2715
std::shared_ptr<TRuntimeProfileTree>
2716
2.45k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2717
    // we do not have mutex to protect pipeline_id_to_profile
2718
    // so we need to make sure this funciton is invoked after fragment context
2719
    // has already been prepared.
2720
2.45k
    if (!_prepared) {
2721
0
        std::string msg =
2722
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2723
0
        DCHECK(false) << msg;
2724
0
        LOG_ERROR(msg);
2725
0
        return nullptr;
2726
0
    }
2727
2728
7.98k
    for (const auto& tasks : _tasks) {
2729
17.4k
        for (const auto& task : tasks) {
2730
17.4k
            if (task.second->load_channel_profile() == nullptr) {
2731
0
                continue;
2732
0
            }
2733
2734
17.4k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2735
2736
17.4k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2737
17.4k
                                                           _runtime_state->profile_level());
2738
17.4k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2739
17.4k
        }
2740
7.98k
    }
2741
2742
2.45k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2743
2.45k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2744
2.45k
                                                      _runtime_state->profile_level());
2745
2.45k
    return load_channel_profile;
2746
2.45k
}
2747
2748
// Collect runtime filter IDs registered by all tasks in this PFC.
2749
// Used during recursive CTE stage transitions to know which filters to deregister
2750
// before creating the new PFC for the next recursion round.
2751
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2752
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2753
// the vector sizes do not change, and individual RuntimeState filter sets are
2754
// written only during open() which has completed by the time we reach rerun.
2755
3.51k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2756
3.51k
    std::set<int> result;
2757
6.27k
    for (const auto& _task : _tasks) {
2758
10.6k
        for (const auto& task : _task) {
2759
10.6k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2760
10.6k
            result.merge(set);
2761
10.6k
        }
2762
6.27k
    }
2763
3.51k
    if (_runtime_state) {
2764
3.51k
        auto set = _runtime_state->get_deregister_runtime_filter();
2765
3.51k
        result.merge(set);
2766
3.51k
    }
2767
3.51k
    return result;
2768
3.51k
}
2769
2770
333k
void PipelineFragmentContext::_release_resource() {
2771
333k
    std::lock_guard<std::mutex> l(_task_mutex);
2772
    // The memory released by the query end is recorded in the query mem tracker.
2773
333k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2774
333k
    auto st = _query_ctx->exec_status();
2775
916k
    for (auto& _task : _tasks) {
2776
916k
        if (!_task.empty()) {
2777
916k
            _call_back(_task.front().first->runtime_state(), &st);
2778
916k
        }
2779
916k
    }
2780
333k
    _tasks.clear();
2781
333k
    _dag.clear();
2782
333k
    _pip_id_to_pipeline.clear();
2783
333k
    _pipelines.clear();
2784
333k
    _sink.reset();
2785
333k
    _root_op.reset();
2786
333k
    _runtime_filter_mgr_map.clear();
2787
333k
    _op_id_to_shared_state.clear();
2788
333k
}
2789
2790
} // namespace doris