Coverage Report

Created: 2026-03-31 16:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_block_buffer.h"
123
#include "runtime/result_buffer_mgr.h"
124
#include "runtime/runtime_state.h"
125
#include "runtime/thread_context.h"
126
#include "service/backend_options.h"
127
#include "util/countdown_latch.h"
128
#include "util/debug_util.h"
129
#include "util/uid_util.h"
130
131
namespace doris {
132
#include "common/compile_check_begin.h"
133
PipelineFragmentContext::PipelineFragmentContext(
134
        TUniqueId query_id, const TPipelineFragmentParams& request,
135
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136
        const std::function<void(RuntimeState*, Status*)>& call_back,
137
        report_status_callback report_status_cb)
138
431k
        : _query_id(std::move(query_id)),
139
431k
          _fragment_id(request.fragment_id),
140
431k
          _exec_env(exec_env),
141
431k
          _query_ctx(std::move(query_ctx)),
142
431k
          _call_back(call_back),
143
431k
          _is_report_on_cancel(true),
144
431k
          _report_status_cb(std::move(report_status_cb)),
145
431k
          _params(request),
146
431k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147
431k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
148
431k
                                                               : false) {
149
431k
    _fragment_watcher.start();
150
431k
}
151
152
431k
PipelineFragmentContext::~PipelineFragmentContext() {
153
431k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
154
431k
            .tag("query_id", print_id(_query_id))
155
431k
            .tag("fragment_id", _fragment_id);
156
431k
    _release_resource();
157
431k
    {
158
        // The memory released by the query end is recorded in the query mem tracker.
159
431k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
160
431k
        _runtime_state.reset();
161
431k
        _query_ctx.reset();
162
431k
    }
163
431k
}
164
165
201
bool PipelineFragmentContext::is_timeout(timespec now) const {
166
201
    if (_timeout <= 0) {
167
0
        return false;
168
0
    }
169
201
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
170
201
}
171
172
// notify_close() transitions the PFC from "waiting for external close notification" to
173
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
174
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
175
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
176
9.30k
bool PipelineFragmentContext::notify_close() {
177
9.30k
    bool all_closed = false;
178
9.30k
    bool need_remove = false;
179
9.30k
    {
180
9.30k
        std::lock_guard<std::mutex> l(_task_mutex);
181
9.30k
        if (_closed_tasks >= _total_tasks) {
182
3.48k
            if (_need_notify_close) {
183
                // Fragment was cancelled and waiting for notify to close.
184
                // Record that we need to remove from fragment mgr, but do it
185
                // after releasing _task_mutex to avoid ABBA deadlock with
186
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
187
                // first, then _task_mutex via debug_string()).
188
3.40k
                need_remove = true;
189
3.40k
            }
190
3.48k
            all_closed = true;
191
3.48k
        }
192
        // make fragment release by self after cancel
193
9.30k
        _need_notify_close = false;
194
9.30k
    }
195
9.30k
    if (need_remove) {
196
3.40k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
197
3.40k
    }
198
9.30k
    return all_closed;
199
9.30k
}
200
201
// Must not add lock in this method. Because it will call query ctx cancel. And
202
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
203
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
204
// There maybe dead lock.
205
5.82k
void PipelineFragmentContext::cancel(const Status reason) {
206
5.82k
    LOG_INFO("PipelineFragmentContext::cancel")
207
5.82k
            .tag("query_id", print_id(_query_id))
208
5.82k
            .tag("fragment_id", _fragment_id)
209
5.82k
            .tag("reason", reason.to_string());
210
5.82k
    if (notify_close()) {
211
92
        return;
212
92
    }
213
    // Timeout is a special error code, we need print current stack to debug timeout issue.
214
5.72k
    if (reason.is<ErrorCode::TIMEOUT>()) {
215
8
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
216
8
                                   debug_string());
217
8
        LOG_LONG_STRING(WARNING, dbg_str);
218
8
    }
219
220
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
221
5.72k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
222
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
223
0
                    debug_string());
224
0
    }
225
226
5.72k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
227
12
        print_profile("cancel pipeline, reason: " + reason.to_string());
228
12
    }
229
230
5.72k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
231
21
        _query_ctx->set_load_error_url(error_url);
232
21
    }
233
234
5.72k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
235
21
        _query_ctx->set_first_error_msg(first_error_msg);
236
21
    }
237
238
5.72k
    _query_ctx->cancel(reason, _fragment_id);
239
5.72k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
240
541
        _is_report_on_cancel = false;
241
5.18k
    } else {
242
18.8k
        for (auto& id : _fragment_instance_ids) {
243
18.8k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
244
18.8k
        }
245
5.18k
    }
246
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
247
    // For stream load the fragment's query_id == load id, it is set in FE.
248
5.72k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
249
5.72k
    if (stream_load_ctx != nullptr) {
250
30
        stream_load_ctx->pipe->cancel(reason.to_string());
251
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
252
        // We need to set the error URL at this point to ensure error information is properly
253
        // propagated to the client.
254
30
        stream_load_ctx->error_url = get_load_error_url();
255
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
256
30
    }
257
258
21.1k
    for (auto& tasks : _tasks) {
259
52.0k
        for (auto& task : tasks) {
260
52.0k
            task.first->unblock_all_dependencies();
261
52.0k
        }
262
21.1k
    }
263
5.72k
}
264
265
677k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
266
677k
    PipelineId id = _next_pipeline_id++;
267
677k
    auto pipeline = std::make_shared<Pipeline>(
268
677k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
269
677k
            parent ? parent->num_tasks() : _num_instances);
270
677k
    if (idx >= 0) {
271
111k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
272
565k
    } else {
273
565k
        _pipelines.emplace_back(pipeline);
274
565k
    }
275
677k
    if (parent) {
276
239k
        parent->set_children(pipeline);
277
239k
    }
278
677k
    return pipeline;
279
677k
}
280
281
431k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
282
431k
    {
283
431k
        SCOPED_TIMER(_build_pipelines_timer);
284
        // 2. Build pipelines with operators in this fragment.
285
431k
        auto root_pipeline = add_pipeline();
286
431k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
287
431k
                                         &_root_op, root_pipeline));
288
289
        // 3. Create sink operator
290
431k
        if (!_params.fragment.__isset.output_sink) {
291
0
            return Status::InternalError("No output sink in this fragment!");
292
0
        }
293
431k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
294
431k
                                          _params.fragment.output_exprs, _params,
295
431k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
296
431k
                                          *_desc_tbl, root_pipeline->id()));
297
431k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
298
431k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
299
300
565k
        for (PipelinePtr& pipeline : _pipelines) {
301
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
302
565k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
303
565k
        }
304
431k
    }
305
    // 4. Build local exchanger
306
431k
    if (_runtime_state->enable_local_shuffle()) {
307
429k
        SCOPED_TIMER(_plan_local_exchanger_timer);
308
429k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
309
429k
                                             _params.bucket_seq_to_instance_idx,
310
429k
                                             _params.shuffle_idx_to_instance_idx));
311
429k
    }
312
313
    // 5. Initialize global states in pipelines.
314
678k
    for (PipelinePtr& pipeline : _pipelines) {
315
678k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
316
678k
        pipeline->children().clear();
317
678k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
318
678k
    }
319
320
430k
    {
321
430k
        SCOPED_TIMER(_build_tasks_timer);
322
        // 6. Build pipeline tasks and initialize local state.
323
430k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
324
430k
    }
325
326
430k
    return Status::OK();
327
430k
}
328
329
431k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
330
431k
    if (_prepared) {
331
0
        return Status::InternalError("Already prepared");
332
0
    }
333
431k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
334
431k
        _timeout = _params.query_options.execution_timeout;
335
431k
    }
336
337
431k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
338
431k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
339
431k
    SCOPED_TIMER(_prepare_timer);
340
431k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
341
431k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
342
431k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
343
431k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
344
431k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
345
431k
    {
346
431k
        SCOPED_TIMER(_init_context_timer);
347
431k
        cast_set(_num_instances, _params.local_params.size());
348
431k
        _total_instances =
349
431k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
350
351
431k
        auto* fragment_context = this;
352
353
431k
        if (_params.query_options.__isset.is_report_success) {
354
429k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
355
429k
        }
356
357
        // 1. Set up the global runtime state.
358
431k
        _runtime_state = RuntimeState::create_unique(
359
431k
                _params.query_id, _params.fragment_id, _params.query_options,
360
431k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
361
431k
        _runtime_state->set_task_execution_context(shared_from_this());
362
431k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
363
431k
        if (_params.__isset.backend_id) {
364
429k
            _runtime_state->set_backend_id(_params.backend_id);
365
429k
        }
366
431k
        if (_params.__isset.import_label) {
367
237
            _runtime_state->set_import_label(_params.import_label);
368
237
        }
369
431k
        if (_params.__isset.db_name) {
370
189
            _runtime_state->set_db_name(_params.db_name);
371
189
        }
372
431k
        if (_params.__isset.load_job_id) {
373
0
            _runtime_state->set_load_job_id(_params.load_job_id);
374
0
        }
375
376
431k
        if (_params.is_simplified_param) {
377
148k
            _desc_tbl = _query_ctx->desc_tbl;
378
282k
        } else {
379
282k
            DCHECK(_params.__isset.desc_tbl);
380
282k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
381
282k
                                                  &_desc_tbl));
382
282k
        }
383
431k
        _runtime_state->set_desc_tbl(_desc_tbl);
384
431k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
385
431k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
386
431k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
387
431k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
388
389
        // init fragment_instance_ids
390
431k
        const auto target_size = _params.local_params.size();
391
431k
        _fragment_instance_ids.resize(target_size);
392
1.69M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
393
1.26M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
394
1.26M
            _fragment_instance_ids[i] = fragment_instance_id;
395
1.26M
        }
396
431k
    }
397
398
431k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
399
400
430k
    _init_next_report_time();
401
402
430k
    _prepared = true;
403
430k
    return Status::OK();
404
431k
}
405
406
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
407
        int instance_idx,
408
1.26M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
409
1.26M
    const auto& local_params = _params.local_params[instance_idx];
410
1.26M
    auto fragment_instance_id = local_params.fragment_instance_id;
411
1.26M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
412
1.26M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
413
1.26M
    auto get_shared_state = [&](PipelinePtr pipeline)
414
1.26M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
2.12M
                                       std::vector<std::shared_ptr<Dependency>>>> {
416
2.12M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
417
2.12M
                                std::vector<std::shared_ptr<Dependency>>>>
418
2.12M
                shared_state_map;
419
2.77M
        for (auto& op : pipeline->operators()) {
420
2.77M
            auto source_id = op->operator_id();
421
2.77M
            if (auto iter = _op_id_to_shared_state.find(source_id);
422
2.77M
                iter != _op_id_to_shared_state.end()) {
423
827k
                shared_state_map.insert({source_id, iter->second});
424
827k
            }
425
2.77M
        }
426
2.12M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
427
2.12M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
428
2.12M
                iter != _op_id_to_shared_state.end()) {
429
338k
                shared_state_map.insert({sink_to_source_id, iter->second});
430
338k
            }
431
2.12M
        }
432
2.12M
        return shared_state_map;
433
2.12M
    };
434
435
3.86M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
436
2.60M
        auto& pipeline = _pipelines[pip_idx];
437
2.60M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
438
2.11M
            auto task_runtime_state = RuntimeState::create_unique(
439
2.11M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
440
2.11M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
441
2.11M
            {
442
                // Initialize runtime state for this task
443
2.11M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
444
445
2.11M
                task_runtime_state->set_task_execution_context(shared_from_this());
446
2.11M
                task_runtime_state->set_be_number(local_params.backend_num);
447
448
2.11M
                if (_params.__isset.backend_id) {
449
2.11M
                    task_runtime_state->set_backend_id(_params.backend_id);
450
2.11M
                }
451
2.11M
                if (_params.__isset.import_label) {
452
238
                    task_runtime_state->set_import_label(_params.import_label);
453
238
                }
454
2.11M
                if (_params.__isset.db_name) {
455
190
                    task_runtime_state->set_db_name(_params.db_name);
456
190
                }
457
2.11M
                if (_params.__isset.load_job_id) {
458
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
459
0
                }
460
2.11M
                if (_params.__isset.wal_id) {
461
114
                    task_runtime_state->set_wal_id(_params.wal_id);
462
114
                }
463
2.11M
                if (_params.__isset.content_length) {
464
31
                    task_runtime_state->set_content_length(_params.content_length);
465
31
                }
466
467
2.11M
                task_runtime_state->set_desc_tbl(_desc_tbl);
468
2.11M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
469
2.11M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
470
2.11M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
471
2.11M
                task_runtime_state->set_max_operator_id(max_operator_id());
472
2.11M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
473
2.11M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
474
2.11M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
475
476
2.11M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
477
2.11M
            }
478
2.11M
            auto cur_task_id = _total_tasks++;
479
2.11M
            task_runtime_state->set_task_id(cur_task_id);
480
2.11M
            task_runtime_state->set_task_num(pipeline->num_tasks());
481
2.11M
            auto task = std::make_shared<PipelineTask>(
482
2.11M
                    pipeline, cur_task_id, task_runtime_state.get(),
483
2.11M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
484
2.11M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
485
2.11M
                    instance_idx);
486
2.11M
            pipeline->incr_created_tasks(instance_idx, task.get());
487
2.11M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
488
2.11M
            _tasks[instance_idx].emplace_back(
489
2.11M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
490
2.11M
                            std::move(task), std::move(task_runtime_state)});
491
2.11M
        }
492
2.60M
    }
493
494
    /**
495
         * Build DAG for pipeline tasks.
496
         * For example, we have
497
         *
498
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
499
         *            \                      /
500
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
501
         *                 \                          /
502
         *               JoinProbeOperator2 (Pipeline1)
503
         *
504
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
505
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
506
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
507
         *
508
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
509
         * and JoinProbeOperator2.
510
         */
511
2.60M
    for (auto& _pipeline : _pipelines) {
512
2.60M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
513
2.11M
            auto* task = pipeline_id_to_task[_pipeline->id()];
514
2.11M
            DCHECK(task != nullptr);
515
516
            // If this task has upstream dependency, then inject it into this task.
517
2.11M
            if (_dag.contains(_pipeline->id())) {
518
1.34M
                auto& deps = _dag[_pipeline->id()];
519
2.15M
                for (auto& dep : deps) {
520
2.15M
                    if (pipeline_id_to_task.contains(dep)) {
521
1.17M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
522
1.17M
                        if (ss) {
523
508k
                            task->inject_shared_state(ss);
524
665k
                        } else {
525
665k
                            pipeline_id_to_task[dep]->inject_shared_state(
526
665k
                                    task->get_source_shared_state());
527
665k
                        }
528
1.17M
                    }
529
2.15M
                }
530
1.34M
            }
531
2.11M
        }
532
2.60M
    }
533
3.86M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
534
2.60M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
535
2.11M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
536
2.11M
            DCHECK(pipeline_id_to_profile[pip_idx]);
537
2.11M
            std::vector<TScanRangeParams> scan_ranges;
538
2.11M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
539
2.11M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
540
342k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
541
342k
            }
542
2.11M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
543
2.11M
                                                             _params.fragment.output_sink));
544
2.11M
        }
545
2.60M
    }
546
1.26M
    {
547
1.26M
        std::lock_guard<std::mutex> l(_state_map_lock);
548
1.26M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
549
1.26M
    }
550
1.26M
    return Status::OK();
551
1.26M
}
552
553
430k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
554
430k
    _total_tasks = 0;
555
430k
    _closed_tasks = 0;
556
430k
    const auto target_size = _params.local_params.size();
557
430k
    _tasks.resize(target_size);
558
430k
    _runtime_filter_mgr_map.resize(target_size);
559
1.10M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
560
676k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
561
676k
    }
562
430k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
563
564
430k
    if (target_size > 1 &&
565
430k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
566
137k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
567
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
568
24.4k
        std::vector<Status> prepare_status(target_size);
569
24.4k
        int submitted_tasks = 0;
570
24.4k
        Status submit_status;
571
24.4k
        CountDownLatch latch((int)target_size);
572
333k
        for (int i = 0; i < target_size; i++) {
573
308k
            submit_status = thread_pool->submit_func([&, i]() {
574
308k
                SCOPED_ATTACH_TASK(_query_ctx.get());
575
308k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
576
308k
                latch.count_down();
577
308k
            });
578
308k
            if (LIKELY(submit_status.ok())) {
579
308k
                submitted_tasks++;
580
18.4E
            } else {
581
18.4E
                break;
582
18.4E
            }
583
308k
        }
584
24.4k
        latch.arrive_and_wait(target_size - submitted_tasks);
585
24.4k
        if (UNLIKELY(!submit_status.ok())) {
586
0
            return submit_status;
587
0
        }
588
333k
        for (int i = 0; i < submitted_tasks; i++) {
589
308k
            if (!prepare_status[i].ok()) {
590
0
                return prepare_status[i];
591
0
            }
592
308k
        }
593
406k
    } else {
594
1.35M
        for (int i = 0; i < target_size; i++) {
595
953k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
596
953k
        }
597
406k
    }
598
430k
    _pipeline_parent_map.clear();
599
430k
    _op_id_to_shared_state.clear();
600
601
430k
    return Status::OK();
602
430k
}
603
604
429k
void PipelineFragmentContext::_init_next_report_time() {
605
429k
    auto interval_s = config::pipeline_status_report_interval;
606
429k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
607
41.4k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
608
41.4k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
609
        // We don't want to wait longer than it takes to run the entire fragment.
610
41.4k
        _previous_report_time =
611
41.4k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
612
41.4k
        _disable_period_report = false;
613
41.4k
    }
614
429k
}
615
616
4.90k
void PipelineFragmentContext::refresh_next_report_time() {
617
4.90k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
618
4.90k
    DCHECK(disable == true);
619
4.90k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
620
4.90k
    _disable_period_report.compare_exchange_strong(disable, false);
621
4.90k
}
622
623
7.30M
void PipelineFragmentContext::trigger_report_if_necessary() {
624
7.30M
    if (!_is_report_success) {
625
6.79M
        return;
626
6.79M
    }
627
501k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
628
501k
    if (disable) {
629
8.66k
        return;
630
8.66k
    }
631
493k
    int32_t interval_s = config::pipeline_status_report_interval;
632
493k
    if (interval_s <= 0) {
633
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
634
0
                        "trigger "
635
0
                        "report.";
636
0
    }
637
493k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
638
493k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
639
493k
    if (MonotonicNanos() > next_report_time) {
640
4.90k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
641
4.90k
                                                            std::memory_order_acq_rel)) {
642
7
            return;
643
7
        }
644
4.89k
        if (VLOG_FILE_IS_ON) {
645
0
            VLOG_FILE << "Reporting "
646
0
                      << "profile for query_id " << print_id(_query_id)
647
0
                      << ", fragment id: " << _fragment_id;
648
649
0
            std::stringstream ss;
650
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
651
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
652
0
            if (_runtime_state->load_channel_profile()) {
653
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
654
0
            }
655
656
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
657
0
                      << " profile:\n"
658
0
                      << ss.str();
659
0
        }
660
4.89k
        auto st = send_report(false);
661
4.89k
        if (!st.ok()) {
662
0
            disable = true;
663
0
            _disable_period_report.compare_exchange_strong(disable, false,
664
0
                                                           std::memory_order_acq_rel);
665
0
        }
666
4.89k
    }
667
493k
}
668
669
// Build this fragment's operator tree from the thrift plan.
//
// The thrift plan stores its nodes in preorder; _create_tree_helper consumes
// them recursively starting at index 0, filling `*root` with the root
// operator and extending `cur_pipe`. Throws on an empty plan; returns
// InternalError if the recursion did not consume every plan node.
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int node_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &node_idx, root,
                                        cur_pipe, 0, false, false));

    // A well-formed preorder traversal leaves node_idx on the last node used.
    if (node_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
686
687
// Recursively reconstruct one subtree of the plan from the preorder thrift
// node list `tnodes`, starting at `*node_idx`.
//
// Creates the operator for the current node, wires it to `parent` (or stores
// it in `*root` for the top of the tree), then recurses into each child while
// propagating two planning flags:
//   - followed_by_shuffled_operator: this operator feeds a shuffled operator
//     downstream, so any local exchange planned for it must use HASH_SHUFFLE
//     with a compatible distribution.
//   - require_bucket_distribution: downstream operators are co-located
//     (bucketed), so BUCKET_HASH_SHUFFLE distribution must be preserved.
// `*node_idx` is advanced past every node consumed by this subtree.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    // _create_operator may also produce a cache operator wrapping `op`;
    // `cur_pipe` can be switched to a new pipeline by this call.
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s); prefer the cache wrapper when present
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When the pipeline has no operators yet, the sink determines the
    // required distribution; otherwise the just-created operator does.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // The flag stays set through NOOP exchanges (they preserve distribution)
    // and through hash exchanges rooted at a shuffled operator.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for bucket (co-located) distribution.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf node: record whether this source operator must run serially.
    // NOTE(review): this overwrites _use_serial_source for every leaf, so the
    // last-visited leaf wins — confirm that is the intended semantics.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
772
773
void PipelineFragmentContext::_inherit_pipeline_properties(
774
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
775
111k
        PipelinePtr pipe_with_sink) {
776
111k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
777
111k
    pipe_with_source->set_num_tasks(_num_instances);
778
111k
    pipe_with_source->set_data_distribution(data_distribution);
779
111k
}
780
781
// Split `cur_pipe` at operator index `idx` by inserting a local exchange:
// operators [0, idx) move into `new_pip` which ends in a LocalExchangeSink,
// and `cur_pipe` is rebuilt to start with a LocalExchangeSource. The shared
// exchanger connecting the two is chosen from `data_distribution` and the DAG
// edges/children of both pipelines are rewired accordingly.
//
// Improvements over the previous revision (behavior unchanged):
//  - the seven identical `local_exchange_free_blocks_limit` ternaries are
//    hoisted into one `free_block_limit` local;
//  - `shared_ptr` loop variables are taken by const reference to avoid
//    per-iteration atomic refcount traffic;
//  - a stray `;` after an inner if-block was removed.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    // Without a scan there is no bucket mapping, so downgrade bucket shuffle
    // to a plain hash shuffle.
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    // Cap on reusable free blocks for every exchanger below (0 = unlimited /
    // option unset); identical in all branches, so compute it once.
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Reset the old child edge before attaching the exchange source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. A child whose sink feeds
    // an operator that moved into new_pip follows it; the rest stay on
    // cur_pipe.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool found = false;
        for (const auto& op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
945
946
// Insert a local exchange before operator `idx` of `cur_pipe` if the
// requested `data_distribution` actually requires one. On a split,
// `*do_local_exchange` is set so the caller re-scans the (now shorter)
// pipeline. May insert a second PASSTHROUGH exchange to widen a
// single-task sink (see comment below).
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // With a single instance (or a single parent task) there is nothing to
    // redistribute locally.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
        return Status::OK();
    }

    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    auto& operators = cur_pipe->operators();
    auto total_op_num = operators.size();
    // New pipeline is inserted right after cur_pipe in _pipelines.
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // The split adds exactly one operator (the local exchange source).
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << total_op_num
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    // NOTE: num_tasks() is intentionally read *after* the split above, which
    // updated both pipelines' task counts.
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
987
988
// Plan local exchanges for every pipeline of this fragment.
//
// Pipelines are visited from last to first so that a pipeline's data
// distribution is settled before the pipelines that feed it are planned.
// Before planning, a pipeline inherits the data distribution of the child
// pipeline whose sink feeds its first operator (i.e. the child that is not a
// join build side).
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        // Take a handle by value: planning may insert new pipelines into
        // _pipelines (at positions after pip_idx).
        PipelinePtr pip = _pipelines[pip_idx];
        pip->init_data_distribution(_runtime_state.get());
        for (const auto& child : pip->children()) {
            if (child->sink()->node_id() == pip->operators().front()->node_id()) {
                pip->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pip,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1012
1013
// Plan local exchanges within a single pipeline `pip`.
//
// Scans the pipeline's operators (starting at index 1; index 0 is the source)
// and splits the pipeline wherever an operator requires a local exchange.
// Each split shortens `pip`, so the scan restarts on the remainder until no
// more splits happen; finally the sink's own distribution requirement is
// checked.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    do {
        // Re-fetch the operator list each round: _add_local_exchange mutates it.
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may itself demand a particular distribution (e.g. hash for an
    // agg sink); plan one final exchange in front of it if so.
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1050
1051
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1052
                                                  const std::vector<TExpr>& output_exprs,
1053
                                                  const TPipelineFragmentParams& params,
1054
                                                  const RowDescriptor& row_desc,
1055
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1056
430k
                                                  PipelineId cur_pipeline_id) {
1057
430k
    switch (thrift_sink.type) {
1058
146k
    case TDataSinkType::DATA_STREAM_SINK: {
1059
146k
        if (!thrift_sink.__isset.stream_sink) {
1060
0
            return Status::InternalError("Missing data stream sink.");
1061
0
        }
1062
146k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1063
146k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1064
146k
                params.destinations, _fragment_instance_ids);
1065
146k
        break;
1066
146k
    }
1067
247k
    case TDataSinkType::RESULT_SINK: {
1068
247k
        if (!thrift_sink.__isset.result_sink) {
1069
0
            return Status::InternalError("Missing data buffer sink.");
1070
0
        }
1071
1072
247k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1073
247k
                                                      output_exprs, thrift_sink.result_sink);
1074
247k
        break;
1075
247k
    }
1076
105
    case TDataSinkType::DICTIONARY_SINK: {
1077
105
        if (!thrift_sink.__isset.dictionary_sink) {
1078
0
            return Status::InternalError("Missing dict sink.");
1079
0
        }
1080
1081
105
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1082
105
                                                    thrift_sink.dictionary_sink);
1083
105
        break;
1084
105
    }
1085
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1086
30.5k
    case TDataSinkType::OLAP_TABLE_SINK: {
1087
30.5k
        if (state->query_options().enable_memtable_on_sink_node &&
1088
30.5k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1089
30.5k
            !config::is_cloud_mode()) {
1090
2.15k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1091
2.15k
                                                               row_desc, output_exprs);
1092
28.3k
        } else {
1093
28.3k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1094
28.3k
                                                             row_desc, output_exprs);
1095
28.3k
        }
1096
30.5k
        break;
1097
0
    }
1098
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1099
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1100
165
        DCHECK(state->get_query_ctx() != nullptr);
1101
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1102
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1103
165
                                                                output_exprs);
1104
165
        break;
1105
0
    }
1106
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1107
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1108
0
            return Status::InternalError("Missing hive table sink.");
1109
0
        }
1110
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1111
1.46k
                                                         output_exprs);
1112
1.46k
        break;
1113
1.46k
    }
1114
1.73k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1115
1.73k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1116
0
            return Status::InternalError("Missing iceberg table sink.");
1117
0
        }
1118
1.73k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1119
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1120
0
                                                                     row_desc, output_exprs);
1121
1.73k
        } else {
1122
1.73k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1123
1.73k
                                                                row_desc, output_exprs);
1124
1.73k
        }
1125
1.73k
        break;
1126
1.73k
    }
1127
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1128
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1129
0
            return Status::InternalError("Missing iceberg delete sink.");
1130
0
        }
1131
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1132
20
                                                             row_desc, output_exprs);
1133
20
        break;
1134
20
    }
1135
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1136
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1137
0
            return Status::InternalError("Missing iceberg merge sink.");
1138
0
        }
1139
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1140
80
                                                            output_exprs);
1141
80
        break;
1142
80
    }
1143
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1144
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1145
0
            return Status::InternalError("Missing max compute table sink.");
1146
0
        }
1147
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1148
0
                                                       output_exprs);
1149
0
        break;
1150
0
    }
1151
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1152
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1153
0
            return Status::InternalError("Missing data jdbc sink.");
1154
0
        }
1155
80
        if (config::enable_java_support) {
1156
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1157
80
                                                             output_exprs);
1158
80
        } else {
1159
0
            return Status::InternalError(
1160
0
                    "Jdbc table sink is not enabled, you can change be config "
1161
0
                    "enable_java_support to true and restart be.");
1162
0
        }
1163
80
        break;
1164
80
    }
1165
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1166
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1167
0
            return Status::InternalError("Missing data buffer sink.");
1168
0
        }
1169
1170
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1171
3
                                                             output_exprs);
1172
3
        break;
1173
3
    }
1174
501
    case TDataSinkType::RESULT_FILE_SINK: {
1175
501
        if (!thrift_sink.__isset.result_file_sink) {
1176
0
            return Status::InternalError("Missing result file sink.");
1177
0
        }
1178
1179
        // Result file sink is not the top sink
1180
502
        if (params.__isset.destinations && !params.destinations.empty()) {
1181
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1182
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1183
0
                    params.destinations, output_exprs, desc_tbl);
1184
501
        } else {
1185
501
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1186
501
                                                              output_exprs);
1187
501
        }
1188
501
        break;
1189
501
    }
1190
2.32k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1191
2.32k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1192
2.32k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1193
2.32k
        auto sink_id = next_sink_operator_id();
1194
2.32k
        const int multi_cast_node_id = sink_id;
1195
2.32k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1196
        // one sink has multiple sources.
1197
2.32k
        std::vector<int> sources;
1198
9.09k
        for (int i = 0; i < sender_size; ++i) {
1199
6.76k
            auto source_id = next_operator_id();
1200
6.76k
            sources.push_back(source_id);
1201
6.76k
        }
1202
1203
2.32k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1204
2.32k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1205
9.09k
        for (int i = 0; i < sender_size; ++i) {
1206
6.76k
            auto new_pipeline = add_pipeline();
1207
            // use to exchange sink
1208
6.76k
            RowDescriptor* exchange_row_desc = nullptr;
1209
6.76k
            {
1210
6.76k
                const auto& tmp_row_desc =
1211
6.76k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1212
6.76k
                                ? RowDescriptor(state->desc_tbl(),
1213
6.76k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1214
6.76k
                                                         .output_tuple_id})
1215
6.76k
                                : row_desc;
1216
6.76k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1217
6.76k
            }
1218
6.76k
            auto source_id = sources[i];
1219
6.76k
            OperatorPtr source_op;
1220
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1221
6.76k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1222
6.76k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1223
6.76k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1224
6.76k
                    /*operator_id=*/source_id);
1225
6.76k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1226
6.76k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1227
            // 2. create and set sink operator of data stream sender for new pipeline
1228
1229
6.76k
            DataSinkOperatorPtr sink_op;
1230
6.76k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1231
6.76k
                    state, *exchange_row_desc, next_sink_operator_id(),
1232
6.76k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1233
6.76k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1234
1235
6.76k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1236
6.76k
            {
1237
6.76k
                TDataSink* t = pool->add(new TDataSink());
1238
6.76k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1239
6.76k
                RETURN_IF_ERROR(sink_op->init(*t));
1240
6.76k
            }
1241
1242
            // 3. set dependency dag
1243
6.76k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1244
6.76k
        }
1245
2.32k
        if (sources.empty()) {
1246
0
            return Status::InternalError("size of sources must be greater than 0");
1247
0
        }
1248
2.32k
        break;
1249
2.32k
    }
1250
2.32k
    case TDataSinkType::BLACKHOLE_SINK: {
1251
13
        if (!thrift_sink.__isset.blackhole_sink) {
1252
0
            return Status::InternalError("Missing blackhole sink.");
1253
0
        }
1254
1255
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1256
13
        break;
1257
13
    }
1258
156
    case TDataSinkType::TVF_TABLE_SINK: {
1259
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1260
0
            return Status::InternalError("Missing TVF table sink.");
1261
0
        }
1262
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1263
156
                                                        output_exprs);
1264
156
        break;
1265
156
    }
1266
0
    default:
1267
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1268
430k
    }
1269
430k
    return Status::OK();
1270
430k
}
1271
1272
// NOLINTBEGIN(readability-function-size)
1273
// NOLINTBEGIN(readability-function-cognitive-complexity)
1274
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1275
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1276
                                                 PipelinePtr& cur_pipe, int parent_idx,
1277
                                                 int child_idx,
1278
                                                 const bool followed_by_shuffled_operator,
1279
                                                 const bool require_bucket_distribution,
1280
673k
                                                 OperatorPtr& cache_op) {
1281
673k
    std::vector<DataSinkOperatorPtr> sink_ops;
1282
673k
    Defer defer = Defer([&]() {
1283
672k
        if (op) {
1284
672k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1285
672k
        }
1286
672k
        for (auto& s : sink_ops) {
1287
127k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1288
127k
        }
1289
672k
    });
1290
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1291
    // Therefore, here we need to use a stack-like structure.
1292
673k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1293
673k
    std::stringstream error_msg;
1294
673k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1295
1296
673k
    bool fe_with_old_version = false;
1297
673k
    switch (tnode.node_type) {
1298
205k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1299
205k
        op = std::make_shared<OlapScanOperatorX>(
1300
205k
                pool, tnode, next_operator_id(), descs, _num_instances,
1301
205k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1302
205k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1303
205k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1304
205k
        break;
1305
205k
    }
1306
76
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1307
76
        DCHECK(_query_ctx != nullptr);
1308
76
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1309
76
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1310
76
                                                    _num_instances);
1311
76
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1312
76
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1313
76
        break;
1314
76
    }
1315
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1316
0
        if (config::enable_java_support) {
1317
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1318
0
                                                     _num_instances);
1319
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1320
0
        } else {
1321
0
            return Status::InternalError(
1322
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1323
0
                    "to true and restart be.");
1324
0
        }
1325
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1326
0
        break;
1327
0
    }
1328
22.2k
    case TPlanNodeType::FILE_SCAN_NODE: {
1329
22.2k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1330
22.2k
                                                 _num_instances);
1331
22.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1332
22.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1333
22.2k
        break;
1334
22.2k
    }
1335
0
    case TPlanNodeType::ES_SCAN_NODE:
1336
592
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1337
592
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1338
592
                                               _num_instances);
1339
592
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1340
592
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1341
592
        break;
1342
592
    }
1343
151k
    case TPlanNodeType::EXCHANGE_NODE: {
1344
151k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1345
151k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1346
18.4E
                                  : 0;
1347
151k
        DCHECK_GT(num_senders, 0);
1348
151k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1349
151k
                                                       num_senders);
1350
151k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1351
151k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1352
151k
        break;
1353
151k
    }
1354
160k
    case TPlanNodeType::AGGREGATION_NODE: {
1355
160k
        if (tnode.agg_node.grouping_exprs.empty() &&
1356
160k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1357
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1358
0
                                         ": group by and output is empty");
1359
0
        }
1360
160k
        bool need_create_cache_op =
1361
160k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1362
160k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1363
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1364
24
            auto cache_source_id = next_operator_id();
1365
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1366
24
                                                        _params.fragment.query_cache_param);
1367
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1368
1369
24
            const auto downstream_pipeline_id = cur_pipe->id();
1370
24
            if (!_dag.contains(downstream_pipeline_id)) {
1371
24
                _dag.insert({downstream_pipeline_id, {}});
1372
24
            }
1373
24
            new_pipe = add_pipeline(cur_pipe);
1374
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1375
1376
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1377
24
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1378
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1379
24
            return Status::OK();
1380
24
        };
1381
160k
        const bool group_by_limit_opt =
1382
160k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1383
1384
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1385
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1386
160k
        const bool enable_spill = _runtime_state->enable_spill() &&
1387
160k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1388
160k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1389
160k
                                      tnode.agg_node.use_streaming_preaggregation &&
1390
160k
                                      !tnode.agg_node.grouping_exprs.empty();
1391
        // TODO: distinct streaming agg does not support spill.
1392
160k
        const bool can_use_distinct_streaming_agg =
1393
160k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1394
160k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1395
160k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1396
160k
                _params.query_options.enable_distinct_streaming_aggregation;
1397
1398
160k
        if (can_use_distinct_streaming_agg) {
1399
92.7k
            if (need_create_cache_op) {
1400
8
                PipelinePtr new_pipe;
1401
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1402
1403
8
                cache_op = op;
1404
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1405
8
                                                                     tnode, descs);
1406
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1407
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1408
8
                cur_pipe = new_pipe;
1409
92.7k
            } else {
1410
92.7k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1411
92.7k
                                                                     tnode, descs);
1412
92.7k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1413
92.7k
            }
1414
92.7k
        } else if (is_streaming_agg) {
1415
3.30k
            if (need_create_cache_op) {
1416
4
                PipelinePtr new_pipe;
1417
4
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1418
4
                cache_op = op;
1419
4
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1420
4
                                                             descs);
1421
4
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1422
4
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1423
4
                cur_pipe = new_pipe;
1424
3.30k
            } else {
1425
3.30k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1426
3.30k
                                                             descs);
1427
3.30k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1428
3.30k
            }
1429
64.6k
        } else {
1430
            // create new pipeline to add query cache operator
1431
64.6k
            PipelinePtr new_pipe;
1432
64.6k
            if (need_create_cache_op) {
1433
12
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1434
12
                cache_op = op;
1435
12
            }
1436
1437
64.6k
            if (enable_spill) {
1438
629
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1439
629
                                                                     next_operator_id(), descs);
1440
64.0k
            } else {
1441
64.0k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1442
64.0k
            }
1443
64.6k
            if (need_create_cache_op) {
1444
12
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1445
12
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1446
12
                cur_pipe = new_pipe;
1447
64.6k
            } else {
1448
64.6k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1449
64.6k
            }
1450
1451
64.6k
            const auto downstream_pipeline_id = cur_pipe->id();
1452
64.6k
            if (!_dag.contains(downstream_pipeline_id)) {
1453
62.0k
                _dag.insert({downstream_pipeline_id, {}});
1454
62.0k
            }
1455
64.6k
            cur_pipe = add_pipeline(cur_pipe);
1456
64.6k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1457
1458
64.6k
            if (enable_spill) {
1459
629
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1460
629
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1461
64.0k
            } else {
1462
64.0k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1463
64.0k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
64.0k
            }
1465
64.6k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1466
64.6k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1467
64.6k
        }
1468
160k
        break;
1469
160k
    }
1470
160k
    case TPlanNodeType::HASH_JOIN_NODE: {
1471
7.90k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1472
7.90k
                                       tnode.hash_join_node.is_broadcast_join;
1473
7.90k
        const auto enable_spill = _runtime_state->enable_spill();
1474
7.90k
        if (enable_spill && !is_broadcast_join) {
1475
0
            auto tnode_ = tnode;
1476
0
            tnode_.runtime_filters.clear();
1477
0
            auto inner_probe_operator =
1478
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1479
1480
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1481
            // So here use `tnode_` which has no runtime filters.
1482
0
            auto probe_side_inner_sink_operator =
1483
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1484
1485
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1486
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1487
1488
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1489
0
                    pool, tnode_, next_operator_id(), descs);
1490
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1491
0
                                                inner_probe_operator);
1492
0
            op = std::move(probe_operator);
1493
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1494
1495
0
            const auto downstream_pipeline_id = cur_pipe->id();
1496
0
            if (!_dag.contains(downstream_pipeline_id)) {
1497
0
                _dag.insert({downstream_pipeline_id, {}});
1498
0
            }
1499
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1500
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1501
1502
0
            auto inner_sink_operator =
1503
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1504
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1505
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1506
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1507
1508
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1509
0
            sink_ops.push_back(std::move(sink_operator));
1510
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1511
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1512
1513
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1514
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1515
7.90k
        } else {
1516
7.90k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1517
7.90k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1518
1519
7.90k
            const auto downstream_pipeline_id = cur_pipe->id();
1520
7.90k
            if (!_dag.contains(downstream_pipeline_id)) {
1521
7.09k
                _dag.insert({downstream_pipeline_id, {}});
1522
7.09k
            }
1523
7.90k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1524
7.90k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1525
1526
7.90k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1527
7.90k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1528
7.90k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1529
7.90k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1530
1531
7.90k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1532
7.90k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1533
7.90k
        }
1534
7.90k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1535
2.85k
            std::shared_ptr<HashJoinSharedState> shared_state =
1536
2.85k
                    HashJoinSharedState::create_shared(_num_instances);
1537
21.3k
            for (int i = 0; i < _num_instances; i++) {
1538
18.5k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1539
18.5k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1540
18.5k
                sink_dep->set_shared_state(shared_state.get());
1541
18.5k
                shared_state->sink_deps.push_back(sink_dep);
1542
18.5k
            }
1543
2.85k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1544
2.85k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1545
2.85k
            _op_id_to_shared_state.insert(
1546
2.85k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1547
2.85k
        }
1548
7.90k
        break;
1549
7.90k
    }
1550
5.36k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1551
5.36k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1552
5.36k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1553
1554
5.36k
        const auto downstream_pipeline_id = cur_pipe->id();
1555
5.36k
        if (!_dag.contains(downstream_pipeline_id)) {
1556
5.11k
            _dag.insert({downstream_pipeline_id, {}});
1557
5.11k
        }
1558
5.36k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1559
5.36k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1560
1561
5.36k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1562
5.36k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1563
5.36k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1564
5.36k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1565
5.36k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
5.36k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
5.36k
        break;
1568
5.36k
    }
1569
53.1k
    case TPlanNodeType::UNION_NODE: {
1570
53.1k
        int child_count = tnode.num_children;
1571
53.1k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1572
53.1k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1573
1574
53.1k
        const auto downstream_pipeline_id = cur_pipe->id();
1575
53.1k
        if (!_dag.contains(downstream_pipeline_id)) {
1576
52.8k
            _dag.insert({downstream_pipeline_id, {}});
1577
52.8k
        }
1578
54.9k
        for (int i = 0; i < child_count; i++) {
1579
1.81k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1580
1.81k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1581
1.81k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1582
1.81k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1583
1.81k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1584
1.81k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1585
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1586
1.81k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1587
1.81k
        }
1588
53.1k
        break;
1589
53.1k
    }
1590
53.1k
    case TPlanNodeType::SORT_NODE: {
1591
45.8k
        const auto should_spill = _runtime_state->enable_spill() &&
1592
45.8k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1593
45.8k
        const bool use_local_merge =
1594
45.8k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1595
45.8k
        if (should_spill) {
1596
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1597
45.8k
        } else if (use_local_merge) {
1598
43.0k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1599
43.0k
                                                                 descs);
1600
43.0k
        } else {
1601
2.74k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1602
2.74k
        }
1603
45.8k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1604
1605
45.8k
        const auto downstream_pipeline_id = cur_pipe->id();
1606
45.8k
        if (!_dag.contains(downstream_pipeline_id)) {
1607
45.7k
            _dag.insert({downstream_pipeline_id, {}});
1608
45.7k
        }
1609
45.8k
        cur_pipe = add_pipeline(cur_pipe);
1610
45.8k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1611
1612
45.8k
        if (should_spill) {
1613
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1614
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
45.8k
        } else {
1616
45.8k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1617
45.8k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1618
45.8k
        }
1619
45.8k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1620
45.8k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1621
45.8k
        break;
1622
45.8k
    }
1623
45.8k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1624
64
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1625
64
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1626
1627
64
        const auto downstream_pipeline_id = cur_pipe->id();
1628
64
        if (!_dag.contains(downstream_pipeline_id)) {
1629
64
            _dag.insert({downstream_pipeline_id, {}});
1630
64
        }
1631
64
        cur_pipe = add_pipeline(cur_pipe);
1632
64
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1633
1634
64
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1635
64
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1636
64
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1637
64
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1638
64
        break;
1639
64
    }
1640
1.81k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1641
1.81k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1642
1.81k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1643
1644
1.81k
        const auto downstream_pipeline_id = cur_pipe->id();
1645
1.81k
        if (!_dag.contains(downstream_pipeline_id)) {
1646
1.79k
            _dag.insert({downstream_pipeline_id, {}});
1647
1.79k
        }
1648
1.81k
        cur_pipe = add_pipeline(cur_pipe);
1649
1.81k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1650
1651
1.81k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1652
1.81k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1653
1.81k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1654
1.81k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1655
1.81k
        break;
1656
1.81k
    }
1657
1.81k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1658
1.61k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1659
1.61k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1660
1.61k
        break;
1661
1.61k
    }
1662
1.61k
    case TPlanNodeType::INTERSECT_NODE: {
1663
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1664
119
                                                                      cur_pipe, sink_ops));
1665
119
        break;
1666
119
    }
1667
129
    case TPlanNodeType::EXCEPT_NODE: {
1668
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1669
129
                                                                       cur_pipe, sink_ops));
1670
129
        break;
1671
129
    }
1672
311
    case TPlanNodeType::REPEAT_NODE: {
1673
311
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1674
311
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1675
311
        break;
1676
311
    }
1677
927
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1678
927
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1679
927
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1680
927
        break;
1681
927
    }
1682
927
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1683
218
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1684
218
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1685
218
        break;
1686
218
    }
1687
1.62k
    case TPlanNodeType::EMPTY_SET_NODE: {
1688
1.62k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1689
1.62k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1690
1.62k
        break;
1691
1.62k
    }
1692
1.62k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1693
459
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
459
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
459
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1696
459
        break;
1697
459
    }
1698
2.31k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1699
2.31k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1700
2.31k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1701
2.31k
        break;
1702
2.31k
    }
1703
7.26k
    case TPlanNodeType::META_SCAN_NODE: {
1704
7.26k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1705
7.26k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1706
7.26k
        break;
1707
7.26k
    }
1708
7.26k
    case TPlanNodeType::SELECT_NODE: {
1709
2.08k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1710
2.08k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1711
2.08k
        break;
1712
2.08k
    }
1713
2.08k
    case TPlanNodeType::REC_CTE_NODE: {
1714
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1715
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1716
1717
151
        const auto downstream_pipeline_id = cur_pipe->id();
1718
151
        if (!_dag.contains(downstream_pipeline_id)) {
1719
148
            _dag.insert({downstream_pipeline_id, {}});
1720
148
        }
1721
1722
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1723
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1724
1725
151
        DataSinkOperatorPtr anchor_sink;
1726
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1727
151
                                                                  op->operator_id(), tnode, descs);
1728
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1729
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1730
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1731
1732
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1733
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1734
1735
151
        DataSinkOperatorPtr rec_sink;
1736
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1737
151
                                                         tnode, descs);
1738
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1739
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1741
1742
151
        break;
1743
151
    }
1744
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1745
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1746
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
1.95k
        break;
1748
1.95k
    }
1749
1.95k
    default:
1750
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1751
0
                                     print_plan_node_type(tnode.node_type));
1752
673k
    }
1753
672k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1754
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1755
0
        op->set_serial_operator();
1756
0
    }
1757
1758
672k
    return Status::OK();
1759
673k
}
1760
// NOLINTEND(readability-function-cognitive-complexity)
1761
// NOLINTEND(readability-function-size)
1762
1763
template <bool is_intersect>
1764
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1765
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1766
248
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
248
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
248
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
248
    const auto downstream_pipeline_id = cur_pipe->id();
1771
248
    if (!_dag.contains(downstream_pipeline_id)) {
1772
231
        _dag.insert({downstream_pipeline_id, {}});
1773
231
    }
1774
1775
837
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
589
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
589
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
589
        if (child_id == 0) {
1780
248
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
248
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
341
        } else {
1783
341
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
341
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
341
        }
1786
589
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
589
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
589
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
589
    }
1791
1792
248
    return Status::OK();
1793
248
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
119
    const auto downstream_pipeline_id = cur_pipe->id();
1771
119
    if (!_dag.contains(downstream_pipeline_id)) {
1772
110
        _dag.insert({downstream_pipeline_id, {}});
1773
110
    }
1774
1775
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
316
        if (child_id == 0) {
1780
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
197
        } else {
1783
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
197
        }
1786
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
316
    }
1791
1792
119
    return Status::OK();
1793
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
129
    const auto downstream_pipeline_id = cur_pipe->id();
1771
129
    if (!_dag.contains(downstream_pipeline_id)) {
1772
121
        _dag.insert({downstream_pipeline_id, {}});
1773
121
    }
1774
1775
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
273
        if (child_id == 0) {
1780
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
144
        } else {
1783
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
144
        }
1786
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
273
    }
1791
1792
129
    return Status::OK();
1793
129
}
1794
1795
428k
// Submit every prepared pipeline task to the query's executor.
// Must be called at most once; returns InternalError on re-entry.
// On a submit failure the fragment is cancelled, _total_tasks is trimmed to the
// number of tasks actually submitted, and submission stops entirely.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // BUGFIX: the inner `break` above only exits one instance's task list.
        // Previously the outer loop kept submitting the remaining instances'
        // tasks; a later successful submit overwrote `st` and masked the
        // failure while _total_tasks had already been trimmed. Stop here.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1836
1837
12
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1838
12
    if (_runtime_state->enable_profile()) {
1839
0
        std::stringstream ss;
1840
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1841
0
            runtime_profile_ptr->pretty_print(&ss);
1842
0
        }
1843
1844
0
        if (_runtime_state->load_channel_profile()) {
1845
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1846
0
        }
1847
1848
0
        auto profile_str =
1849
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1850
0
                            this->_fragment_id, extra_info, ss.str());
1851
0
        LOG_LONG_STRING(INFO, profile_str);
1852
0
    }
1853
12
}
1854
// If all pipeline tasks binded to the fragment instance are finished, then we could
1855
// close the fragment instance.
1856
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1857
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1858
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1859
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1860
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1861
// (via debug_string()).
1862
430k
bool PipelineFragmentContext::_close_fragment_instance() {
    // Idempotent: only the first invocation performs the close work.
    if (_is_fragment_instance_closed) {
        return false;
    }
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
    if (!_need_notify_close) {
        // Send the final report to FE; a failure here is logged but not fatal.
        auto st = send_report(true);
        if (!st) {
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
                                        print_id(_query_id), _fragment_id, st.to_string());
        }
    }
    // Printing the profile content to the info log is a temporary solution for
    // stream load and external_connector. Since stream load does not have
    // something like a coordinator on FE, the backend can not report the
    // profile to FE, and its profile can not be shown in the same way as other
    // queries. So we print the profile content to the info log.

    if (_runtime_state->enable_profile() &&
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
        std::stringstream ss;
        // Compute the _local_time_percent before pretty_print the runtime_profile
        // Before add this operation, the print out like that:
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
        // After add the operation, the print out like that:
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
        // We can easily know the exec node execute time without child time consumed.
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
            runtime_profile_ptr->pretty_print(&ss);
        }

        if (_runtime_state->load_channel_profile()) {
            _runtime_state->load_channel_profile()->pretty_print(&ss);
        }

        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
    }

    // Hand the realtime profiles to the query context for FE-side aggregation.
    if (_query_ctx->enable_profile()) {
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
                                         collect_realtime_load_channel_profile());
    }

    // Return whether the caller needs to remove from the pipeline map.
    // The caller must do this after releasing _task_mutex.
    return !_need_notify_close;
}
1911
1912
2.11M
// Called when one task of `pipeline_id` finishes. When the pipeline's last
// task closes, wake all dependent pipelines; when the fragment's last task
// closes, close the whole fragment instance.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
        // Last task of this pipeline: make every dependent pipeline recorded
        // in the DAG runnable.
        if (_dag.contains(pipeline_id)) {
            for (auto dep : _dag[pipeline_id]) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        // Close the fragment instance once every task has closed; the result
        // tells us whether to remove it from the fragment manager's map.
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1935
1936
53.5k
std::string PipelineFragmentContext::get_load_error_url() {
1937
53.5k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1938
0
        return to_load_error_http_path(str);
1939
0
    }
1940
137k
    for (auto& tasks : _tasks) {
1941
224k
        for (auto& task : tasks) {
1942
224k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1943
163
                return to_load_error_http_path(str);
1944
163
            }
1945
224k
        }
1946
137k
    }
1947
53.3k
    return "";
1948
53.5k
}
1949
1950
53.5k
std::string PipelineFragmentContext::get_first_error_msg() {
1951
53.5k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1952
0
        return str;
1953
0
    }
1954
137k
    for (auto& tasks : _tasks) {
1955
224k
        for (auto& task : tasks) {
1956
224k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1957
163
                return str;
1958
163
            }
1959
224k
        }
1960
137k
    }
1961
53.3k
    return "";
1962
53.5k
}
1963
1964
432k
// Build and send a status report for this fragment to FE via _report_status_cb.
// `done` indicates the fragment has finished. Reports are skipped entirely for
// successfully-finished fragments that did not request reporting.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level error URL / message; fall back to scanning this
    // fragment's tasks. Fetch each query-level value once instead of calling
    // the getter twice (once for the empty() check and once for the value).
    // Also fixes the misspelled local `load_eror_url`.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2019
2020
6
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2021
6
    size_t res = 0;
2022
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2023
    // here to traverse the vector.
2024
6
    for (const auto& task_instances : _tasks) {
2025
18
        for (const auto& task : task_instances) {
2026
18
            if (task.first->is_running()) {
2027
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2028
0
                                      << " is running, task: " << (void*)task.first.get()
2029
0
                                      << ", is_running: " << task.first->is_running();
2030
0
                *has_running_task = true;
2031
0
                return 0;
2032
0
            }
2033
2034
18
            size_t revocable_size = task.first->get_revocable_size();
2035
18
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2036
2
                res += revocable_size;
2037
2
            }
2038
18
        }
2039
6
    }
2040
6
    return res;
2041
6
}
2042
2043
12
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2044
12
    std::vector<PipelineTask*> revocable_tasks;
2045
12
    for (const auto& task_instances : _tasks) {
2046
36
        for (const auto& task : task_instances) {
2047
36
            size_t revocable_size_ = task.first->get_revocable_size();
2048
2049
36
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2050
4
                revocable_tasks.emplace_back(task.first.get());
2051
4
            }
2052
36
        }
2053
12
    }
2054
12
    return revocable_tasks;
2055
12
}
2056
2057
209
std::string PipelineFragmentContext::debug_string() {
2058
209
    std::lock_guard<std::mutex> l(_task_mutex);
2059
209
    fmt::memory_buffer debug_string_buffer;
2060
209
    fmt::format_to(debug_string_buffer,
2061
209
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2062
209
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2063
209
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2064
802
    for (size_t j = 0; j < _tasks.size(); j++) {
2065
593
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2066
1.51k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2067
919
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2068
919
                           _tasks[j][i].first->debug_string());
2069
919
        }
2070
593
    }
2071
2072
209
    return fmt::to_string(debug_string_buffer);
2073
209
}
2074
2075
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2076
3.07k
PipelineFragmentContext::collect_realtime_profile() const {
2077
3.07k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2078
2079
    // we do not have mutex to protect pipeline_id_to_profile
2080
    // so we need to make sure this funciton is invoked after fragment context
2081
    // has already been prepared.
2082
3.07k
    if (!_prepared) {
2083
0
        std::string msg =
2084
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2085
0
        DCHECK(false) << msg;
2086
0
        LOG_ERROR(msg);
2087
0
        return res;
2088
0
    }
2089
2090
    // Make sure first profile is fragment level profile
2091
3.07k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2092
3.07k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2093
3.07k
    res.push_back(fragment_profile);
2094
2095
    // pipeline_id_to_profile is initialized in prepare stage
2096
5.37k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2097
5.37k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2098
5.37k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2099
5.37k
        res.push_back(profile_ptr);
2100
5.37k
    }
2101
2102
3.07k
    return res;
2103
3.07k
}
2104
2105
std::shared_ptr<TRuntimeProfileTree>
2106
3.07k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2107
    // we do not have mutex to protect pipeline_id_to_profile
2108
    // so we need to make sure this funciton is invoked after fragment context
2109
    // has already been prepared.
2110
3.07k
    if (!_prepared) {
2111
0
        std::string msg =
2112
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2113
0
        DCHECK(false) << msg;
2114
0
        LOG_ERROR(msg);
2115
0
        return nullptr;
2116
0
    }
2117
2118
7.23k
    for (const auto& tasks : _tasks) {
2119
14.1k
        for (const auto& task : tasks) {
2120
14.1k
            if (task.second->load_channel_profile() == nullptr) {
2121
0
                continue;
2122
0
            }
2123
2124
14.1k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2125
2126
14.1k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2127
14.1k
                                                           _runtime_state->profile_level());
2128
14.1k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2129
14.1k
        }
2130
7.23k
    }
2131
2132
3.07k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2133
3.07k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2134
3.07k
                                                      _runtime_state->profile_level());
2135
3.07k
    return load_channel_profile;
2136
3.07k
}
2137
2138
// Collect runtime filter IDs registered by all tasks in this PFC.
2139
// Used during recursive CTE stage transitions to know which filters to deregister
2140
// before creating the new PFC for the next recursion round.
2141
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2142
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2143
// the vector sizes do not change, and individual RuntimeState filter sets are
2144
// written only during open() which has completed by the time we reach rerun.
2145
3.28k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2146
3.28k
    std::set<int> result;
2147
6.29k
    for (const auto& _task : _tasks) {
2148
10.2k
        for (const auto& task : _task) {
2149
10.2k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2150
10.2k
            result.merge(set);
2151
10.2k
        }
2152
6.29k
    }
2153
3.28k
    if (_runtime_state) {
2154
3.28k
        auto set = _runtime_state->get_deregister_runtime_filter();
2155
3.28k
        result.merge(set);
2156
3.28k
    }
2157
3.28k
    return result;
2158
3.28k
}
2159
2160
432k
void PipelineFragmentContext::_release_resource() {
2161
432k
    std::lock_guard<std::mutex> l(_task_mutex);
2162
    // The memory released by the query end is recorded in the query mem tracker.
2163
432k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2164
432k
    auto st = _query_ctx->exec_status();
2165
1.26M
    for (auto& _task : _tasks) {
2166
1.26M
        if (!_task.empty()) {
2167
1.26M
            _call_back(_task.front().first->runtime_state(), &st);
2168
1.26M
        }
2169
1.26M
    }
2170
432k
    _tasks.clear();
2171
432k
    _dag.clear();
2172
432k
    _pip_id_to_pipeline.clear();
2173
432k
    _pipelines.clear();
2174
432k
    _sink.reset();
2175
432k
    _root_op.reset();
2176
432k
    _runtime_filter_mgr_map.clear();
2177
432k
    _op_id_to_shared_state.clear();
2178
432k
}
2179
2180
#include "common/compile_check_end.h"
2181
} // namespace doris