Coverage Report

Created: 2026-03-27 21:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_block_buffer.h"
123
#include "runtime/result_buffer_mgr.h"
124
#include "runtime/runtime_state.h"
125
#include "runtime/thread_context.h"
126
#include "service/backend_options.h"
127
#include "util/countdown_latch.h"
128
#include "util/debug_util.h"
129
#include "util/uid_util.h"
130
131
namespace doris {
132
#include "common/compile_check_begin.h"
133
PipelineFragmentContext::PipelineFragmentContext(
134
        TUniqueId query_id, const TPipelineFragmentParams& request,
135
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136
        const std::function<void(RuntimeState*, Status*)>& call_back,
137
        report_status_callback report_status_cb)
138
443k
        : _query_id(std::move(query_id)),
139
443k
          _fragment_id(request.fragment_id),
140
443k
          _exec_env(exec_env),
141
443k
          _query_ctx(std::move(query_ctx)),
142
443k
          _call_back(call_back),
143
443k
          _is_report_on_cancel(true),
144
443k
          _report_status_cb(std::move(report_status_cb)),
145
443k
          _params(request),
146
443k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147
443k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
148
443k
                                                               : false) {
149
443k
    _fragment_watcher.start();
150
443k
}
151
152
443k
PipelineFragmentContext::~PipelineFragmentContext() {
153
443k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
154
443k
            .tag("query_id", print_id(_query_id))
155
443k
            .tag("fragment_id", _fragment_id);
156
443k
    _release_resource();
157
443k
    {
158
        // The memory released by the query end is recorded in the query mem tracker.
159
443k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
160
443k
        _runtime_state.reset();
161
443k
        _query_ctx.reset();
162
443k
    }
163
443k
}
164
165
54
bool PipelineFragmentContext::is_timeout(timespec now) const {
166
54
    if (_timeout <= 0) {
167
0
        return false;
168
0
    }
169
54
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
170
54
}
171
172
// notify_close() transitions the PFC from "waiting for external close notification" to
173
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
174
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
175
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
176
8.86k
bool PipelineFragmentContext::notify_close() {
177
8.86k
    bool all_closed = false;
178
8.86k
    bool need_remove = false;
179
8.86k
    {
180
8.86k
        std::lock_guard<std::mutex> l(_task_mutex);
181
8.86k
        if (_closed_tasks >= _total_tasks) {
182
3.28k
            if (_need_notify_close) {
183
                // Fragment was cancelled and waiting for notify to close.
184
                // Record that we need to remove from fragment mgr, but do it
185
                // after releasing _task_mutex to avoid ABBA deadlock with
186
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
187
                // first, then _task_mutex via debug_string()).
188
3.24k
                need_remove = true;
189
3.24k
            }
190
3.28k
            all_closed = true;
191
3.28k
        }
192
        // make fragment release by self after cancel
193
8.86k
        _need_notify_close = false;
194
8.86k
    }
195
8.86k
    if (need_remove) {
196
3.24k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
197
3.24k
    }
198
8.86k
    return all_closed;
199
8.86k
}
200
201
// Must not add lock in this method. Because it will call query ctx cancel. And
202
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
203
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
204
// There maybe dead lock.
205
5.58k
void PipelineFragmentContext::cancel(const Status reason) {
206
5.58k
    LOG_INFO("PipelineFragmentContext::cancel")
207
5.58k
            .tag("query_id", print_id(_query_id))
208
5.58k
            .tag("fragment_id", _fragment_id)
209
5.58k
            .tag("reason", reason.to_string());
210
5.58k
    if (notify_close()) {
211
56
        return;
212
56
    }
213
    // Timeout is a special error code, we need print current stack to debug timeout issue.
214
5.52k
    if (reason.is<ErrorCode::TIMEOUT>()) {
215
8
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
216
8
                                   debug_string());
217
8
        LOG_LONG_STRING(WARNING, dbg_str);
218
8
    }
219
220
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
221
5.52k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
222
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
223
0
                    debug_string());
224
0
    }
225
226
5.52k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
227
42
        print_profile("cancel pipeline, reason: " + reason.to_string());
228
42
    }
229
230
5.52k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
231
22
        _query_ctx->set_load_error_url(error_url);
232
22
    }
233
234
5.52k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
235
22
        _query_ctx->set_first_error_msg(first_error_msg);
236
22
    }
237
238
5.52k
    _query_ctx->cancel(reason, _fragment_id);
239
5.52k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
240
331
        _is_report_on_cancel = false;
241
5.19k
    } else {
242
21.5k
        for (auto& id : _fragment_instance_ids) {
243
21.5k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
244
21.5k
        }
245
5.19k
    }
246
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
247
    // For stream load the fragment's query_id == load id, it is set in FE.
248
5.52k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
249
5.52k
    if (stream_load_ctx != nullptr) {
250
30
        stream_load_ctx->pipe->cancel(reason.to_string());
251
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
252
        // We need to set the error URL at this point to ensure error information is properly
253
        // propagated to the client.
254
30
        stream_load_ctx->error_url = get_load_error_url();
255
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
256
30
    }
257
258
22.5k
    for (auto& tasks : _tasks) {
259
49.3k
        for (auto& task : tasks) {
260
49.3k
            task.first->terminate();
261
49.3k
        }
262
22.5k
    }
263
5.52k
}
264
265
705k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
266
705k
    PipelineId id = _next_pipeline_id++;
267
705k
    auto pipeline = std::make_shared<Pipeline>(
268
705k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
269
705k
            parent ? parent->num_tasks() : _num_instances);
270
705k
    if (idx >= 0) {
271
122k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
272
582k
    } else {
273
582k
        _pipelines.emplace_back(pipeline);
274
582k
    }
275
705k
    if (parent) {
276
254k
        parent->set_children(pipeline);
277
254k
    }
278
705k
    return pipeline;
279
705k
}
280
281
443k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
282
443k
    {
283
443k
        SCOPED_TIMER(_build_pipelines_timer);
284
        // 2. Build pipelines with operators in this fragment.
285
443k
        auto root_pipeline = add_pipeline();
286
443k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
287
443k
                                         &_root_op, root_pipeline));
288
289
        // 3. Create sink operator
290
443k
        if (!_params.fragment.__isset.output_sink) {
291
0
            return Status::InternalError("No output sink in this fragment!");
292
0
        }
293
443k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
294
443k
                                          _params.fragment.output_exprs, _params,
295
443k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
296
443k
                                          *_desc_tbl, root_pipeline->id()));
297
443k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
298
443k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
299
300
582k
        for (PipelinePtr& pipeline : _pipelines) {
301
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
302
582k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
303
582k
        }
304
443k
    }
305
    // 4. Build local exchanger
306
443k
    if (_runtime_state->enable_local_shuffle()) {
307
440k
        SCOPED_TIMER(_plan_local_exchanger_timer);
308
440k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
309
440k
                                             _params.bucket_seq_to_instance_idx,
310
440k
                                             _params.shuffle_idx_to_instance_idx));
311
440k
    }
312
313
    // 5. Initialize global states in pipelines.
314
705k
    for (PipelinePtr& pipeline : _pipelines) {
315
705k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
316
705k
        pipeline->children().clear();
317
705k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
318
705k
    }
319
320
442k
    {
321
442k
        SCOPED_TIMER(_build_tasks_timer);
322
        // 6. Build pipeline tasks and initialize local state.
323
442k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
324
442k
    }
325
326
442k
    return Status::OK();
327
442k
}
328
329
443k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
330
443k
    if (_prepared) {
331
0
        return Status::InternalError("Already prepared");
332
0
    }
333
443k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
334
443k
        _timeout = _params.query_options.execution_timeout;
335
443k
    }
336
337
443k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
338
443k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
339
443k
    SCOPED_TIMER(_prepare_timer);
340
443k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
341
443k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
342
443k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
343
443k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
344
443k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
345
443k
    {
346
443k
        SCOPED_TIMER(_init_context_timer);
347
443k
        cast_set(_num_instances, _params.local_params.size());
348
443k
        _total_instances =
349
443k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
350
351
443k
        auto* fragment_context = this;
352
353
443k
        if (_params.query_options.__isset.is_report_success) {
354
442k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
355
442k
        }
356
357
        // 1. Set up the global runtime state.
358
443k
        _runtime_state = RuntimeState::create_unique(
359
443k
                _params.query_id, _params.fragment_id, _params.query_options,
360
443k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
361
443k
        _runtime_state->set_task_execution_context(shared_from_this());
362
443k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
363
443k
        if (_params.__isset.backend_id) {
364
440k
            _runtime_state->set_backend_id(_params.backend_id);
365
440k
        }
366
443k
        if (_params.__isset.import_label) {
367
235
            _runtime_state->set_import_label(_params.import_label);
368
235
        }
369
443k
        if (_params.__isset.db_name) {
370
187
            _runtime_state->set_db_name(_params.db_name);
371
187
        }
372
443k
        if (_params.__isset.load_job_id) {
373
0
            _runtime_state->set_load_job_id(_params.load_job_id);
374
0
        }
375
376
443k
        if (_params.is_simplified_param) {
377
156k
            _desc_tbl = _query_ctx->desc_tbl;
378
286k
        } else {
379
286k
            DCHECK(_params.__isset.desc_tbl);
380
286k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
381
286k
                                                  &_desc_tbl));
382
286k
        }
383
443k
        _runtime_state->set_desc_tbl(_desc_tbl);
384
443k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
385
443k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
386
443k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
387
443k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
388
389
        // init fragment_instance_ids
390
443k
        const auto target_size = _params.local_params.size();
391
443k
        _fragment_instance_ids.resize(target_size);
392
1.81M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
393
1.37M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
394
1.37M
            _fragment_instance_ids[i] = fragment_instance_id;
395
1.37M
        }
396
443k
    }
397
398
443k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
399
400
442k
    _init_next_report_time();
401
402
442k
    _prepared = true;
403
442k
    return Status::OK();
404
443k
}
405
406
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
407
        int instance_idx,
408
1.37M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
409
1.37M
    const auto& local_params = _params.local_params[instance_idx];
410
1.37M
    auto fragment_instance_id = local_params.fragment_instance_id;
411
1.37M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
412
1.37M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
413
1.37M
    auto get_shared_state = [&](PipelinePtr pipeline)
414
1.37M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
2.41M
                                       std::vector<std::shared_ptr<Dependency>>>> {
416
2.41M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
417
2.41M
                                std::vector<std::shared_ptr<Dependency>>>>
418
2.41M
                shared_state_map;
419
3.16M
        for (auto& op : pipeline->operators()) {
420
3.16M
            auto source_id = op->operator_id();
421
3.16M
            if (auto iter = _op_id_to_shared_state.find(source_id);
422
3.16M
                iter != _op_id_to_shared_state.end()) {
423
928k
                shared_state_map.insert({source_id, iter->second});
424
928k
            }
425
3.16M
        }
426
2.41M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
427
2.41M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
428
2.41M
                iter != _op_id_to_shared_state.end()) {
429
413k
                shared_state_map.insert({sink_to_source_id, iter->second});
430
413k
            }
431
2.41M
        }
432
2.41M
        return shared_state_map;
433
2.41M
    };
434
435
4.29M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
436
2.92M
        auto& pipeline = _pipelines[pip_idx];
437
2.92M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
438
2.41M
            auto task_runtime_state = RuntimeState::create_unique(
439
2.41M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
440
2.41M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
441
2.41M
            {
442
                // Initialize runtime state for this task
443
2.41M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
444
445
2.41M
                task_runtime_state->set_task_execution_context(shared_from_this());
446
2.41M
                task_runtime_state->set_be_number(local_params.backend_num);
447
448
2.41M
                if (_params.__isset.backend_id) {
449
2.41M
                    task_runtime_state->set_backend_id(_params.backend_id);
450
2.41M
                }
451
2.41M
                if (_params.__isset.import_label) {
452
236
                    task_runtime_state->set_import_label(_params.import_label);
453
236
                }
454
2.41M
                if (_params.__isset.db_name) {
455
188
                    task_runtime_state->set_db_name(_params.db_name);
456
188
                }
457
2.41M
                if (_params.__isset.load_job_id) {
458
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
459
0
                }
460
2.41M
                if (_params.__isset.wal_id) {
461
114
                    task_runtime_state->set_wal_id(_params.wal_id);
462
114
                }
463
2.41M
                if (_params.__isset.content_length) {
464
31
                    task_runtime_state->set_content_length(_params.content_length);
465
31
                }
466
467
2.41M
                task_runtime_state->set_desc_tbl(_desc_tbl);
468
2.41M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
469
2.41M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
470
2.41M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
471
2.41M
                task_runtime_state->set_max_operator_id(max_operator_id());
472
2.41M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
473
2.41M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
474
2.41M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
475
476
2.41M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
477
2.41M
            }
478
2.41M
            auto cur_task_id = _total_tasks++;
479
2.41M
            task_runtime_state->set_task_id(cur_task_id);
480
2.41M
            task_runtime_state->set_task_num(pipeline->num_tasks());
481
2.41M
            auto task = std::make_shared<PipelineTask>(
482
2.41M
                    pipeline, cur_task_id, task_runtime_state.get(),
483
2.41M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
484
2.41M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
485
2.41M
                    instance_idx);
486
2.41M
            pipeline->incr_created_tasks(instance_idx, task.get());
487
2.41M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
488
2.41M
            _tasks[instance_idx].emplace_back(
489
2.41M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
490
2.41M
                            std::move(task), std::move(task_runtime_state)});
491
2.41M
        }
492
2.92M
    }
493
494
    /**
495
         * Build DAG for pipeline tasks.
496
         * For example, we have
497
         *
498
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
499
         *            \                      /
500
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
501
         *                 \                          /
502
         *               JoinProbeOperator2 (Pipeline1)
503
         *
504
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
505
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
506
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
507
         *
508
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
509
         * and JoinProbeOperator2.
510
         */
511
2.92M
    for (auto& _pipeline : _pipelines) {
512
2.92M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
513
2.40M
            auto* task = pipeline_id_to_task[_pipeline->id()];
514
2.40M
            DCHECK(task != nullptr);
515
516
            // If this task has upstream dependency, then inject it into this task.
517
2.40M
            if (_dag.contains(_pipeline->id())) {
518
1.54M
                auto& deps = _dag[_pipeline->id()];
519
2.45M
                for (auto& dep : deps) {
520
2.45M
                    if (pipeline_id_to_task.contains(dep)) {
521
1.43M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
522
1.43M
                        if (ss) {
523
615k
                            task->inject_shared_state(ss);
524
815k
                        } else {
525
815k
                            pipeline_id_to_task[dep]->inject_shared_state(
526
815k
                                    task->get_source_shared_state());
527
815k
                        }
528
1.43M
                    }
529
2.45M
                }
530
1.54M
            }
531
2.40M
        }
532
2.92M
    }
533
4.28M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
534
2.91M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
535
2.40M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
536
2.40M
            DCHECK(pipeline_id_to_profile[pip_idx]);
537
2.40M
            std::vector<TScanRangeParams> scan_ranges;
538
2.40M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
539
2.40M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
540
403k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
541
403k
            }
542
2.40M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
543
2.40M
                                                             _params.fragment.output_sink));
544
2.40M
        }
545
2.91M
    }
546
1.37M
    {
547
1.37M
        std::lock_guard<std::mutex> l(_state_map_lock);
548
1.37M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
549
1.37M
    }
550
1.37M
    return Status::OK();
551
1.37M
}
552
553
442k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
554
442k
    _total_tasks = 0;
555
442k
    _closed_tasks = 0;
556
442k
    const auto target_size = _params.local_params.size();
557
442k
    _tasks.resize(target_size);
558
442k
    _runtime_filter_mgr_map.resize(target_size);
559
1.14M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
560
704k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
561
704k
    }
562
442k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
563
564
442k
    if (target_size > 1 &&
565
442k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
566
141k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
567
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
568
22.1k
        std::vector<Status> prepare_status(target_size);
569
22.1k
        int submitted_tasks = 0;
570
22.1k
        Status submit_status;
571
22.1k
        CountDownLatch latch((int)target_size);
572
409k
        for (int i = 0; i < target_size; i++) {
573
386k
            submit_status = thread_pool->submit_func([&, i]() {
574
386k
                SCOPED_ATTACH_TASK(_query_ctx.get());
575
386k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
576
386k
                latch.count_down();
577
386k
            });
578
386k
            if (LIKELY(submit_status.ok())) {
579
386k
                submitted_tasks++;
580
18.4E
            } else {
581
18.4E
                break;
582
18.4E
            }
583
386k
        }
584
22.1k
        latch.arrive_and_wait(target_size - submitted_tasks);
585
22.1k
        if (UNLIKELY(!submit_status.ok())) {
586
0
            return submit_status;
587
0
        }
588
409k
        for (int i = 0; i < submitted_tasks; i++) {
589
386k
            if (!prepare_status[i].ok()) {
590
0
                return prepare_status[i];
591
0
            }
592
386k
        }
593
420k
    } else {
594
1.40M
        for (int i = 0; i < target_size; i++) {
595
988k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
596
988k
        }
597
420k
    }
598
442k
    _pipeline_parent_map.clear();
599
442k
    _op_id_to_shared_state.clear();
600
601
442k
    return Status::OK();
602
442k
}
603
604
441k
void PipelineFragmentContext::_init_next_report_time() {
605
441k
    auto interval_s = config::pipeline_status_report_interval;
606
441k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
607
43.7k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
608
43.7k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
609
        // We don't want to wait longer than it takes to run the entire fragment.
610
43.7k
        _previous_report_time =
611
43.7k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
612
43.7k
        _disable_period_report = false;
613
43.7k
    }
614
441k
}
615
616
5.01k
void PipelineFragmentContext::refresh_next_report_time() {
617
5.01k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
618
5.01k
    DCHECK(disable == true);
619
5.01k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
620
5.01k
    _disable_period_report.compare_exchange_strong(disable, false);
621
5.01k
}
622
623
8.23M
void PipelineFragmentContext::trigger_report_if_necessary() {
624
8.23M
    if (!_is_report_success) {
625
7.56M
        return;
626
7.56M
    }
627
675k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
628
675k
    if (disable) {
629
11.0k
        return;
630
11.0k
    }
631
664k
    int32_t interval_s = config::pipeline_status_report_interval;
632
664k
    if (interval_s <= 0) {
633
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
634
0
                        "trigger "
635
0
                        "report.";
636
0
    }
637
664k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
638
664k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
639
664k
    if (MonotonicNanos() > next_report_time) {
640
5.01k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
641
5.01k
                                                            std::memory_order_acq_rel)) {
642
8
            return;
643
8
        }
644
5.00k
        if (VLOG_FILE_IS_ON) {
645
0
            VLOG_FILE << "Reporting "
646
0
                      << "profile for query_id " << print_id(_query_id)
647
0
                      << ", fragment id: " << _fragment_id;
648
649
0
            std::stringstream ss;
650
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
651
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
652
0
            if (_runtime_state->load_channel_profile()) {
653
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
654
0
            }
655
656
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
657
0
                      << " profile:\n"
658
0
                      << ss.str();
659
0
        }
660
5.00k
        auto st = send_report(false);
661
5.00k
        if (!st.ok()) {
662
0
            disable = true;
663
0
            _disable_period_report.compare_exchange_strong(disable, false,
664
0
                                                           std::memory_order_acq_rel);
665
0
        }
666
5.00k
    }
667
664k
}
668
669
// Build the operator tree for this fragment from the thrift plan (a preorder
// list of TPlanNode) and attach it to `cur_pipe`; `*root` receives the root
// operator. Throws on an empty plan, returns an error Status on a malformed one.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root,
                                        cur_pipe, 0, false, false));

    // The recursive helper must have consumed every node of the preorder list.
    if (node_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
686
687
// Recursively reconstruct the operator tree from `tnodes` (a preorder list of
// thrift plan nodes). `*node_idx` is the cursor into `tnodes` and is advanced as
// children are consumed. `parent` is the operator the new node attaches to
// (nullptr at the root, in which case `*root` is set). The two boolean flags
// propagate shuffle/bucket distribution requirements down the tree so that
// local-exchange planning later picks a compatible exchange type.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // If the current pipeline has no operators yet, the distribution requirement
    // comes from its sink; otherwise from the operator just created.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Propagate the "followed by shuffled operator" flag: it stays set across a
    // hash exchange (the shuffle requirement carries through), and across a NOOP
    // exchange (which doesn't change distribution at all).
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for the colocated/bucket-distribution requirement.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf nodes are source operators; they decide whether this fragment's
    // source must run serially.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
772
773
void PipelineFragmentContext::_inherit_pipeline_properties(
774
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
775
122k
        PipelinePtr pipe_with_sink) {
776
122k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
777
122k
    pipe_with_source->set_num_tasks(_num_instances);
778
122k
    pipe_with_source->set_data_distribution(data_distribution);
779
122k
}
780
781
// Split `cur_pipe` at operator index `idx` by inserting a local exchange:
// operators [0, idx) move to `new_pip` which gets a LocalExchangeSink, and a
// LocalExchangeSource becomes the new head of `cur_pipe`. The pipeline DAG and
// children edges are rewired accordingly. `data_distribution` selects the
// exchanger implementation.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    // The free-blocks limit is identical for every exchanger type; compute it
    // once instead of repeating the ternary in every switch case below (it was
    // previously duplicated seven times).
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Re-parent the first remaining operator onto the new exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. Child pipelines whose sink
    // feeds an operator that moved to `new_pip` become children of `new_pip`;
    // the rest stay with `cur_pipe`.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
945
946
// Insert a local exchange before operator `idx` of `cur_pipe` if one is needed
// for the requested `data_distribution`. Sets `*do_local_exchange` to true when
// a split actually happened. May split twice: once for the requested
// distribution and once more with PASSTHROUGH to widen a single-task sink
// (see the comment before the second split below).
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A local exchange is pointless with a single instance or a single upstream task.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
        return Status::OK();
    }

    // Skip if the pipeline's current distribution already satisfies the request.
    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    auto& operators = cur_pipe->operators();
    auto total_op_num = operators.size();
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // Invariant: the split adds exactly one operator (the local exchange source)
    // across the two resulting pipelines.
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << total_op_num
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
987
988
// Plan local exchanges for every pipeline of this fragment. Pipelines are
// visited from last to first so that a child pipeline's data distribution is
// already settled before a downstream pipeline may inherit it.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child.
        for (auto& child : pipeline->children()) {
            if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1012
1013
// Plan local exchanges within a single pipeline `pip`: walk its operators and
// insert a local exchange wherever an operator's required data distribution
// demands one, then do the same for the pipeline's sink. Each successful split
// restarts the scan on the (now shortened) pipeline.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Start at 1: index 0 is the pipeline's source operator, which never needs a
    // local exchange in front of it.
    int idx = 1;
    bool do_local_exchange = false;
    do {
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // Finally, check whether the sink itself needs its input re-distributed.
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1050
1051
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1052
                                                  const std::vector<TExpr>& output_exprs,
1053
                                                  const TPipelineFragmentParams& params,
1054
                                                  const RowDescriptor& row_desc,
1055
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1056
443k
                                                  PipelineId cur_pipeline_id) {
1057
443k
    switch (thrift_sink.type) {
1058
154k
    case TDataSinkType::DATA_STREAM_SINK: {
1059
154k
        if (!thrift_sink.__isset.stream_sink) {
1060
0
            return Status::InternalError("Missing data stream sink.");
1061
0
        }
1062
154k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1063
154k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1064
154k
                params.destinations, _fragment_instance_ids);
1065
154k
        break;
1066
154k
    }
1067
251k
    case TDataSinkType::RESULT_SINK: {
1068
251k
        if (!thrift_sink.__isset.result_sink) {
1069
0
            return Status::InternalError("Missing data buffer sink.");
1070
0
        }
1071
1072
251k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1073
251k
                                                      output_exprs, thrift_sink.result_sink);
1074
251k
        break;
1075
251k
    }
1076
105
    case TDataSinkType::DICTIONARY_SINK: {
1077
105
        if (!thrift_sink.__isset.dictionary_sink) {
1078
0
            return Status::InternalError("Missing dict sink.");
1079
0
        }
1080
1081
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1082
105
                                                    thrift_sink.dictionary_sink);
1083
105
        break;
1084
105
    }
1085
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1086
30.9k
    case TDataSinkType::OLAP_TABLE_SINK: {
1087
30.9k
        if (state->query_options().enable_memtable_on_sink_node &&
1088
30.9k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1089
30.9k
            !config::is_cloud_mode()) {
1090
2.07k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1091
2.07k
                                                               row_desc, output_exprs);
1092
28.8k
        } else {
1093
28.8k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1094
28.8k
                                                             row_desc, output_exprs);
1095
28.8k
        }
1096
30.9k
        break;
1097
0
    }
1098
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1099
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1100
165
        DCHECK(state->get_query_ctx() != nullptr);
1101
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1102
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1103
165
                                                                output_exprs);
1104
165
        break;
1105
0
    }
1106
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1107
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1108
0
            return Status::InternalError("Missing hive table sink.");
1109
0
        }
1110
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1111
1.46k
                                                         output_exprs);
1112
1.46k
        break;
1113
1.46k
    }
1114
1.62k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1115
1.62k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1116
0
            return Status::InternalError("Missing iceberg table sink.");
1117
0
        }
1118
1.62k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1119
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1120
0
                                                                     row_desc, output_exprs);
1121
1.62k
        } else {
1122
1.62k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1123
1.62k
                                                                row_desc, output_exprs);
1124
1.62k
        }
1125
1.62k
        break;
1126
1.62k
    }
1127
12
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1128
12
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1129
0
            return Status::InternalError("Missing iceberg delete sink.");
1130
0
        }
1131
12
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1132
12
                                                             row_desc, output_exprs);
1133
12
        break;
1134
12
    }
1135
64
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1136
64
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1137
0
            return Status::InternalError("Missing iceberg merge sink.");
1138
0
        }
1139
64
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1140
64
                                                            output_exprs);
1141
64
        break;
1142
64
    }
1143
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1144
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1145
0
            return Status::InternalError("Missing max compute table sink.");
1146
0
        }
1147
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1148
0
                                                       output_exprs);
1149
0
        break;
1150
0
    }
1151
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1152
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1153
0
            return Status::InternalError("Missing data jdbc sink.");
1154
0
        }
1155
80
        if (config::enable_java_support) {
1156
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1157
80
                                                             output_exprs);
1158
80
        } else {
1159
0
            return Status::InternalError(
1160
0
                    "Jdbc table sink is not enabled, you can change be config "
1161
0
                    "enable_java_support to true and restart be.");
1162
0
        }
1163
80
        break;
1164
80
    }
1165
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1166
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1167
0
            return Status::InternalError("Missing data buffer sink.");
1168
0
        }
1169
1170
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1171
3
                                                             output_exprs);
1172
3
        break;
1173
3
    }
1174
473
    case TDataSinkType::RESULT_FILE_SINK: {
1175
473
        if (!thrift_sink.__isset.result_file_sink) {
1176
0
            return Status::InternalError("Missing result file sink.");
1177
0
        }
1178
1179
        // Result file sink is not the top sink
1180
473
        if (params.__isset.destinations && !params.destinations.empty()) {
1181
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1182
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1183
0
                    params.destinations, output_exprs, desc_tbl);
1184
473
        } else {
1185
473
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1186
473
                                                              output_exprs);
1187
473
        }
1188
473
        break;
1189
473
    }
1190
2.33k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1191
2.33k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1192
2.33k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1193
2.33k
        auto sink_id = next_sink_operator_id();
1194
2.33k
        const int multi_cast_node_id = sink_id;
1195
2.33k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1196
        // one sink has multiple sources.
1197
2.33k
        std::vector<int> sources;
1198
9.12k
        for (int i = 0; i < sender_size; ++i) {
1199
6.78k
            auto source_id = next_operator_id();
1200
6.78k
            sources.push_back(source_id);
1201
6.78k
        }
1202
1203
2.33k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1204
2.33k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1205
9.12k
        for (int i = 0; i < sender_size; ++i) {
1206
6.78k
            auto new_pipeline = add_pipeline();
1207
            // use to exchange sink
1208
6.78k
            RowDescriptor* exchange_row_desc = nullptr;
1209
6.78k
            {
1210
6.78k
                const auto& tmp_row_desc =
1211
6.78k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1212
6.78k
                                ? RowDescriptor(state->desc_tbl(),
1213
6.78k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1214
6.78k
                                                         .output_tuple_id})
1215
6.78k
                                : row_desc;
1216
6.78k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1217
6.78k
            }
1218
6.78k
            auto source_id = sources[i];
1219
6.78k
            OperatorPtr source_op;
1220
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1221
6.78k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1222
6.78k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1223
6.78k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1224
6.78k
                    /*operator_id=*/source_id);
1225
6.78k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1226
6.78k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1227
            // 2. create and set sink operator of data stream sender for new pipeline
1228
1229
6.78k
            DataSinkOperatorPtr sink_op;
1230
6.78k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1231
6.78k
                    state, *exchange_row_desc, next_sink_operator_id(),
1232
6.78k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1233
6.78k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1234
1235
6.78k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1236
6.78k
            {
1237
6.78k
                TDataSink* t = pool->add(new TDataSink());
1238
6.78k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1239
6.78k
                RETURN_IF_ERROR(sink_op->init(*t));
1240
6.78k
            }
1241
1242
            // 3. set dependency dag
1243
6.78k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1244
6.78k
        }
1245
2.33k
        if (sources.empty()) {
1246
0
            return Status::InternalError("size of sources must be greater than 0");
1247
0
        }
1248
2.33k
        break;
1249
2.33k
    }
1250
2.33k
    case TDataSinkType::BLACKHOLE_SINK: {
1251
13
        if (!thrift_sink.__isset.blackhole_sink) {
1252
0
            return Status::InternalError("Missing blackhole sink.");
1253
0
        }
1254
1255
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1256
13
        break;
1257
13
    }
1258
156
    case TDataSinkType::TVF_TABLE_SINK: {
1259
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1260
0
            return Status::InternalError("Missing TVF table sink.");
1261
0
        }
1262
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1263
156
                                                        output_exprs);
1264
156
        break;
1265
156
    }
1266
0
    default:
1267
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1268
443k
    }
1269
443k
    return Status::OK();
1270
443k
}
1271
1272
// NOLINTBEGIN(readability-function-size)
1273
// NOLINTBEGIN(readability-function-cognitive-complexity)
1274
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1275
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1276
                                                 PipelinePtr& cur_pipe, int parent_idx,
1277
                                                 int child_idx,
1278
                                                 const bool followed_by_shuffled_operator,
1279
                                                 const bool require_bucket_distribution,
1280
696k
                                                 OperatorPtr& cache_op) {
1281
696k
    std::vector<DataSinkOperatorPtr> sink_ops;
1282
696k
    Defer defer = Defer([&]() {
1283
696k
        if (op) {
1284
696k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1285
696k
        }
1286
696k
        for (auto& s : sink_ops) {
1287
132k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1288
132k
        }
1289
696k
    });
1290
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1291
    // Therefore, here we need to use a stack-like structure.
1292
696k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1293
696k
    std::stringstream error_msg;
1294
696k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1295
1296
696k
    bool fe_with_old_version = false;
1297
696k
    switch (tnode.node_type) {
1298
211k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1299
211k
        op = std::make_shared<OlapScanOperatorX>(
1300
211k
                pool, tnode, next_operator_id(), descs, _num_instances,
1301
211k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1302
211k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1303
211k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1304
211k
        break;
1305
211k
    }
1306
74
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1307
74
        DCHECK(_query_ctx != nullptr);
1308
74
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1309
74
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1310
74
                                                    _num_instances);
1311
74
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1312
74
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1313
74
        break;
1314
74
    }
1315
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1316
0
        if (config::enable_java_support) {
1317
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1318
0
                                                     _num_instances);
1319
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1320
0
        } else {
1321
0
            return Status::InternalError(
1322
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1323
0
                    "to true and restart be.");
1324
0
        }
1325
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1326
0
        break;
1327
0
    }
1328
22.0k
    case TPlanNodeType::FILE_SCAN_NODE: {
1329
22.0k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1330
22.0k
                                                 _num_instances);
1331
22.0k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1332
22.0k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1333
22.0k
        break;
1334
22.0k
    }
1335
0
    case TPlanNodeType::ES_SCAN_NODE:
1336
592
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1337
592
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1338
592
                                               _num_instances);
1339
592
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1340
592
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1341
592
        break;
1342
592
    }
1343
158k
    case TPlanNodeType::EXCHANGE_NODE: {
1344
158k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1345
159k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1346
18.4E
                                  : 0;
1347
158k
        DCHECK_GT(num_senders, 0);
1348
158k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1349
158k
                                                       num_senders);
1350
158k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1351
158k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1352
158k
        break;
1353
158k
    }
1354
172k
    case TPlanNodeType::AGGREGATION_NODE: {
1355
172k
        if (tnode.agg_node.grouping_exprs.empty() &&
1356
172k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1357
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1358
0
                                         ": group by and output is empty");
1359
0
        }
1360
172k
        bool need_create_cache_op =
1361
172k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1362
172k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1363
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1364
24
            auto cache_source_id = next_operator_id();
1365
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1366
24
                                                        _params.fragment.query_cache_param);
1367
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1368
1369
24
            const auto downstream_pipeline_id = cur_pipe->id();
1370
24
            if (!_dag.contains(downstream_pipeline_id)) {
1371
24
                _dag.insert({downstream_pipeline_id, {}});
1372
24
            }
1373
24
            new_pipe = add_pipeline(cur_pipe);
1374
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1375
1376
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1377
24
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1378
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1379
24
            return Status::OK();
1380
24
        };
1381
172k
        const bool group_by_limit_opt =
1382
172k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1383
1384
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1385
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1386
172k
        const bool enable_spill = _runtime_state->enable_spill() &&
1387
172k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1388
172k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1389
172k
                                      tnode.agg_node.use_streaming_preaggregation &&
1390
172k
                                      !tnode.agg_node.grouping_exprs.empty();
1391
        // TODO: distinct streaming agg does not support spill.
1392
172k
        const bool can_use_distinct_streaming_agg =
1393
172k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1394
172k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1395
172k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1396
172k
                _params.query_options.enable_distinct_streaming_aggregation;
1397
1398
172k
        if (can_use_distinct_streaming_agg) {
1399
99.4k
            if (need_create_cache_op) {
1400
8
                PipelinePtr new_pipe;
1401
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1402
1403
8
                cache_op = op;
1404
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1405
8
                                                                     tnode, descs);
1406
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1407
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1408
8
                cur_pipe = new_pipe;
1409
99.4k
            } else {
1410
99.4k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1411
99.4k
                                                                     tnode, descs);
1412
99.4k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1413
99.4k
            }
1414
99.4k
        } else if (is_streaming_agg) {
1415
3.42k
            if (need_create_cache_op) {
1416
4
                PipelinePtr new_pipe;
1417
4
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1418
4
                cache_op = op;
1419
4
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1420
4
                                                             descs);
1421
4
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1422
4
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1423
4
                cur_pipe = new_pipe;
1424
3.41k
            } else {
1425
3.41k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1426
3.41k
                                                             descs);
1427
3.41k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1428
3.41k
            }
1429
70.0k
        } else {
1430
            // create new pipeline to add query cache operator
1431
70.0k
            PipelinePtr new_pipe;
1432
70.0k
            if (need_create_cache_op) {
1433
12
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1434
12
                cache_op = op;
1435
12
            }
1436
1437
70.0k
            if (enable_spill) {
1438
592
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1439
592
                                                                     next_operator_id(), descs);
1440
69.4k
            } else {
1441
69.4k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1442
69.4k
            }
1443
70.0k
            if (need_create_cache_op) {
1444
12
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1445
12
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1446
12
                cur_pipe = new_pipe;
1447
70.0k
            } else {
1448
70.0k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1449
70.0k
            }
1450
1451
70.0k
            const auto downstream_pipeline_id = cur_pipe->id();
1452
70.0k
            if (!_dag.contains(downstream_pipeline_id)) {
1453
67.5k
                _dag.insert({downstream_pipeline_id, {}});
1454
67.5k
            }
1455
70.0k
            cur_pipe = add_pipeline(cur_pipe);
1456
70.0k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1457
1458
70.0k
            if (enable_spill) {
1459
592
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1460
592
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1461
69.4k
            } else {
1462
69.4k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1463
69.4k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
69.4k
            }
1465
70.0k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1466
70.0k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1467
70.0k
        }
1468
172k
        break;
1469
172k
    }
1470
172k
    case TPlanNodeType::HASH_JOIN_NODE: {
1471
8.05k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1472
8.05k
                                       tnode.hash_join_node.is_broadcast_join;
1473
8.05k
        const auto enable_spill = _runtime_state->enable_spill();
1474
8.05k
        if (enable_spill && !is_broadcast_join) {
1475
0
            auto tnode_ = tnode;
1476
0
            tnode_.runtime_filters.clear();
1477
0
            auto inner_probe_operator =
1478
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1479
1480
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1481
            // So here use `tnode_` which has no runtime filters.
1482
0
            auto probe_side_inner_sink_operator =
1483
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1484
1485
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1486
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1487
1488
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1489
0
                    pool, tnode_, next_operator_id(), descs);
1490
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1491
0
                                                inner_probe_operator);
1492
0
            op = std::move(probe_operator);
1493
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1494
1495
0
            const auto downstream_pipeline_id = cur_pipe->id();
1496
0
            if (!_dag.contains(downstream_pipeline_id)) {
1497
0
                _dag.insert({downstream_pipeline_id, {}});
1498
0
            }
1499
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1500
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1501
1502
0
            auto inner_sink_operator =
1503
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1504
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1505
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1506
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1507
1508
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1509
0
            sink_ops.push_back(std::move(sink_operator));
1510
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1511
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1512
1513
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1514
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1515
8.05k
        } else {
1516
8.05k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1517
8.05k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1518
1519
8.05k
            const auto downstream_pipeline_id = cur_pipe->id();
1520
8.05k
            if (!_dag.contains(downstream_pipeline_id)) {
1521
7.24k
                _dag.insert({downstream_pipeline_id, {}});
1522
7.24k
            }
1523
8.05k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1524
8.05k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1525
1526
8.05k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1527
8.05k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1528
8.05k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1529
8.05k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1530
1531
8.05k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1532
8.05k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1533
8.05k
        }
1534
8.05k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1535
3.42k
            std::shared_ptr<HashJoinSharedState> shared_state =
1536
3.42k
                    HashJoinSharedState::create_shared(_num_instances);
1537
22.4k
            for (int i = 0; i < _num_instances; i++) {
1538
18.9k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1539
18.9k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1540
18.9k
                sink_dep->set_shared_state(shared_state.get());
1541
18.9k
                shared_state->sink_deps.push_back(sink_dep);
1542
18.9k
            }
1543
3.42k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1544
3.42k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1545
3.42k
            _op_id_to_shared_state.insert(
1546
3.42k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1547
3.42k
        }
1548
8.05k
        break;
1549
8.05k
    }
1550
5.38k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1551
5.38k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1552
5.38k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1553
1554
5.38k
        const auto downstream_pipeline_id = cur_pipe->id();
1555
5.38k
        if (!_dag.contains(downstream_pipeline_id)) {
1556
5.14k
            _dag.insert({downstream_pipeline_id, {}});
1557
5.14k
        }
1558
5.38k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1559
5.38k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1560
1561
5.38k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1562
5.38k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1563
5.38k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1564
5.38k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1565
5.38k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
5.38k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
5.38k
        break;
1568
5.38k
    }
1569
53.4k
    case TPlanNodeType::UNION_NODE: {
1570
53.4k
        int child_count = tnode.num_children;
1571
53.4k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1572
53.4k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1573
1574
53.4k
        const auto downstream_pipeline_id = cur_pipe->id();
1575
53.4k
        if (!_dag.contains(downstream_pipeline_id)) {
1576
53.2k
            _dag.insert({downstream_pipeline_id, {}});
1577
53.2k
        }
1578
55.3k
        for (int i = 0; i < child_count; i++) {
1579
1.83k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1580
1.83k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1581
1.83k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1582
1.83k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1583
1.83k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1584
1.83k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1585
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1586
1.83k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1587
1.83k
        }
1588
53.4k
        break;
1589
53.4k
    }
1590
53.4k
    case TPlanNodeType::SORT_NODE: {
1591
44.6k
        const auto should_spill = _runtime_state->enable_spill() &&
1592
44.6k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1593
44.6k
        const bool use_local_merge =
1594
44.6k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1595
44.6k
        if (should_spill) {
1596
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1597
44.5k
        } else if (use_local_merge) {
1598
39.8k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1599
39.8k
                                                                 descs);
1600
39.8k
        } else {
1601
4.71k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1602
4.71k
        }
1603
44.6k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1604
1605
44.6k
        const auto downstream_pipeline_id = cur_pipe->id();
1606
44.6k
        if (!_dag.contains(downstream_pipeline_id)) {
1607
44.5k
            _dag.insert({downstream_pipeline_id, {}});
1608
44.5k
        }
1609
44.6k
        cur_pipe = add_pipeline(cur_pipe);
1610
44.6k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1611
1612
44.6k
        if (should_spill) {
1613
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1614
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
44.5k
        } else {
1616
44.5k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1617
44.5k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1618
44.5k
        }
1619
44.6k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1620
44.6k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1621
44.6k
        break;
1622
44.6k
    }
1623
44.6k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1624
64
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1625
64
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1626
1627
64
        const auto downstream_pipeline_id = cur_pipe->id();
1628
64
        if (!_dag.contains(downstream_pipeline_id)) {
1629
64
            _dag.insert({downstream_pipeline_id, {}});
1630
64
        }
1631
64
        cur_pipe = add_pipeline(cur_pipe);
1632
64
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1633
1634
64
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1635
64
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1636
64
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1637
64
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1638
64
        break;
1639
64
    }
1640
1.81k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1641
1.81k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1642
1.81k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1643
1644
1.81k
        const auto downstream_pipeline_id = cur_pipe->id();
1645
1.81k
        if (!_dag.contains(downstream_pipeline_id)) {
1646
1.79k
            _dag.insert({downstream_pipeline_id, {}});
1647
1.79k
        }
1648
1.81k
        cur_pipe = add_pipeline(cur_pipe);
1649
1.81k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1650
1651
1.81k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1652
1.81k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1653
1.81k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1654
1.81k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1655
1.81k
        break;
1656
1.81k
    }
1657
1.81k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1658
1.61k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1659
1.61k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1660
1.61k
        break;
1661
1.61k
    }
1662
1.61k
    case TPlanNodeType::INTERSECT_NODE: {
1663
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1664
119
                                                                      cur_pipe, sink_ops));
1665
119
        break;
1666
119
    }
1667
129
    case TPlanNodeType::EXCEPT_NODE: {
1668
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1669
129
                                                                       cur_pipe, sink_ops));
1670
129
        break;
1671
129
    }
1672
312
    case TPlanNodeType::REPEAT_NODE: {
1673
312
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1674
312
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1675
312
        break;
1676
312
    }
1677
926
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1678
926
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1679
926
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1680
926
        break;
1681
926
    }
1682
926
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1683
218
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1684
218
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1685
218
        break;
1686
218
    }
1687
1.69k
    case TPlanNodeType::EMPTY_SET_NODE: {
1688
1.69k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1689
1.69k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1690
1.69k
        break;
1691
1.69k
    }
1692
1.69k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1693
459
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
459
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
459
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1696
459
        break;
1697
459
    }
1698
2.31k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1699
2.31k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1700
2.31k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1701
2.31k
        break;
1702
2.31k
    }
1703
6.56k
    case TPlanNodeType::META_SCAN_NODE: {
1704
6.56k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1705
6.56k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1706
6.56k
        break;
1707
6.56k
    }
1708
6.56k
    case TPlanNodeType::SELECT_NODE: {
1709
2.09k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1710
2.09k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1711
2.09k
        break;
1712
2.09k
    }
1713
2.09k
    case TPlanNodeType::REC_CTE_NODE: {
1714
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1715
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1716
1717
151
        const auto downstream_pipeline_id = cur_pipe->id();
1718
151
        if (!_dag.contains(downstream_pipeline_id)) {
1719
148
            _dag.insert({downstream_pipeline_id, {}});
1720
148
        }
1721
1722
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1723
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1724
1725
151
        DataSinkOperatorPtr anchor_sink;
1726
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1727
151
                                                                  op->operator_id(), tnode, descs);
1728
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1729
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1730
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1731
1732
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1733
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1734
1735
151
        DataSinkOperatorPtr rec_sink;
1736
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1737
151
                                                         tnode, descs);
1738
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1739
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1741
1742
151
        break;
1743
151
    }
1744
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1745
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1746
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
1.95k
        break;
1748
1.95k
    }
1749
1.95k
    default:
1750
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1751
0
                                     print_plan_node_type(tnode.node_type));
1752
696k
    }
1753
696k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1754
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1755
0
        op->set_serial_operator();
1756
0
    }
1757
1758
696k
    return Status::OK();
1759
696k
}
1760
// NOLINTEND(readability-function-cognitive-complexity)
1761
// NOLINTEND(readability-function-size)
1762
1763
template <bool is_intersect>
1764
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1765
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1766
248
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
248
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
248
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
248
    const auto downstream_pipeline_id = cur_pipe->id();
1771
248
    if (!_dag.contains(downstream_pipeline_id)) {
1772
231
        _dag.insert({downstream_pipeline_id, {}});
1773
231
    }
1774
1775
837
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
589
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
589
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
589
        if (child_id == 0) {
1780
248
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
248
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
341
        } else {
1783
341
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
341
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
341
        }
1786
589
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
589
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
589
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
589
    }
1791
1792
248
    return Status::OK();
1793
248
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
119
    const auto downstream_pipeline_id = cur_pipe->id();
1771
119
    if (!_dag.contains(downstream_pipeline_id)) {
1772
110
        _dag.insert({downstream_pipeline_id, {}});
1773
110
    }
1774
1775
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
316
        if (child_id == 0) {
1780
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
197
        } else {
1783
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
197
        }
1786
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
316
    }
1791
1792
119
    return Status::OK();
1793
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
129
    const auto downstream_pipeline_id = cur_pipe->id();
1771
129
    if (!_dag.contains(downstream_pipeline_id)) {
1772
121
        _dag.insert({downstream_pipeline_id, {}});
1773
121
    }
1774
1775
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
273
        if (child_id == 0) {
1780
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
144
        } else {
1783
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
144
        }
1786
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
273
    }
1791
1792
129
    return Status::OK();
1793
129
}
1794
1795
441k
// Submit every prepared pipeline task to the query's executor.
// On the first failed submission the fragment is cancelled, _total_tasks is
// truncated to the number of tasks actually submitted, and submission stops.
// Fix: the original `break` only exited the inner loop — the outer loop kept
// submitting tasks of later instances (beyond the truncated _total_tasks) and
// a subsequent successful submit overwrote the error in `st`, so the failure
// branch below could be skipped entirely.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Only the tasks submitted so far will ever close; shrink the
                // total so _close_fragment_instance can still be reached.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        if (!st.ok()) {
            // Stop submitting the remaining instances after a failure.
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1836
1837
42
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1838
42
    if (_runtime_state->enable_profile()) {
1839
0
        std::stringstream ss;
1840
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1841
0
            runtime_profile_ptr->pretty_print(&ss);
1842
0
        }
1843
1844
0
        if (_runtime_state->load_channel_profile()) {
1845
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1846
0
        }
1847
1848
0
        auto profile_str =
1849
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1850
0
                            this->_fragment_id, extra_info, ss.str());
1851
0
        LOG_LONG_STRING(INFO, profile_str);
1852
0
    }
1853
42
}
1854
// If all pipeline tasks binded to the fragment instance are finished, then we could
1855
// close the fragment instance.
1856
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1857
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1858
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1859
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1860
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1861
// (via debug_string()).
1862
442k
bool PipelineFragmentContext::_close_fragment_instance() {
1863
442k
    if (_is_fragment_instance_closed) {
1864
0
        return false;
1865
0
    }
1866
442k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1867
442k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1868
442k
    if (!_need_notify_close) {
1869
439k
        auto st = send_report(true);
1870
439k
        if (!st) {
1871
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1872
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1873
0
        }
1874
439k
    }
1875
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1876
    // Since stream load does not have someting like coordinator on FE, so
1877
    // backend can not report profile to FE, ant its profile can not be shown
1878
    // in the same way with other query. So we print the profile content to info log.
1879
1880
442k
    if (_runtime_state->enable_profile() &&
1881
442k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1882
2.41k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1883
2.41k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1884
0
        std::stringstream ss;
1885
        // Compute the _local_time_percent before pretty_print the runtime_profile
1886
        // Before add this operation, the print out like that:
1887
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1888
        // After add the operation, the print out like that:
1889
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1890
        // We can easily know the exec node execute time without child time consumed.
1891
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1892
0
            runtime_profile_ptr->pretty_print(&ss);
1893
0
        }
1894
1895
0
        if (_runtime_state->load_channel_profile()) {
1896
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1897
0
        }
1898
1899
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1900
0
    }
1901
1902
442k
    if (_query_ctx->enable_profile()) {
1903
2.41k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1904
2.41k
                                         collect_realtime_load_channel_profile());
1905
2.41k
    }
1906
1907
    // Return whether the caller needs to remove from the pipeline map.
1908
    // The caller must do this after releasing _task_mutex.
1909
442k
    return !_need_notify_close;
1910
442k
}
1911
1912
2.40M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // Once every task of this pipeline has closed, its output will never be
    // consumed again, so make all tasks of the dependent pipelines runnable.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    auto& pipeline = _pip_id_to_pipeline[pipeline_id];
    if (pipeline->close_task()) {
        if (auto it = _dag.find(pipeline_id); it != _dag.end()) {
            for (auto dep : it->second) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool should_remove = false;
    {
        std::lock_guard<std::mutex> guard(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            should_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (should_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1935
1936
55.8k
std::string PipelineFragmentContext::get_load_error_url() {
1937
55.8k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1938
0
        return to_load_error_http_path(str);
1939
0
    }
1940
167k
    for (auto& tasks : _tasks) {
1941
270k
        for (auto& task : tasks) {
1942
270k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1943
162
                return to_load_error_http_path(str);
1944
162
            }
1945
270k
        }
1946
167k
    }
1947
55.6k
    return "";
1948
55.8k
}
1949
1950
55.8k
std::string PipelineFragmentContext::get_first_error_msg() {
1951
55.8k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1952
0
        return str;
1953
0
    }
1954
167k
    for (auto& tasks : _tasks) {
1955
270k
        for (auto& task : tasks) {
1956
270k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1957
162
                return str;
1958
162
            }
1959
270k
        }
1960
167k
    }
1961
55.6k
    return "";
1962
55.8k
}
1963
1964
444k
// Report the fragment's execution status (and, for loads, its progress) back
// to the coordinator via _report_status_cb.
// Fixes: local variable typo `load_eror_url` -> `load_error_url`, and the
// query-level getters were each called twice (once for the empty() check and
// once for the value) — fetch them once instead.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Fetch the query-level values once; fall back to the fragment-level
    // scans only when they are empty.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2019
2020
50
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2021
50
    size_t res = 0;
2022
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2023
    // here to traverse the vector.
2024
50
    for (const auto& task_instances : _tasks) {
2025
130
        for (const auto& task : task_instances) {
2026
130
            if (task.first->is_running()) {
2027
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2028
0
                                      << " is running, task: " << (void*)task.first.get()
2029
0
                                      << ", is_running: " << task.first->is_running();
2030
0
                *has_running_task = true;
2031
0
                return 0;
2032
0
            }
2033
2034
130
            size_t revocable_size = task.first->get_revocable_size();
2035
130
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2036
12
                res += revocable_size;
2037
12
            }
2038
130
        }
2039
50
    }
2040
50
    return res;
2041
50
}
2042
2043
100
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2044
100
    std::vector<PipelineTask*> revocable_tasks;
2045
100
    for (const auto& task_instances : _tasks) {
2046
260
        for (const auto& task : task_instances) {
2047
260
            size_t revocable_size_ = task.first->get_revocable_size();
2048
2049
260
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2050
24
                revocable_tasks.emplace_back(task.first.get());
2051
24
            }
2052
260
        }
2053
100
    }
2054
100
    return revocable_tasks;
2055
100
}
2056
2057
62
std::string PipelineFragmentContext::debug_string() {
2058
62
    std::lock_guard<std::mutex> l(_task_mutex);
2059
62
    fmt::memory_buffer debug_string_buffer;
2060
62
    fmt::format_to(debug_string_buffer,
2061
62
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2062
62
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2063
62
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2064
229
    for (size_t j = 0; j < _tasks.size(); j++) {
2065
167
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2066
595
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2067
428
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2068
428
                           _tasks[j][i].first->debug_string());
2069
428
        }
2070
167
    }
2071
2072
62
    return fmt::to_string(debug_string_buffer);
2073
62
}
2074
2075
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2076
2.41k
PipelineFragmentContext::collect_realtime_profile() const {
2077
2.41k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2078
2079
    // we do not have mutex to protect pipeline_id_to_profile
2080
    // so we need to make sure this funciton is invoked after fragment context
2081
    // has already been prepared.
2082
2.41k
    if (!_prepared) {
2083
0
        std::string msg =
2084
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2085
0
        DCHECK(false) << msg;
2086
0
        LOG_ERROR(msg);
2087
0
        return res;
2088
0
    }
2089
2090
    // Make sure first profile is fragment level profile
2091
2.41k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2092
2.41k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2093
2.41k
    res.push_back(fragment_profile);
2094
2095
    // pipeline_id_to_profile is initialized in prepare stage
2096
4.29k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2097
4.29k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2098
4.29k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2099
4.29k
        res.push_back(profile_ptr);
2100
4.29k
    }
2101
2102
2.41k
    return res;
2103
2.41k
}
2104
2105
std::shared_ptr<TRuntimeProfileTree>
2106
2.41k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2107
    // we do not have mutex to protect pipeline_id_to_profile
2108
    // so we need to make sure this funciton is invoked after fragment context
2109
    // has already been prepared.
2110
2.41k
    if (!_prepared) {
2111
0
        std::string msg =
2112
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2113
0
        DCHECK(false) << msg;
2114
0
        LOG_ERROR(msg);
2115
0
        return nullptr;
2116
0
    }
2117
2118
6.58k
    for (const auto& tasks : _tasks) {
2119
13.2k
        for (const auto& task : tasks) {
2120
13.2k
            if (task.second->load_channel_profile() == nullptr) {
2121
0
                continue;
2122
0
            }
2123
2124
13.2k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2125
2126
13.2k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2127
13.2k
                                                           _runtime_state->profile_level());
2128
13.2k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2129
13.2k
        }
2130
6.58k
    }
2131
2132
2.41k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2133
2.41k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2134
2.41k
                                                      _runtime_state->profile_level());
2135
2.41k
    return load_channel_profile;
2136
2.41k
}
2137
2138
// Collect runtime filter IDs registered by all tasks in this PFC.
2139
// Used during recursive CTE stage transitions to know which filters to deregister
2140
// before creating the new PFC for the next recursion round.
2141
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2142
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2143
// the vector sizes do not change, and individual RuntimeState filter sets are
2144
// written only during open() which has completed by the time we reach rerun.
2145
3.08k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2146
3.08k
    std::set<int> result;
2147
3.77k
    for (const auto& _task : _tasks) {
2148
6.08k
        for (const auto& task : _task) {
2149
6.08k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2150
6.08k
            result.merge(set);
2151
6.08k
        }
2152
3.77k
    }
2153
3.08k
    if (_runtime_state) {
2154
3.08k
        auto set = _runtime_state->get_deregister_runtime_filter();
2155
3.08k
        result.merge(set);
2156
3.08k
    }
2157
3.08k
    return result;
2158
3.08k
}
2159
2160
444k
void PipelineFragmentContext::_release_resource() {
2161
444k
    std::lock_guard<std::mutex> l(_task_mutex);
2162
    // The memory released by the query end is recorded in the query mem tracker.
2163
444k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2164
444k
    auto st = _query_ctx->exec_status();
2165
1.37M
    for (auto& _task : _tasks) {
2166
1.37M
        if (!_task.empty()) {
2167
1.37M
            _call_back(_task.front().first->runtime_state(), &st);
2168
1.37M
        }
2169
1.37M
    }
2170
444k
    _tasks.clear();
2171
444k
    _dag.clear();
2172
444k
    _pip_id_to_pipeline.clear();
2173
444k
    _pipelines.clear();
2174
444k
    _sink.reset();
2175
444k
    _root_op.reset();
2176
444k
    _runtime_filter_mgr_map.clear();
2177
444k
    _op_id_to_shared_state.clear();
2178
444k
}
2179
2180
#include "common/compile_check_end.h"
2181
} // namespace doris