Coverage Report

Created: 2026-03-31 07:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_block_buffer.h"
123
#include "runtime/result_buffer_mgr.h"
124
#include "runtime/runtime_state.h"
125
#include "runtime/thread_context.h"
126
#include "service/backend_options.h"
127
#include "util/countdown_latch.h"
128
#include "util/debug_util.h"
129
#include "util/uid_util.h"
130
131
namespace doris {
132
#include "common/compile_check_begin.h"
133
PipelineFragmentContext::PipelineFragmentContext(
134
        TUniqueId query_id, const TPipelineFragmentParams& request,
135
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136
        const std::function<void(RuntimeState*, Status*)>& call_back,
137
        report_status_callback report_status_cb)
138
341k
        : _query_id(std::move(query_id)),
139
341k
          _fragment_id(request.fragment_id),
140
341k
          _exec_env(exec_env),
141
341k
          _query_ctx(std::move(query_ctx)),
142
341k
          _call_back(call_back),
143
341k
          _is_report_on_cancel(true),
144
341k
          _report_status_cb(std::move(report_status_cb)),
145
341k
          _params(request),
146
341k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147
341k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
148
341k
                                                               : false) {
149
341k
    _fragment_watcher.start();
150
341k
}
151
152
341k
PipelineFragmentContext::~PipelineFragmentContext() {
153
341k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
154
341k
            .tag("query_id", print_id(_query_id))
155
341k
            .tag("fragment_id", _fragment_id);
156
341k
    _release_resource();
157
341k
    {
158
        // The memory released by the query end is recorded in the query mem tracker.
159
341k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
160
341k
        _runtime_state.reset();
161
341k
        _query_ctx.reset();
162
341k
    }
163
341k
}
164
165
296
bool PipelineFragmentContext::is_timeout(timespec now) const {
166
296
    if (_timeout <= 0) {
167
0
        return false;
168
0
    }
169
296
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
170
296
}
171
172
// notify_close() transitions the PFC from "waiting for external close notification" to
173
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
174
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
175
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
176
9.04k
bool PipelineFragmentContext::notify_close() {
177
9.04k
    bool all_closed = false;
178
9.04k
    bool need_remove = false;
179
9.04k
    {
180
9.04k
        std::lock_guard<std::mutex> l(_task_mutex);
181
9.04k
        if (_closed_tasks >= _total_tasks) {
182
3.51k
            if (_need_notify_close) {
183
                // Fragment was cancelled and waiting for notify to close.
184
                // Record that we need to remove from fragment mgr, but do it
185
                // after releasing _task_mutex to avoid ABBA deadlock with
186
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
187
                // first, then _task_mutex via debug_string()).
188
3.43k
                need_remove = true;
189
3.43k
            }
190
3.51k
            all_closed = true;
191
3.51k
        }
192
        // make fragment release by self after cancel
193
9.04k
        _need_notify_close = false;
194
9.04k
    }
195
9.04k
    if (need_remove) {
196
3.43k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
197
3.43k
    }
198
9.04k
    return all_closed;
199
9.04k
}
200
201
// Must not add lock in this method. Because it will call query ctx cancel. And
202
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
203
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
204
// There maybe dead lock.
205
5.56k
void PipelineFragmentContext::cancel(const Status reason) {
206
5.56k
    LOG_INFO("PipelineFragmentContext::cancel")
207
5.56k
            .tag("query_id", print_id(_query_id))
208
5.56k
            .tag("fragment_id", _fragment_id)
209
5.56k
            .tag("reason", reason.to_string());
210
5.56k
    if (notify_close()) {
211
97
        return;
212
97
    }
213
    // Timeout is a special error code, we need print current stack to debug timeout issue.
214
5.46k
    if (reason.is<ErrorCode::TIMEOUT>()) {
215
8
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
216
8
                                   debug_string());
217
8
        LOG_LONG_STRING(WARNING, dbg_str);
218
8
    }
219
220
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
221
5.46k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
222
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
223
0
                    debug_string());
224
0
    }
225
226
5.46k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
227
0
        print_profile("cancel pipeline, reason: " + reason.to_string());
228
0
    }
229
230
5.46k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
231
22
        _query_ctx->set_load_error_url(error_url);
232
22
    }
233
234
5.46k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
235
22
        _query_ctx->set_first_error_msg(first_error_msg);
236
22
    }
237
238
5.46k
    _query_ctx->cancel(reason, _fragment_id);
239
5.46k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
240
281
        _is_report_on_cancel = false;
241
5.18k
    } else {
242
26.6k
        for (auto& id : _fragment_instance_ids) {
243
26.6k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
244
26.6k
        }
245
5.18k
    }
246
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
247
    // For stream load the fragment's query_id == load id, it is set in FE.
248
5.46k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
249
5.46k
    if (stream_load_ctx != nullptr) {
250
30
        stream_load_ctx->pipe->cancel(reason.to_string());
251
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
252
        // We need to set the error URL at this point to ensure error information is properly
253
        // propagated to the client.
254
30
        stream_load_ctx->error_url = get_load_error_url();
255
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
256
30
    }
257
258
28.4k
    for (auto& tasks : _tasks) {
259
62.3k
        for (auto& task : tasks) {
260
62.3k
            task.first->unblock_all_dependencies();
261
62.3k
        }
262
28.4k
    }
263
5.46k
}
264
265
548k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
266
548k
    PipelineId id = _next_pipeline_id++;
267
548k
    auto pipeline = std::make_shared<Pipeline>(
268
548k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
269
548k
            parent ? parent->num_tasks() : _num_instances);
270
548k
    if (idx >= 0) {
271
101k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
272
446k
    } else {
273
446k
        _pipelines.emplace_back(pipeline);
274
446k
    }
275
548k
    if (parent) {
276
203k
        parent->set_children(pipeline);
277
203k
    }
278
548k
    return pipeline;
279
548k
}
280
281
340k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
282
340k
    {
283
340k
        SCOPED_TIMER(_build_pipelines_timer);
284
        // 2. Build pipelines with operators in this fragment.
285
340k
        auto root_pipeline = add_pipeline();
286
340k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
287
340k
                                         &_root_op, root_pipeline));
288
289
        // 3. Create sink operator
290
340k
        if (!_params.fragment.__isset.output_sink) {
291
0
            return Status::InternalError("No output sink in this fragment!");
292
0
        }
293
340k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
294
340k
                                          _params.fragment.output_exprs, _params,
295
340k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
296
340k
                                          *_desc_tbl, root_pipeline->id()));
297
340k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
298
340k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
299
300
445k
        for (PipelinePtr& pipeline : _pipelines) {
301
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
302
445k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
303
445k
        }
304
340k
    }
305
    // 4. Build local exchanger
306
340k
    if (_runtime_state->enable_local_shuffle()) {
307
338k
        SCOPED_TIMER(_plan_local_exchanger_timer);
308
338k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
309
338k
                                             _params.bucket_seq_to_instance_idx,
310
338k
                                             _params.shuffle_idx_to_instance_idx));
311
338k
    }
312
313
    // 5. Initialize global states in pipelines.
314
549k
    for (PipelinePtr& pipeline : _pipelines) {
315
549k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
316
549k
        pipeline->children().clear();
317
549k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
318
549k
    }
319
320
339k
    {
321
339k
        SCOPED_TIMER(_build_tasks_timer);
322
        // 6. Build pipeline tasks and initialize local state.
323
339k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
324
339k
    }
325
326
339k
    return Status::OK();
327
339k
}
328
329
340k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
330
340k
    if (_prepared) {
331
0
        return Status::InternalError("Already prepared");
332
0
    }
333
340k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
334
340k
        _timeout = _params.query_options.execution_timeout;
335
340k
    }
336
337
340k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
338
340k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
339
340k
    SCOPED_TIMER(_prepare_timer);
340
340k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
341
340k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
342
340k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
343
340k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
344
340k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
345
340k
    {
346
340k
        SCOPED_TIMER(_init_context_timer);
347
340k
        cast_set(_num_instances, _params.local_params.size());
348
340k
        _total_instances =
349
340k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
350
351
340k
        auto* fragment_context = this;
352
353
340k
        if (_params.query_options.__isset.is_report_success) {
354
337k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
355
337k
        }
356
357
        // 1. Set up the global runtime state.
358
340k
        _runtime_state = RuntimeState::create_unique(
359
340k
                _params.query_id, _params.fragment_id, _params.query_options,
360
340k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
361
340k
        _runtime_state->set_task_execution_context(shared_from_this());
362
340k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
363
340k
        if (_params.__isset.backend_id) {
364
337k
            _runtime_state->set_backend_id(_params.backend_id);
365
337k
        }
366
340k
        if (_params.__isset.import_label) {
367
238
            _runtime_state->set_import_label(_params.import_label);
368
238
        }
369
340k
        if (_params.__isset.db_name) {
370
190
            _runtime_state->set_db_name(_params.db_name);
371
190
        }
372
340k
        if (_params.__isset.load_job_id) {
373
0
            _runtime_state->set_load_job_id(_params.load_job_id);
374
0
        }
375
376
340k
        if (_params.is_simplified_param) {
377
122k
            _desc_tbl = _query_ctx->desc_tbl;
378
218k
        } else {
379
218k
            DCHECK(_params.__isset.desc_tbl);
380
218k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
381
218k
                                                  &_desc_tbl));
382
218k
        }
383
340k
        _runtime_state->set_desc_tbl(_desc_tbl);
384
340k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
385
340k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
386
340k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
387
340k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
388
389
        // init fragment_instance_ids
390
340k
        const auto target_size = _params.local_params.size();
391
340k
        _fragment_instance_ids.resize(target_size);
392
1.41M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
393
1.07M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
394
1.07M
            _fragment_instance_ids[i] = fragment_instance_id;
395
1.07M
        }
396
340k
    }
397
398
340k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
399
400
339k
    _init_next_report_time();
401
402
339k
    _prepared = true;
403
339k
    return Status::OK();
404
340k
}
405
406
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
407
        int instance_idx,
408
1.07M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
409
1.07M
    const auto& local_params = _params.local_params[instance_idx];
410
1.07M
    auto fragment_instance_id = local_params.fragment_instance_id;
411
1.07M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
412
1.07M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
413
1.07M
    auto get_shared_state = [&](PipelinePtr pipeline)
414
1.07M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
1.84M
                                       std::vector<std::shared_ptr<Dependency>>>> {
416
1.84M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
417
1.84M
                                std::vector<std::shared_ptr<Dependency>>>>
418
1.84M
                shared_state_map;
419
2.49M
        for (auto& op : pipeline->operators()) {
420
2.49M
            auto source_id = op->operator_id();
421
2.49M
            if (auto iter = _op_id_to_shared_state.find(source_id);
422
2.49M
                iter != _op_id_to_shared_state.end()) {
423
773k
                shared_state_map.insert({source_id, iter->second});
424
773k
            }
425
2.49M
        }
426
1.84M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
427
1.84M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
428
1.84M
                iter != _op_id_to_shared_state.end()) {
429
337k
                shared_state_map.insert({sink_to_source_id, iter->second});
430
337k
            }
431
1.84M
        }
432
1.84M
        return shared_state_map;
433
1.84M
    };
434
435
3.35M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
436
2.27M
        auto& pipeline = _pipelines[pip_idx];
437
2.27M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
438
1.84M
            auto task_runtime_state = RuntimeState::create_unique(
439
1.84M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
440
1.84M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
441
1.84M
            {
442
                // Initialize runtime state for this task
443
1.84M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
444
445
1.84M
                task_runtime_state->set_task_execution_context(shared_from_this());
446
1.84M
                task_runtime_state->set_be_number(local_params.backend_num);
447
448
1.84M
                if (_params.__isset.backend_id) {
449
1.84M
                    task_runtime_state->set_backend_id(_params.backend_id);
450
1.84M
                }
451
1.84M
                if (_params.__isset.import_label) {
452
239
                    task_runtime_state->set_import_label(_params.import_label);
453
239
                }
454
1.84M
                if (_params.__isset.db_name) {
455
191
                    task_runtime_state->set_db_name(_params.db_name);
456
191
                }
457
1.84M
                if (_params.__isset.load_job_id) {
458
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
459
0
                }
460
1.84M
                if (_params.__isset.wal_id) {
461
114
                    task_runtime_state->set_wal_id(_params.wal_id);
462
114
                }
463
1.84M
                if (_params.__isset.content_length) {
464
31
                    task_runtime_state->set_content_length(_params.content_length);
465
31
                }
466
467
1.84M
                task_runtime_state->set_desc_tbl(_desc_tbl);
468
1.84M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
469
1.84M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
470
1.84M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
471
1.84M
                task_runtime_state->set_max_operator_id(max_operator_id());
472
1.84M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
473
1.84M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
474
1.84M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
475
476
1.84M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
477
1.84M
            }
478
1.84M
            auto cur_task_id = _total_tasks++;
479
1.84M
            task_runtime_state->set_task_id(cur_task_id);
480
1.84M
            task_runtime_state->set_task_num(pipeline->num_tasks());
481
1.84M
            auto task = std::make_shared<PipelineTask>(
482
1.84M
                    pipeline, cur_task_id, task_runtime_state.get(),
483
1.84M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
484
1.84M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
485
1.84M
                    instance_idx);
486
1.84M
            pipeline->incr_created_tasks(instance_idx, task.get());
487
1.84M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
488
1.84M
            _tasks[instance_idx].emplace_back(
489
1.84M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
490
1.84M
                            std::move(task), std::move(task_runtime_state)});
491
1.84M
        }
492
2.27M
    }
493
494
    /**
495
         * Build DAG for pipeline tasks.
496
         * For example, we have
497
         *
498
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
499
         *            \                      /
500
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
501
         *                 \                          /
502
         *               JoinProbeOperator2 (Pipeline1)
503
         *
504
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
505
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
506
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
507
         *
508
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
509
         * and JoinProbeOperator2.
510
         */
511
2.27M
    for (auto& _pipeline : _pipelines) {
512
2.27M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
513
1.83M
            auto* task = pipeline_id_to_task[_pipeline->id()];
514
1.83M
            DCHECK(task != nullptr);
515
516
            // If this task has upstream dependency, then inject it into this task.
517
1.83M
            if (_dag.contains(_pipeline->id())) {
518
1.19M
                auto& deps = _dag[_pipeline->id()];
519
1.96M
                for (auto& dep : deps) {
520
1.96M
                    if (pipeline_id_to_task.contains(dep)) {
521
1.09M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
522
1.09M
                        if (ss) {
523
419k
                            task->inject_shared_state(ss);
524
674k
                        } else {
525
674k
                            pipeline_id_to_task[dep]->inject_shared_state(
526
674k
                                    task->get_source_shared_state());
527
674k
                        }
528
1.09M
                    }
529
1.96M
                }
530
1.19M
            }
531
1.83M
        }
532
2.27M
    }
533
3.35M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
534
2.27M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
535
1.83M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
536
1.83M
            DCHECK(pipeline_id_to_profile[pip_idx]);
537
1.83M
            std::vector<TScanRangeParams> scan_ranges;
538
1.83M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
539
1.83M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
540
270k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
541
270k
            }
542
1.83M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
543
1.83M
                                                             _params.fragment.output_sink));
544
1.83M
        }
545
2.27M
    }
546
1.08M
    {
547
1.08M
        std::lock_guard<std::mutex> l(_state_map_lock);
548
1.08M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
549
1.08M
    }
550
1.08M
    return Status::OK();
551
1.07M
}
552
553
340k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
554
340k
    _total_tasks = 0;
555
340k
    _closed_tasks = 0;
556
340k
    const auto target_size = _params.local_params.size();
557
340k
    _tasks.resize(target_size);
558
340k
    _runtime_filter_mgr_map.resize(target_size);
559
888k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
560
548k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
561
548k
    }
562
340k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
563
564
340k
    if (target_size > 1 &&
565
340k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
566
119k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
567
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
568
26.9k
        std::vector<Status> prepare_status(target_size);
569
26.9k
        int submitted_tasks = 0;
570
26.9k
        Status submit_status;
571
26.9k
        CountDownLatch latch((int)target_size);
572
340k
        for (int i = 0; i < target_size; i++) {
573
313k
            submit_status = thread_pool->submit_func([&, i]() {
574
313k
                SCOPED_ATTACH_TASK(_query_ctx.get());
575
313k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
576
313k
                latch.count_down();
577
313k
            });
578
313k
            if (LIKELY(submit_status.ok())) {
579
313k
                submitted_tasks++;
580
18.4E
            } else {
581
18.4E
                break;
582
18.4E
            }
583
313k
        }
584
26.9k
        latch.arrive_and_wait(target_size - submitted_tasks);
585
26.9k
        if (UNLIKELY(!submit_status.ok())) {
586
0
            return submit_status;
587
0
        }
588
340k
        for (int i = 0; i < submitted_tasks; i++) {
589
313k
            if (!prepare_status[i].ok()) {
590
0
                return prepare_status[i];
591
0
            }
592
313k
        }
593
313k
    } else {
594
1.07M
        for (int i = 0; i < target_size; i++) {
595
766k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
596
766k
        }
597
313k
    }
598
340k
    _pipeline_parent_map.clear();
599
340k
    _op_id_to_shared_state.clear();
600
601
340k
    return Status::OK();
602
340k
}
603
604
338k
void PipelineFragmentContext::_init_next_report_time() {
605
338k
    auto interval_s = config::pipeline_status_report_interval;
606
338k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
607
18.4E
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
608
31.2k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
609
        // We don't want to wait longer than it takes to run the entire fragment.
610
31.2k
        _previous_report_time =
611
31.2k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
612
31.2k
        _disable_period_report = false;
613
31.2k
    }
614
338k
}
615
616
3.70k
void PipelineFragmentContext::refresh_next_report_time() {
617
3.70k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
618
3.70k
    DCHECK(disable == true);
619
3.70k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
620
3.70k
    _disable_period_report.compare_exchange_strong(disable, false);
621
3.70k
}
622
623
6.26M
void PipelineFragmentContext::trigger_report_if_necessary() {
624
6.26M
    if (!_is_report_success) {
625
5.89M
        return;
626
5.89M
    }
627
376k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
628
376k
    if (disable) {
629
5.97k
        return;
630
5.97k
    }
631
370k
    int32_t interval_s = config::pipeline_status_report_interval;
632
370k
    if (interval_s <= 0) {
633
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
634
0
                        "trigger "
635
0
                        "report.";
636
0
    }
637
370k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
638
370k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
639
370k
    if (MonotonicNanos() > next_report_time) {
640
3.71k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
641
3.71k
                                                            std::memory_order_acq_rel)) {
642
12
            return;
643
12
        }
644
3.70k
        if (VLOG_FILE_IS_ON) {
645
0
            VLOG_FILE << "Reporting "
646
0
                      << "profile for query_id " << print_id(_query_id)
647
0
                      << ", fragment id: " << _fragment_id;
648
649
0
            std::stringstream ss;
650
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
651
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
652
0
            if (_runtime_state->load_channel_profile()) {
653
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
654
0
            }
655
656
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
657
0
                      << " profile:\n"
658
0
                      << ss.str();
659
0
        }
660
3.70k
        auto st = send_report(false);
661
3.70k
        if (!st.ok()) {
662
0
            disable = true;
663
0
            _disable_period_report.compare_exchange_strong(disable, false,
664
0
                                                           std::memory_order_acq_rel);
665
0
        }
666
3.70k
    }
667
370k
}
668
669
// Reconstruct the operator tree from the thrift plan and attach it to `cur_pipe`.
// `root` receives the root operator of the rebuilt tree.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root, cur_pipe,
                                        0, false, false));

    // Every thrift node must have been consumed exactly once by the preorder walk.
    if (node_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
686
687
// Recursively rebuild one subtree of the plan from `tnodes` (preorder layout).
// `node_idx` is advanced as nodes are consumed; `parent` is the operator the new
// node attaches to (nullptr for the root, in which case `*root` is set).
// `followed_by_shuffled_operator` / `require_bucket_distribution` carry the
// shuffle/colocation requirements of downstream operators so local-exchange
// planning can later pick a compatible exchange type.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s); if a cache operator wraps this node, the
        // parent links to the cache operator instead of the node itself
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When the current pipeline has no operators yet, the requirement comes from
    // its sink; otherwise from the operator just created.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // Propagate "followed by shuffled operator" downward: it stays set across
    // hash exchanges, and across NOOP exchanges if it was already set.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for bucket (colocated) distribution requirements.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf nodes (no children) are the fragment's source operators; remember
    // whether the source must run serially.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
772
773
void PipelineFragmentContext::_inherit_pipeline_properties(
774
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
775
101k
        PipelinePtr pipe_with_sink) {
776
101k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
777
101k
    pipe_with_source->set_num_tasks(_num_instances);
778
101k
    pipe_with_source->set_data_distribution(data_distribution);
779
101k
}
780
781
// Split `cur_pipe` at operator index `idx` by inserting a local exchange:
// `new_pip` receives the operators before `idx` plus a LocalExchangeSink, and
// `cur_pipe` keeps the rest, fronted by a LocalExchangeSource. Also rewires the
// pipeline DAG and children links accordingly.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    // The free-blocks limit is identical for every exchanger type, so compute it
    // once here instead of repeating the ternary in each switch case below.
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Re-parent the first remaining operator: clear the old child link first,
        // then point it at the new local exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. Children whose sink feeds an
    // operator that moved into `new_pip` follow it; the rest stay with `cur_pipe`.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            };
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
945
946
// Decide whether a local exchange is needed before operator `idx` of `cur_pipe`
// and, if so, split the pipeline. May insert a second passthrough exchange when
// the exchange sink would otherwise run with a single task and become a bottleneck.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A local exchange is pointless with one instance or one upstream task.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
        return Status::OK();
    }

    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    const auto total_op_num = cur_pipe->operators().size();
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source) overall.
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << total_op_num
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    const bool sink_is_bottleneck =
            cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
            Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type);
    if (sink_is_bottleneck) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
987
988
// Plan local exchanges for the whole fragment. Pipelines are visited from last
// to first so a pipeline can inherit its child's data distribution before it is
// planned itself.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child.
        for (auto& child : pipeline->children()) {
            if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                pipeline->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1012
1013
// Plan local exchanges within a single pipeline `pip`. Scans the operator list
// (starting at index 1 — index 0 is the pipeline's source) and splits the
// pipeline wherever an operator requires a local exchange; finally does the same
// for the pipeline's sink. Because a split rewrites `pip`'s operator list, the
// scan restarts after every split (the outer do/while).
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    do {
        // NOTE: `ops` must be re-fetched each pass — _add_local_exchange may have
        // spliced the operator list on the previous iteration.
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also demand a distribution change (e.g. a shuffled join build).
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1050
1051
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1052
                                                  const std::vector<TExpr>& output_exprs,
1053
                                                  const TPipelineFragmentParams& params,
1054
                                                  const RowDescriptor& row_desc,
1055
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1056
339k
                                                  PipelineId cur_pipeline_id) {
1057
339k
    switch (thrift_sink.type) {
1058
121k
    case TDataSinkType::DATA_STREAM_SINK: {
1059
121k
        if (!thrift_sink.__isset.stream_sink) {
1060
0
            return Status::InternalError("Missing data stream sink.");
1061
0
        }
1062
121k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1063
121k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1064
121k
                params.destinations, _fragment_instance_ids);
1065
121k
        break;
1066
121k
    }
1067
188k
    case TDataSinkType::RESULT_SINK: {
1068
188k
        if (!thrift_sink.__isset.result_sink) {
1069
0
            return Status::InternalError("Missing data buffer sink.");
1070
0
        }
1071
1072
188k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1073
188k
                                                      output_exprs, thrift_sink.result_sink);
1074
188k
        break;
1075
188k
    }
1076
105
    case TDataSinkType::DICTIONARY_SINK: {
1077
105
        if (!thrift_sink.__isset.dictionary_sink) {
1078
0
            return Status::InternalError("Missing dict sink.");
1079
0
        }
1080
1081
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1082
105
                                                    thrift_sink.dictionary_sink);
1083
105
        break;
1084
105
    }
1085
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1086
28.3k
    case TDataSinkType::OLAP_TABLE_SINK: {
1087
28.3k
        if (state->query_options().enable_memtable_on_sink_node &&
1088
28.3k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1089
28.3k
            !config::is_cloud_mode()) {
1090
338
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1091
338
                                                               row_desc, output_exprs);
1092
28.0k
        } else {
1093
28.0k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1094
28.0k
                                                             row_desc, output_exprs);
1095
28.0k
        }
1096
28.3k
        break;
1097
0
    }
1098
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1099
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1100
165
        DCHECK(state->get_query_ctx() != nullptr);
1101
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1102
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1103
165
                                                                output_exprs);
1104
165
        break;
1105
0
    }
1106
0
    case TDataSinkType::HIVE_TABLE_SINK: {
1107
0
        if (!thrift_sink.__isset.hive_table_sink) {
1108
0
            return Status::InternalError("Missing hive table sink.");
1109
0
        }
1110
0
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1111
0
                                                         output_exprs);
1112
0
        break;
1113
0
    }
1114
0
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1115
0
        if (!thrift_sink.__isset.iceberg_table_sink) {
1116
0
            return Status::InternalError("Missing iceberg table sink.");
1117
0
        }
1118
0
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1119
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1120
0
                                                                     row_desc, output_exprs);
1121
0
        } else {
1122
0
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1123
0
                                                                row_desc, output_exprs);
1124
0
        }
1125
0
        break;
1126
0
    }
1127
0
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1128
0
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1129
0
            return Status::InternalError("Missing iceberg delete sink.");
1130
0
        }
1131
0
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1132
0
                                                             row_desc, output_exprs);
1133
0
        break;
1134
0
    }
1135
0
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1136
0
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1137
0
            return Status::InternalError("Missing iceberg merge sink.");
1138
0
        }
1139
0
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1140
0
                                                            output_exprs);
1141
0
        break;
1142
0
    }
1143
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1144
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1145
0
            return Status::InternalError("Missing max compute table sink.");
1146
0
        }
1147
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1148
0
                                                       output_exprs);
1149
0
        break;
1150
0
    }
1151
0
    case TDataSinkType::JDBC_TABLE_SINK: {
1152
0
        if (!thrift_sink.__isset.jdbc_table_sink) {
1153
0
            return Status::InternalError("Missing data jdbc sink.");
1154
0
        }
1155
0
        if (config::enable_java_support) {
1156
0
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1157
0
                                                             output_exprs);
1158
0
        } else {
1159
0
            return Status::InternalError(
1160
0
                    "Jdbc table sink is not enabled, you can change be config "
1161
0
                    "enable_java_support to true and restart be.");
1162
0
        }
1163
0
        break;
1164
0
    }
1165
3
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1166
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1167
0
            return Status::InternalError("Missing data buffer sink.");
1168
0
        }
1169
1170
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1171
3
                                                             output_exprs);
1172
3
        break;
1173
3
    }
1174
338
    case TDataSinkType::RESULT_FILE_SINK: {
1175
338
        if (!thrift_sink.__isset.result_file_sink) {
1176
0
            return Status::InternalError("Missing result file sink.");
1177
0
        }
1178
1179
        // Result file sink is not the top sink
1180
338
        if (params.__isset.destinations && !params.destinations.empty()) {
1181
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1182
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1183
0
                    params.destinations, output_exprs, desc_tbl);
1184
338
        } else {
1185
338
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1186
338
                                                              output_exprs);
1187
338
        }
1188
338
        break;
1189
338
    }
1190
1.73k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1191
1.73k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1192
1.73k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1193
1.73k
        auto sink_id = next_sink_operator_id();
1194
1.73k
        const int multi_cast_node_id = sink_id;
1195
1.73k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1196
        // one sink has multiple sources.
1197
1.73k
        std::vector<int> sources;
1198
6.72k
        for (int i = 0; i < sender_size; ++i) {
1199
4.98k
            auto source_id = next_operator_id();
1200
4.98k
            sources.push_back(source_id);
1201
4.98k
        }
1202
1203
1.73k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1204
1.73k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1205
6.72k
        for (int i = 0; i < sender_size; ++i) {
1206
4.98k
            auto new_pipeline = add_pipeline();
1207
            // use to exchange sink
1208
4.98k
            RowDescriptor* exchange_row_desc = nullptr;
1209
4.98k
            {
1210
4.98k
                const auto& tmp_row_desc =
1211
4.98k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1212
4.98k
                                ? RowDescriptor(state->desc_tbl(),
1213
4.98k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1214
4.98k
                                                         .output_tuple_id})
1215
4.98k
                                : row_desc;
1216
4.98k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1217
4.98k
            }
1218
4.98k
            auto source_id = sources[i];
1219
4.98k
            OperatorPtr source_op;
1220
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1221
4.98k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1222
4.98k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1223
4.98k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1224
4.98k
                    /*operator_id=*/source_id);
1225
4.98k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1226
4.98k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1227
            // 2. create and set sink operator of data stream sender for new pipeline
1228
1229
4.98k
            DataSinkOperatorPtr sink_op;
1230
4.98k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1231
4.98k
                    state, *exchange_row_desc, next_sink_operator_id(),
1232
4.98k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1233
4.98k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1234
1235
4.98k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1236
4.98k
            {
1237
4.98k
                TDataSink* t = pool->add(new TDataSink());
1238
4.98k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1239
4.98k
                RETURN_IF_ERROR(sink_op->init(*t));
1240
4.98k
            }
1241
1242
            // 3. set dependency dag
1243
4.98k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1244
4.98k
        }
1245
1.73k
        if (sources.empty()) {
1246
0
            return Status::InternalError("size of sources must be greater than 0");
1247
0
        }
1248
1.73k
        break;
1249
1.73k
    }
1250
1.73k
    case TDataSinkType::BLACKHOLE_SINK: {
1251
3
        if (!thrift_sink.__isset.blackhole_sink) {
1252
0
            return Status::InternalError("Missing blackhole sink.");
1253
0
        }
1254
1255
3
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1256
3
        break;
1257
3
    }
1258
0
    case TDataSinkType::TVF_TABLE_SINK: {
1259
0
        if (!thrift_sink.__isset.tvf_table_sink) {
1260
0
            return Status::InternalError("Missing TVF table sink.");
1261
0
        }
1262
0
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1263
0
                                                        output_exprs);
1264
0
        break;
1265
0
    }
1266
0
    default:
1267
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1268
339k
    }
1269
339k
    return Status::OK();
1270
339k
}
1271
1272
// NOLINTBEGIN(readability-function-size)
1273
// NOLINTBEGIN(readability-function-cognitive-complexity)
1274
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1275
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1276
                                                 PipelinePtr& cur_pipe, int parent_idx,
1277
                                                 int child_idx,
1278
                                                 const bool followed_by_shuffled_operator,
1279
                                                 const bool require_bucket_distribution,
1280
550k
                                                 OperatorPtr& cache_op) {
1281
550k
    std::vector<DataSinkOperatorPtr> sink_ops;
1282
550k
    Defer defer = Defer([&]() {
1283
549k
        if (op) {
1284
549k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1285
549k
        }
1286
548k
        for (auto& s : sink_ops) {
1287
101k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1288
101k
        }
1289
548k
    });
1290
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1291
    // Therefore, here we need to use a stack-like structure.
1292
550k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1293
550k
    std::stringstream error_msg;
1294
550k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1295
1296
550k
    bool fe_with_old_version = false;
1297
550k
    switch (tnode.node_type) {
1298
167k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1299
167k
        op = std::make_shared<OlapScanOperatorX>(
1300
167k
                pool, tnode, next_operator_id(), descs, _num_instances,
1301
167k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1302
167k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1303
167k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1304
167k
        break;
1305
167k
    }
1306
77
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1307
77
        DCHECK(_query_ctx != nullptr);
1308
77
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1309
77
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1310
77
                                                    _num_instances);
1311
77
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1312
77
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1313
77
        break;
1314
77
    }
1315
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1316
0
        if (config::enable_java_support) {
1317
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1318
0
                                                     _num_instances);
1319
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1320
0
        } else {
1321
0
            return Status::InternalError(
1322
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1323
0
                    "to true and restart be.");
1324
0
        }
1325
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1326
0
        break;
1327
0
    }
1328
2.68k
    case TPlanNodeType::FILE_SCAN_NODE: {
1329
2.68k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1330
2.68k
                                                 _num_instances);
1331
2.68k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1332
2.68k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1333
2.68k
        break;
1334
2.68k
    }
1335
0
    case TPlanNodeType::ES_SCAN_NODE:
1336
0
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1337
0
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1338
0
                                               _num_instances);
1339
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1340
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1341
0
        break;
1342
0
    }
1343
123k
    case TPlanNodeType::EXCHANGE_NODE: {
1344
123k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1345
123k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1346
18.4E
                                  : 0;
1347
123k
        DCHECK_GT(num_senders, 0);
1348
123k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1349
123k
                                                       num_senders);
1350
123k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1351
123k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1352
123k
        break;
1353
123k
    }
1354
146k
    case TPlanNodeType::AGGREGATION_NODE: {
1355
146k
        if (tnode.agg_node.grouping_exprs.empty() &&
1356
146k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1357
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1358
0
                                         ": group by and output is empty");
1359
0
        }
1360
146k
        bool need_create_cache_op =
1361
146k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1362
146k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1363
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1364
24
            auto cache_source_id = next_operator_id();
1365
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1366
24
                                                        _params.fragment.query_cache_param);
1367
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1368
1369
24
            const auto downstream_pipeline_id = cur_pipe->id();
1370
24
            if (!_dag.contains(downstream_pipeline_id)) {
1371
24
                _dag.insert({downstream_pipeline_id, {}});
1372
24
            }
1373
24
            new_pipe = add_pipeline(cur_pipe);
1374
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1375
1376
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1377
24
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1378
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1379
24
            return Status::OK();
1380
24
        };
1381
146k
        const bool group_by_limit_opt =
1382
146k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1383
1384
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1385
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1386
146k
        const bool enable_spill = _runtime_state->enable_spill() &&
1387
146k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1388
146k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1389
146k
                                      tnode.agg_node.use_streaming_preaggregation &&
1390
146k
                                      !tnode.agg_node.grouping_exprs.empty();
1391
        // TODO: distinct streaming agg does not support spill.
1392
146k
        const bool can_use_distinct_streaming_agg =
1393
146k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1394
146k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1395
146k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1396
146k
                _params.query_options.enable_distinct_streaming_aggregation;
1397
1398
146k
        if (can_use_distinct_streaming_agg) {
1399
92.1k
            if (need_create_cache_op) {
1400
8
                PipelinePtr new_pipe;
1401
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1402
1403
8
                cache_op = op;
1404
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1405
8
                                                                     tnode, descs);
1406
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1407
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1408
8
                cur_pipe = new_pipe;
1409
92.0k
            } else {
1410
92.0k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1411
92.0k
                                                                     tnode, descs);
1412
92.0k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1413
92.0k
            }
1414
92.1k
        } else if (is_streaming_agg) {
1415
1.88k
            if (need_create_cache_op) {
1416
5
                PipelinePtr new_pipe;
1417
5
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1418
5
                cache_op = op;
1419
5
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1420
5
                                                             descs);
1421
5
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1422
5
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1423
5
                cur_pipe = new_pipe;
1424
1.87k
            } else {
1425
1.87k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1426
1.87k
                                                             descs);
1427
1.87k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1428
1.87k
            }
1429
52.0k
        } else {
1430
            // create new pipeline to add query cache operator
1431
52.0k
            PipelinePtr new_pipe;
1432
52.0k
            if (need_create_cache_op) {
1433
11
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1434
11
                cache_op = op;
1435
11
            }
1436
1437
52.0k
            if (enable_spill) {
1438
1.50k
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1439
1.50k
                                                                     next_operator_id(), descs);
1440
50.5k
            } else {
1441
50.5k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1442
50.5k
            }
1443
52.0k
            if (need_create_cache_op) {
1444
11
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1445
11
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1446
11
                cur_pipe = new_pipe;
1447
52.0k
            } else {
1448
52.0k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1449
52.0k
            }
1450
1451
52.0k
            const auto downstream_pipeline_id = cur_pipe->id();
1452
52.0k
            if (!_dag.contains(downstream_pipeline_id)) {
1453
49.8k
                _dag.insert({downstream_pipeline_id, {}});
1454
49.8k
            }
1455
52.0k
            cur_pipe = add_pipeline(cur_pipe);
1456
52.0k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1457
1458
52.0k
            if (enable_spill) {
1459
1.50k
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1460
1.50k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1461
50.5k
            } else {
1462
50.5k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1463
50.5k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
50.5k
            }
1465
52.0k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1466
52.0k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1467
52.0k
        }
1468
146k
        break;
1469
146k
    }
1470
146k
    case TPlanNodeType::HASH_JOIN_NODE: {
1471
7.17k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1472
7.17k
                                       tnode.hash_join_node.is_broadcast_join;
1473
7.17k
        const auto enable_spill = _runtime_state->enable_spill();
1474
7.17k
        if (enable_spill && !is_broadcast_join) {
1475
0
            auto tnode_ = tnode;
1476
0
            tnode_.runtime_filters.clear();
1477
0
            auto inner_probe_operator =
1478
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1479
1480
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1481
            // So here use `tnode_` which has no runtime filters.
1482
0
            auto probe_side_inner_sink_operator =
1483
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1484
1485
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1486
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1487
1488
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1489
0
                    pool, tnode_, next_operator_id(), descs);
1490
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1491
0
                                                inner_probe_operator);
1492
0
            op = std::move(probe_operator);
1493
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1494
1495
0
            const auto downstream_pipeline_id = cur_pipe->id();
1496
0
            if (!_dag.contains(downstream_pipeline_id)) {
1497
0
                _dag.insert({downstream_pipeline_id, {}});
1498
0
            }
1499
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1500
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1501
1502
0
            auto inner_sink_operator =
1503
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1504
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1505
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1506
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1507
1508
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1509
0
            sink_ops.push_back(std::move(sink_operator));
1510
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1511
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1512
1513
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1514
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1515
7.17k
        } else {
1516
7.17k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1517
7.17k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1518
1519
7.17k
            const auto downstream_pipeline_id = cur_pipe->id();
1520
7.17k
            if (!_dag.contains(downstream_pipeline_id)) {
1521
6.34k
                _dag.insert({downstream_pipeline_id, {}});
1522
6.34k
            }
1523
7.17k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1524
7.17k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1525
1526
7.17k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1527
7.17k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1528
7.17k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1529
7.17k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1530
1531
7.17k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1532
7.17k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1533
7.17k
        }
1534
7.17k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1535
1.83k
            std::shared_ptr<HashJoinSharedState> shared_state =
1536
1.83k
                    HashJoinSharedState::create_shared(_num_instances);
1537
11.5k
            for (int i = 0; i < _num_instances; i++) {
1538
9.67k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1539
9.67k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1540
9.67k
                sink_dep->set_shared_state(shared_state.get());
1541
9.67k
                shared_state->sink_deps.push_back(sink_dep);
1542
9.67k
            }
1543
1.83k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1544
1.83k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1545
1.83k
            _op_id_to_shared_state.insert(
1546
1.83k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1547
1.83k
        }
1548
7.17k
        break;
1549
7.17k
    }
1550
4.14k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1551
4.14k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1552
4.14k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1553
1554
4.14k
        const auto downstream_pipeline_id = cur_pipe->id();
1555
4.14k
        if (!_dag.contains(downstream_pipeline_id)) {
1556
3.90k
            _dag.insert({downstream_pipeline_id, {}});
1557
3.90k
        }
1558
4.14k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1559
4.14k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1560
1561
4.14k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1562
4.14k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1563
4.14k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1564
4.14k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1565
4.14k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
4.14k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
4.14k
        break;
1568
4.14k
    }
1569
49.1k
    case TPlanNodeType::UNION_NODE: {
1570
49.1k
        int child_count = tnode.num_children;
1571
49.1k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1572
49.1k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1573
1574
49.1k
        const auto downstream_pipeline_id = cur_pipe->id();
1575
49.1k
        if (!_dag.contains(downstream_pipeline_id)) {
1576
48.4k
            _dag.insert({downstream_pipeline_id, {}});
1577
48.4k
        }
1578
50.2k
        for (int i = 0; i < child_count; i++) {
1579
1.16k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1580
1.16k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1581
1.16k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1582
1.16k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1583
1.16k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1584
1.16k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1585
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1586
1.16k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1587
1.16k
        }
1588
49.1k
        break;
1589
49.1k
    }
1590
49.1k
    case TPlanNodeType::SORT_NODE: {
1591
34.7k
        const auto should_spill = _runtime_state->enable_spill() &&
1592
34.7k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1593
34.7k
        const bool use_local_merge =
1594
34.7k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1595
34.7k
        if (should_spill) {
1596
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1597
34.7k
        } else if (use_local_merge) {
1598
32.6k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1599
32.6k
                                                                 descs);
1600
32.6k
        } else {
1601
2.07k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1602
2.07k
        }
1603
34.7k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1604
1605
34.7k
        const auto downstream_pipeline_id = cur_pipe->id();
1606
34.7k
        if (!_dag.contains(downstream_pipeline_id)) {
1607
34.6k
            _dag.insert({downstream_pipeline_id, {}});
1608
34.6k
        }
1609
34.7k
        cur_pipe = add_pipeline(cur_pipe);
1610
34.7k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1611
1612
34.7k
        if (should_spill) {
1613
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1614
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
34.7k
        } else {
1616
34.7k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1617
34.7k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1618
34.7k
        }
1619
34.7k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1620
34.7k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1621
34.7k
        break;
1622
34.7k
    }
1623
34.7k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1624
65
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1625
65
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1626
1627
65
        const auto downstream_pipeline_id = cur_pipe->id();
1628
65
        if (!_dag.contains(downstream_pipeline_id)) {
1629
65
            _dag.insert({downstream_pipeline_id, {}});
1630
65
        }
1631
65
        cur_pipe = add_pipeline(cur_pipe);
1632
65
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1633
1634
65
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1635
65
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1636
65
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1637
65
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1638
65
        break;
1639
65
    }
1640
1.76k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1641
1.76k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1642
1.76k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1643
1644
1.76k
        const auto downstream_pipeline_id = cur_pipe->id();
1645
1.76k
        if (!_dag.contains(downstream_pipeline_id)) {
1646
1.75k
            _dag.insert({downstream_pipeline_id, {}});
1647
1.75k
        }
1648
1.76k
        cur_pipe = add_pipeline(cur_pipe);
1649
1.76k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1650
1651
1.76k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1652
1.76k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1653
1.76k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1654
1.76k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1655
1.76k
        break;
1656
1.76k
    }
1657
1.76k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1658
685
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1659
685
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1660
685
        break;
1661
685
    }
1662
685
    case TPlanNodeType::INTERSECT_NODE: {
1663
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1664
119
                                                                      cur_pipe, sink_ops));
1665
119
        break;
1666
119
    }
1667
129
    case TPlanNodeType::EXCEPT_NODE: {
1668
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1669
129
                                                                       cur_pipe, sink_ops));
1670
129
        break;
1671
129
    }
1672
328
    case TPlanNodeType::REPEAT_NODE: {
1673
328
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1674
328
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1675
328
        break;
1676
328
    }
1677
923
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1678
923
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1679
923
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1680
923
        break;
1681
923
    }
1682
923
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1683
18
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1684
18
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1685
18
        break;
1686
18
    }
1687
1.48k
    case TPlanNodeType::EMPTY_SET_NODE: {
1688
1.48k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1689
1.48k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1690
1.48k
        break;
1691
1.48k
    }
1692
1.48k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1693
265
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
265
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
265
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1696
265
        break;
1697
265
    }
1698
1.54k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1699
1.54k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1700
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1701
1.54k
        break;
1702
1.54k
    }
1703
5.01k
    case TPlanNodeType::META_SCAN_NODE: {
1704
5.01k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1705
5.01k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1706
5.01k
        break;
1707
5.01k
    }
1708
5.01k
    case TPlanNodeType::SELECT_NODE: {
1709
1.46k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1710
1.46k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1711
1.46k
        break;
1712
1.46k
    }
1713
1.46k
    case TPlanNodeType::REC_CTE_NODE: {
1714
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1715
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1716
1717
151
        const auto downstream_pipeline_id = cur_pipe->id();
1718
151
        if (!_dag.contains(downstream_pipeline_id)) {
1719
148
            _dag.insert({downstream_pipeline_id, {}});
1720
148
        }
1721
1722
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1723
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1724
1725
151
        DataSinkOperatorPtr anchor_sink;
1726
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1727
151
                                                                  op->operator_id(), tnode, descs);
1728
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1729
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1730
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1731
1732
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1733
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1734
1735
151
        DataSinkOperatorPtr rec_sink;
1736
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1737
151
                                                         tnode, descs);
1738
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1739
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1741
1742
151
        break;
1743
151
    }
1744
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1745
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1746
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
1.95k
        break;
1748
1.95k
    }
1749
1.95k
    default:
1750
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1751
0
                                     print_plan_node_type(tnode.node_type));
1752
550k
    }
1753
547k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1754
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1755
0
        op->set_serial_operator();
1756
0
    }
1757
1758
547k
    return Status::OK();
1759
550k
}
1760
// NOLINTEND(readability-function-cognitive-complexity)
1761
// NOLINTEND(readability-function-size)
1762
1763
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
    // Build the pipelines for a set-operation plan node (INTERSECT when
    // is_intersect is true, EXCEPT otherwise). The source operator is placed on
    // the current pipeline and handed back to the caller through `op`; one new
    // upstream pipeline with a sink is created per child of the plan node.
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));

    // The pipeline holding the source is the downstream of every child pipeline;
    // make sure it has an adjacency entry in the DAG before children are added.
    const auto downstream_pipeline_id = cur_pipe->id();
    if (!_dag.contains(downstream_pipeline_id)) {
        _dag.insert({downstream_pipeline_id, {}});
    }

    // Child 0 gets SetSinkOperatorX, the remaining children get
    // SetProbeSinkOperatorX; all sinks share the source's operator_id.
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());

        if (child_id == 0) {
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        } else {
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
        }
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
        // Prepare children pipelines: any pipeline that finds this node as its
        // parent will use the prepared pipeline to build on.
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
    }

    return Status::OK();
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
119
    const auto downstream_pipeline_id = cur_pipe->id();
1771
119
    if (!_dag.contains(downstream_pipeline_id)) {
1772
110
        _dag.insert({downstream_pipeline_id, {}});
1773
110
    }
1774
1775
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
316
        if (child_id == 0) {
1780
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
197
        } else {
1783
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
197
        }
1786
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
316
    }
1791
1792
119
    return Status::OK();
1793
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
129
    const auto downstream_pipeline_id = cur_pipe->id();
1771
129
    if (!_dag.contains(downstream_pipeline_id)) {
1772
121
        _dag.insert({downstream_pipeline_id, {}});
1773
121
    }
1774
1775
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
273
        if (child_id == 0) {
1780
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
144
        } else {
1783
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
144
        }
1786
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
273
    }
1791
1792
129
    return Status::OK();
1793
129
}
1794
1795
337k
Status PipelineFragmentContext::submit() {
    // Submit every prepared pipeline task to the scheduler. May be called at
    // most once; a second call is an internal error.
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Only the tasks submitted so far will ever close and report back.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // A failure must also stop the outer loop: otherwise the next instance's
        // submissions would overwrite `st` (possibly back to OK), skip the error
        // path below, and invalidate the `_total_tasks` bookkeeping set above.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            // The already-submitted tasks may have all closed by now.
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1836
1837
0
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1838
0
    if (_runtime_state->enable_profile()) {
1839
0
        std::stringstream ss;
1840
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1841
0
            runtime_profile_ptr->pretty_print(&ss);
1842
0
        }
1843
1844
0
        if (_runtime_state->load_channel_profile()) {
1845
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1846
0
        }
1847
1848
0
        auto profile_str =
1849
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1850
0
                            this->_fragment_id, extra_info, ss.str());
1851
0
        LOG_LONG_STRING(INFO, profile_str);
1852
0
    }
1853
0
}
1854
// If all pipeline tasks binded to the fragment instance are finished, then we could
1855
// close the fragment instance.
1856
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1857
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1858
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1859
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1860
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1861
// (via debug_string()).
1862
340k
bool PipelineFragmentContext::_close_fragment_instance() {
    // Idempotent: only the first call does the close work; later calls return false.
    if (_is_fragment_instance_closed) {
        return false;
    }
    // Mark closed on every exit path, including early error returns from here on.
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
    if (!_need_notify_close) {
        // Send the final report to FE; a failure here is logged but not fatal.
        auto st = send_report(true);
        if (!st) {
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
                                        print_id(_query_id), _fragment_id, st.to_string());
        }
    }
    // Printing the profile content to the info log is a temporary solution for
    // stream load and external connectors. Stream load has no coordinator-like
    // component on FE, so the backend cannot report its profile to FE and it
    // cannot be shown the same way as other queries; instead we print it here.

    if (_runtime_state->enable_profile() &&
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
        std::stringstream ss;
        // Compute the _local_time_percent before pretty_print-ing the runtime profile.
        // Before this operation the output looked like:
        //   UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
        // After it:
        //   UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
        // so the exec node's own time, excluding children, is easy to read.
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
            runtime_profile_ptr->pretty_print(&ss);
        }

        if (_runtime_state->load_channel_profile()) {
            _runtime_state->load_channel_profile()->pretty_print(&ss);
        }

        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
    }

    if (_query_ctx->enable_profile()) {
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
                                         collect_realtime_load_channel_profile());
    }

    // Return whether the caller needs to remove this context from the pipeline map.
    // The caller must do that only AFTER releasing _task_mutex (see the ABBA
    // deadlock note in the comment preceding this function).
    return !_need_notify_close;
}
1911
1912
1.84M
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // Called when one task of `pipeline_id` finishes. If all tasks of this
    // pipeline have been closed, its upstream output is never needed again, so
    // make all tasks of the dependent pipelines in the DAG runnable here.
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        if (_dag.contains(pipeline_id)) {
            for (auto dep : _dag[pipeline_id]) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        // When the last task closes, close the whole fragment instance.
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1935
1936
41.8k
std::string PipelineFragmentContext::get_load_error_url() {
1937
41.8k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1938
0
        return to_load_error_http_path(str);
1939
0
    }
1940
112k
    for (auto& tasks : _tasks) {
1941
193k
        for (auto& task : tasks) {
1942
193k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1943
148
                return to_load_error_http_path(str);
1944
148
            }
1945
193k
        }
1946
112k
    }
1947
41.7k
    return "";
1948
41.8k
}
1949
1950
41.8k
std::string PipelineFragmentContext::get_first_error_msg() {
1951
41.8k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1952
0
        return str;
1953
0
    }
1954
112k
    for (auto& tasks : _tasks) {
1955
193k
        for (auto& task : tasks) {
1956
193k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1957
148
                return str;
1958
148
            }
1959
193k
        }
1960
112k
    }
1961
41.7k
    return "";
1962
41.8k
}
1963
1964
340k
Status PipelineFragmentContext::send_report(bool done) {
    // Report this fragment's execution status (and runtime states) to FE via
    // _report_status_cb. Returns OK when no report is needed.
    Status exec_status = _query_ctx->exec_status();
    // If the plan finished successfully but _is_report_success is false, no
    // report is needed. Load sets _is_report_success to true because it wants
    // to observe progress.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false, no report
    // is needed whether the query succeeded or failed. This may happen when a
    // query limit is reached and an internal cancellation is being processed:
    // the fragment is cancelled, but _is_report_on_cancel is set to false to
    // avoid sending a fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // The query finished successfully; we can safely close the fragment
            // instance without sending a report to FE.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    // Collect the runtime state of every task, in instance order.
    std::vector<RuntimeState*> runtime_states;
    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Query-level error info wins; fall back to this fragment's tasks otherwise.
    // Each getter is called only once.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2019
2020
28
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2021
28
    size_t res = 0;
2022
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2023
    // here to traverse the vector.
2024
28
    for (const auto& task_instances : _tasks) {
2025
80
        for (const auto& task : task_instances) {
2026
80
            if (task.first->is_running()) {
2027
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2028
0
                                      << " is running, task: " << (void*)task.first.get()
2029
0
                                      << ", is_running: " << task.first->is_running();
2030
0
                *has_running_task = true;
2031
0
                return 0;
2032
0
            }
2033
2034
80
            size_t revocable_size = task.first->get_revocable_size();
2035
80
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2036
8
                res += revocable_size;
2037
8
            }
2038
80
        }
2039
28
    }
2040
28
    return res;
2041
28
}
2042
2043
56
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2044
56
    std::vector<PipelineTask*> revocable_tasks;
2045
56
    for (const auto& task_instances : _tasks) {
2046
160
        for (const auto& task : task_instances) {
2047
160
            size_t revocable_size_ = task.first->get_revocable_size();
2048
2049
160
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2050
16
                revocable_tasks.emplace_back(task.first.get());
2051
16
            }
2052
160
        }
2053
56
    }
2054
56
    return revocable_tasks;
2055
56
}
2056
2057
304
std::string PipelineFragmentContext::debug_string() {
2058
304
    std::lock_guard<std::mutex> l(_task_mutex);
2059
304
    fmt::memory_buffer debug_string_buffer;
2060
304
    fmt::format_to(debug_string_buffer,
2061
304
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2062
304
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2063
304
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2064
2.93k
    for (size_t j = 0; j < _tasks.size(); j++) {
2065
2.62k
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2066
7.43k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2067
4.81k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2068
4.81k
                           _tasks[j][i].first->debug_string());
2069
4.81k
        }
2070
2.62k
    }
2071
2072
304
    return fmt::to_string(debug_string_buffer);
2073
304
}
2074
2075
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2076
2.28k
PipelineFragmentContext::collect_realtime_profile() const {
2077
2.28k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2078
2079
    // we do not have mutex to protect pipeline_id_to_profile
2080
    // so we need to make sure this funciton is invoked after fragment context
2081
    // has already been prepared.
2082
2.28k
    if (!_prepared) {
2083
0
        std::string msg =
2084
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2085
0
        DCHECK(false) << msg;
2086
0
        LOG_ERROR(msg);
2087
0
        return res;
2088
0
    }
2089
2090
    // Make sure first profile is fragment level profile
2091
2.28k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2092
2.28k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2093
2.28k
    res.push_back(fragment_profile);
2094
2095
    // pipeline_id_to_profile is initialized in prepare stage
2096
4.07k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2097
4.07k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2098
4.07k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2099
4.07k
        res.push_back(profile_ptr);
2100
4.07k
    }
2101
2102
2.28k
    return res;
2103
2.28k
}
2104
2105
std::shared_ptr<TRuntimeProfileTree>
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
    // Merge every task's load-channel profile into the fragment runtime state's
    // load-channel profile, then return a thrift serialization of the merged
    // result. There is no mutex protecting these profiles, so this function
    // must only be invoked after the fragment context has been prepared.
    if (!_prepared) {
        std::string msg =
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
        DCHECK(false) << msg;
        LOG_ERROR(msg);
        return nullptr;
    }

    for (const auto& tasks : _tasks) {
        for (const auto& task : tasks) {
            // Skip tasks that have no load-channel profile at all.
            if (task.second->load_channel_profile() == nullptr) {
                continue;
            }

            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();

            // Serialize the task's profile and fold it into the fragment-level one.
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
                                                           _runtime_state->profile_level());
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
        }
    }

    // NOTE(review): unlike the per-task profiles above, the fragment-level
    // load_channel_profile() is dereferenced without a null check here —
    // presumably prepare() guarantees it exists; confirm against prepare().
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
                                                      _runtime_state->profile_level());
    return load_channel_profile;
}
2137
2138
// Collect runtime filter IDs registered by all tasks in this PFC.
2139
// Used during recursive CTE stage transitions to know which filters to deregister
2140
// before creating the new PFC for the next recursion round.
2141
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2142
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2143
// the vector sizes do not change, and individual RuntimeState filter sets are
2144
// written only during open() which has completed by the time we reach rerun.
2145
3.28k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2146
3.28k
    std::set<int> result;
2147
12.6k
    for (const auto& _task : _tasks) {
2148
21.1k
        for (const auto& task : _task) {
2149
21.1k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2150
21.1k
            result.merge(set);
2151
21.1k
        }
2152
12.6k
    }
2153
3.28k
    if (_runtime_state) {
2154
3.28k
        auto set = _runtime_state->get_deregister_runtime_filter();
2155
3.28k
        result.merge(set);
2156
3.28k
    }
2157
3.28k
    return result;
2158
3.28k
}
2159
2160
341k
void PipelineFragmentContext::_release_resource() {
2161
341k
    std::lock_guard<std::mutex> l(_task_mutex);
2162
    // The memory released by the query end is recorded in the query mem tracker.
2163
341k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2164
341k
    auto st = _query_ctx->exec_status();
2165
1.08M
    for (auto& _task : _tasks) {
2166
1.08M
        if (!_task.empty()) {
2167
1.08M
            _call_back(_task.front().first->runtime_state(), &st);
2168
1.08M
        }
2169
1.08M
    }
2170
341k
    _tasks.clear();
2171
341k
    _dag.clear();
2172
341k
    _pip_id_to_pipeline.clear();
2173
341k
    _pipelines.clear();
2174
341k
    _sink.reset();
2175
341k
    _root_op.reset();
2176
341k
    _runtime_filter_mgr_map.clear();
2177
341k
    _op_id_to_shared_state.clear();
2178
341k
}
2179
2180
#include "common/compile_check_end.h"
2181
} // namespace doris