Coverage Report

Created: 2026-03-31 10:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_block_buffer.h"
123
#include "runtime/result_buffer_mgr.h"
124
#include "runtime/runtime_state.h"
125
#include "runtime/thread_context.h"
126
#include "service/backend_options.h"
127
#include "util/countdown_latch.h"
128
#include "util/debug_util.h"
129
#include "util/uid_util.h"
130
131
namespace doris {
132
#include "common/compile_check_begin.h"
133
PipelineFragmentContext::PipelineFragmentContext(
134
        TUniqueId query_id, const TPipelineFragmentParams& request,
135
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
136
        const std::function<void(RuntimeState*, Status*)>& call_back,
137
        report_status_callback report_status_cb)
138
431k
        : _query_id(std::move(query_id)),
139
431k
          _fragment_id(request.fragment_id),
140
431k
          _exec_env(exec_env),
141
431k
          _query_ctx(std::move(query_ctx)),
142
431k
          _call_back(call_back),
143
431k
          _is_report_on_cancel(true),
144
431k
          _report_status_cb(std::move(report_status_cb)),
145
431k
          _params(request),
146
431k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
147
431k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
148
431k
                                                               : false) {
149
431k
    _fragment_watcher.start();
150
431k
}
151
152
431k
PipelineFragmentContext::~PipelineFragmentContext() {
153
431k
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
154
431k
            .tag("query_id", print_id(_query_id))
155
431k
            .tag("fragment_id", _fragment_id);
156
431k
    _release_resource();
157
431k
    {
158
        // The memory released by the query end is recorded in the query mem tracker.
159
431k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
160
431k
        _runtime_state.reset();
161
431k
        _query_ctx.reset();
162
431k
    }
163
431k
}
164
165
94
bool PipelineFragmentContext::is_timeout(timespec now) const {
166
94
    if (_timeout <= 0) {
167
0
        return false;
168
0
    }
169
94
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
170
94
}
171
172
// notify_close() transitions the PFC from "waiting for external close notification" to
173
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
174
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
175
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
176
9.35k
bool PipelineFragmentContext::notify_close() {
177
9.35k
    bool all_closed = false;
178
9.35k
    bool need_remove = false;
179
9.35k
    {
180
9.35k
        std::lock_guard<std::mutex> l(_task_mutex);
181
9.35k
        if (_closed_tasks >= _total_tasks) {
182
3.49k
            if (_need_notify_close) {
183
                // Fragment was cancelled and waiting for notify to close.
184
                // Record that we need to remove from fragment mgr, but do it
185
                // after releasing _task_mutex to avoid ABBA deadlock with
186
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
187
                // first, then _task_mutex via debug_string()).
188
3.41k
                need_remove = true;
189
3.41k
            }
190
3.49k
            all_closed = true;
191
3.49k
        }
192
        // make fragment release by self after cancel
193
9.35k
        _need_notify_close = false;
194
9.35k
    }
195
9.35k
    if (need_remove) {
196
3.41k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
197
3.41k
    }
198
9.35k
    return all_closed;
199
9.35k
}
200
201
// Must not add lock in this method. Because it will call query ctx cancel. And
202
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
203
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
204
// There maybe dead lock.
205
5.87k
void PipelineFragmentContext::cancel(const Status reason) {
206
5.87k
    LOG_INFO("PipelineFragmentContext::cancel")
207
5.87k
            .tag("query_id", print_id(_query_id))
208
5.87k
            .tag("fragment_id", _fragment_id)
209
5.87k
            .tag("reason", reason.to_string());
210
5.87k
    if (notify_close()) {
211
100
        return;
212
100
    }
213
    // Timeout is a special error code, we need print current stack to debug timeout issue.
214
5.77k
    if (reason.is<ErrorCode::TIMEOUT>()) {
215
8
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
216
8
                                   debug_string());
217
8
        LOG_LONG_STRING(WARNING, dbg_str);
218
8
    }
219
220
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
221
5.77k
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
222
0
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
223
0
                    debug_string());
224
0
    }
225
226
5.77k
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
227
54
        print_profile("cancel pipeline, reason: " + reason.to_string());
228
54
    }
229
230
5.77k
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
231
23
        _query_ctx->set_load_error_url(error_url);
232
23
    }
233
234
5.77k
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
235
23
        _query_ctx->set_first_error_msg(first_error_msg);
236
23
    }
237
238
5.77k
    _query_ctx->cancel(reason, _fragment_id);
239
5.77k
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
240
481
        _is_report_on_cancel = false;
241
5.29k
    } else {
242
21.4k
        for (auto& id : _fragment_instance_ids) {
243
21.4k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
244
21.4k
        }
245
5.29k
    }
246
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
247
    // For stream load the fragment's query_id == load id, it is set in FE.
248
5.77k
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
249
5.77k
    if (stream_load_ctx != nullptr) {
250
30
        stream_load_ctx->pipe->cancel(reason.to_string());
251
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
252
        // We need to set the error URL at this point to ensure error information is properly
253
        // propagated to the client.
254
30
        stream_load_ctx->error_url = get_load_error_url();
255
30
        stream_load_ctx->first_error_msg = get_first_error_msg();
256
30
    }
257
258
23.1k
    for (auto& tasks : _tasks) {
259
51.5k
        for (auto& task : tasks) {
260
51.5k
            task.first->unblock_all_dependencies();
261
51.5k
        }
262
23.1k
    }
263
5.77k
}
264
265
692k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
266
692k
    PipelineId id = _next_pipeline_id++;
267
692k
    auto pipeline = std::make_shared<Pipeline>(
268
692k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
269
692k
            parent ? parent->num_tasks() : _num_instances);
270
692k
    if (idx >= 0) {
271
126k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
272
565k
    } else {
273
565k
        _pipelines.emplace_back(pipeline);
274
565k
    }
275
692k
    if (parent) {
276
254k
        parent->set_children(pipeline);
277
254k
    }
278
692k
    return pipeline;
279
692k
}
280
281
431k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
282
431k
    {
283
431k
        SCOPED_TIMER(_build_pipelines_timer);
284
        // 2. Build pipelines with operators in this fragment.
285
431k
        auto root_pipeline = add_pipeline();
286
431k
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
287
431k
                                         &_root_op, root_pipeline));
288
289
        // 3. Create sink operator
290
431k
        if (!_params.fragment.__isset.output_sink) {
291
0
            return Status::InternalError("No output sink in this fragment!");
292
0
        }
293
431k
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
294
431k
                                          _params.fragment.output_exprs, _params,
295
431k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
296
431k
                                          *_desc_tbl, root_pipeline->id()));
297
431k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
298
431k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
299
300
564k
        for (PipelinePtr& pipeline : _pipelines) {
301
18.4E
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
302
564k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
303
564k
        }
304
431k
    }
305
    // 4. Build local exchanger
306
431k
    if (_runtime_state->enable_local_shuffle()) {
307
428k
        SCOPED_TIMER(_plan_local_exchanger_timer);
308
428k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
309
428k
                                             _params.bucket_seq_to_instance_idx,
310
428k
                                             _params.shuffle_idx_to_instance_idx));
311
428k
    }
312
313
    // 5. Initialize global states in pipelines.
314
692k
    for (PipelinePtr& pipeline : _pipelines) {
315
692k
        SCOPED_TIMER(_prepare_all_pipelines_timer);
316
692k
        pipeline->children().clear();
317
692k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
318
692k
    }
319
320
430k
    {
321
430k
        SCOPED_TIMER(_build_tasks_timer);
322
        // 6. Build pipeline tasks and initialize local state.
323
430k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
324
430k
    }
325
326
430k
    return Status::OK();
327
430k
}
328
329
431k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
330
431k
    if (_prepared) {
331
0
        return Status::InternalError("Already prepared");
332
0
    }
333
431k
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
334
431k
        _timeout = _params.query_options.execution_timeout;
335
431k
    }
336
337
431k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
338
431k
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
339
431k
    SCOPED_TIMER(_prepare_timer);
340
431k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
341
431k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
342
431k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
343
431k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
344
431k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
345
431k
    {
346
431k
        SCOPED_TIMER(_init_context_timer);
347
431k
        cast_set(_num_instances, _params.local_params.size());
348
431k
        _total_instances =
349
431k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
350
351
431k
        auto* fragment_context = this;
352
353
431k
        if (_params.query_options.__isset.is_report_success) {
354
430k
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
355
430k
        }
356
357
        // 1. Set up the global runtime state.
358
431k
        _runtime_state = RuntimeState::create_unique(
359
431k
                _params.query_id, _params.fragment_id, _params.query_options,
360
431k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
361
431k
        _runtime_state->set_task_execution_context(shared_from_this());
362
431k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
363
431k
        if (_params.__isset.backend_id) {
364
428k
            _runtime_state->set_backend_id(_params.backend_id);
365
428k
        }
366
431k
        if (_params.__isset.import_label) {
367
238
            _runtime_state->set_import_label(_params.import_label);
368
238
        }
369
431k
        if (_params.__isset.db_name) {
370
190
            _runtime_state->set_db_name(_params.db_name);
371
190
        }
372
431k
        if (_params.__isset.load_job_id) {
373
0
            _runtime_state->set_load_job_id(_params.load_job_id);
374
0
        }
375
376
431k
        if (_params.is_simplified_param) {
377
148k
            _desc_tbl = _query_ctx->desc_tbl;
378
282k
        } else {
379
282k
            DCHECK(_params.__isset.desc_tbl);
380
282k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
381
282k
                                                  &_desc_tbl));
382
282k
        }
383
431k
        _runtime_state->set_desc_tbl(_desc_tbl);
384
431k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
385
431k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
386
431k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
387
431k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
388
389
        // init fragment_instance_ids
390
431k
        const auto target_size = _params.local_params.size();
391
431k
        _fragment_instance_ids.resize(target_size);
392
1.57M
        for (size_t i = 0; i < _params.local_params.size(); i++) {
393
1.14M
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
394
1.14M
            _fragment_instance_ids[i] = fragment_instance_id;
395
1.14M
        }
396
431k
    }
397
398
431k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
399
400
430k
    _init_next_report_time();
401
402
430k
    _prepared = true;
403
430k
    return Status::OK();
404
431k
}
405
406
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
407
        int instance_idx,
408
1.14M
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
409
1.14M
    const auto& local_params = _params.local_params[instance_idx];
410
1.14M
    auto fragment_instance_id = local_params.fragment_instance_id;
411
1.14M
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
412
1.14M
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
413
1.14M
    auto get_shared_state = [&](PipelinePtr pipeline)
414
1.14M
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
2.03M
                                       std::vector<std::shared_ptr<Dependency>>>> {
416
2.03M
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
417
2.03M
                                std::vector<std::shared_ptr<Dependency>>>>
418
2.03M
                shared_state_map;
419
2.57M
        for (auto& op : pipeline->operators()) {
420
2.57M
            auto source_id = op->operator_id();
421
2.57M
            if (auto iter = _op_id_to_shared_state.find(source_id);
422
2.57M
                iter != _op_id_to_shared_state.end()) {
423
806k
                shared_state_map.insert({source_id, iter->second});
424
806k
            }
425
2.57M
        }
426
2.03M
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
427
2.03M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
428
2.03M
                iter != _op_id_to_shared_state.end()) {
429
375k
                shared_state_map.insert({sink_to_source_id, iter->second});
430
375k
            }
431
2.03M
        }
432
2.03M
        return shared_state_map;
433
2.03M
    };
434
435
3.60M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
436
2.46M
        auto& pipeline = _pipelines[pip_idx];
437
2.46M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
438
2.03M
            auto task_runtime_state = RuntimeState::create_unique(
439
2.03M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
440
2.03M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
441
2.03M
            {
442
                // Initialize runtime state for this task
443
2.03M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
444
445
2.03M
                task_runtime_state->set_task_execution_context(shared_from_this());
446
2.03M
                task_runtime_state->set_be_number(local_params.backend_num);
447
448
2.03M
                if (_params.__isset.backend_id) {
449
2.03M
                    task_runtime_state->set_backend_id(_params.backend_id);
450
2.03M
                }
451
2.03M
                if (_params.__isset.import_label) {
452
239
                    task_runtime_state->set_import_label(_params.import_label);
453
239
                }
454
2.03M
                if (_params.__isset.db_name) {
455
191
                    task_runtime_state->set_db_name(_params.db_name);
456
191
                }
457
2.03M
                if (_params.__isset.load_job_id) {
458
0
                    task_runtime_state->set_load_job_id(_params.load_job_id);
459
0
                }
460
2.03M
                if (_params.__isset.wal_id) {
461
114
                    task_runtime_state->set_wal_id(_params.wal_id);
462
114
                }
463
2.03M
                if (_params.__isset.content_length) {
464
31
                    task_runtime_state->set_content_length(_params.content_length);
465
31
                }
466
467
2.03M
                task_runtime_state->set_desc_tbl(_desc_tbl);
468
2.03M
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
469
2.03M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
470
2.03M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
471
2.03M
                task_runtime_state->set_max_operator_id(max_operator_id());
472
2.03M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
473
2.03M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
474
2.03M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
475
476
2.03M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
477
2.03M
            }
478
2.03M
            auto cur_task_id = _total_tasks++;
479
2.03M
            task_runtime_state->set_task_id(cur_task_id);
480
2.03M
            task_runtime_state->set_task_num(pipeline->num_tasks());
481
2.03M
            auto task = std::make_shared<PipelineTask>(
482
2.03M
                    pipeline, cur_task_id, task_runtime_state.get(),
483
2.03M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
484
2.03M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
485
2.03M
                    instance_idx);
486
2.03M
            pipeline->incr_created_tasks(instance_idx, task.get());
487
2.03M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
488
2.03M
            _tasks[instance_idx].emplace_back(
489
2.03M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
490
2.03M
                            std::move(task), std::move(task_runtime_state)});
491
2.03M
        }
492
2.46M
    }
493
494
    /**
495
         * Build DAG for pipeline tasks.
496
         * For example, we have
497
         *
498
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
499
         *            \                      /
500
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
501
         *                 \                          /
502
         *               JoinProbeOperator2 (Pipeline1)
503
         *
504
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
505
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
506
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
507
         *
508
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
509
         * and JoinProbeOperator2.
510
         */
511
2.46M
    for (auto& _pipeline : _pipelines) {
512
2.46M
        if (pipeline_id_to_task.contains(_pipeline->id())) {
513
2.03M
            auto* task = pipeline_id_to_task[_pipeline->id()];
514
2.03M
            DCHECK(task != nullptr);
515
516
            // If this task has upstream dependency, then inject it into this task.
517
2.03M
            if (_dag.contains(_pipeline->id())) {
518
1.32M
                auto& deps = _dag[_pipeline->id()];
519
2.11M
                for (auto& dep : deps) {
520
2.11M
                    if (pipeline_id_to_task.contains(dep)) {
521
1.25M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
522
1.25M
                        if (ss) {
523
506k
                            task->inject_shared_state(ss);
524
744k
                        } else {
525
744k
                            pipeline_id_to_task[dep]->inject_shared_state(
526
744k
                                    task->get_source_shared_state());
527
744k
                        }
528
1.25M
                    }
529
2.11M
                }
530
1.32M
            }
531
2.03M
        }
532
2.46M
    }
533
3.60M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
534
2.46M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
535
2.03M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
536
2.03M
            DCHECK(pipeline_id_to_profile[pip_idx]);
537
2.03M
            std::vector<TScanRangeParams> scan_ranges;
538
2.03M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
539
2.03M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
540
342k
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
541
342k
            }
542
2.03M
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
543
2.03M
                                                             _params.fragment.output_sink));
544
2.03M
        }
545
2.46M
    }
546
1.14M
    {
547
1.14M
        std::lock_guard<std::mutex> l(_state_map_lock);
548
1.14M
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
549
1.14M
    }
550
1.14M
    return Status::OK();
551
1.14M
}
552
553
430k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
554
430k
    _total_tasks = 0;
555
430k
    _closed_tasks = 0;
556
430k
    const auto target_size = _params.local_params.size();
557
430k
    _tasks.resize(target_size);
558
430k
    _runtime_filter_mgr_map.resize(target_size);
559
1.12M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
560
691k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
561
691k
    }
562
430k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
563
564
430k
    if (target_size > 1 &&
565
430k
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
566
138k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
567
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
568
21.1k
        std::vector<Status> prepare_status(target_size);
569
21.1k
        int submitted_tasks = 0;
570
21.1k
        Status submit_status;
571
21.1k
        CountDownLatch latch((int)target_size);
572
265k
        for (int i = 0; i < target_size; i++) {
573
244k
            submit_status = thread_pool->submit_func([&, i]() {
574
244k
                SCOPED_ATTACH_TASK(_query_ctx.get());
575
244k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
576
244k
                latch.count_down();
577
244k
            });
578
244k
            if (LIKELY(submit_status.ok())) {
579
244k
                submitted_tasks++;
580
18.4E
            } else {
581
18.4E
                break;
582
18.4E
            }
583
244k
        }
584
21.1k
        latch.arrive_and_wait(target_size - submitted_tasks);
585
21.1k
        if (UNLIKELY(!submit_status.ok())) {
586
0
            return submit_status;
587
0
        }
588
265k
        for (int i = 0; i < submitted_tasks; i++) {
589
244k
            if (!prepare_status[i].ok()) {
590
0
                return prepare_status[i];
591
0
            }
592
244k
        }
593
409k
    } else {
594
1.30M
        for (int i = 0; i < target_size; i++) {
595
898k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
596
898k
        }
597
409k
    }
598
430k
    _pipeline_parent_map.clear();
599
430k
    _op_id_to_shared_state.clear();
600
601
430k
    return Status::OK();
602
430k
}
603
604
429k
void PipelineFragmentContext::_init_next_report_time() {
605
429k
    auto interval_s = config::pipeline_status_report_interval;
606
429k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
607
41.2k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
608
41.2k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
609
        // We don't want to wait longer than it takes to run the entire fragment.
610
41.2k
        _previous_report_time =
611
41.2k
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
612
41.2k
        _disable_period_report = false;
613
41.2k
    }
614
429k
}
615
616
4.95k
void PipelineFragmentContext::refresh_next_report_time() {
617
4.95k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
618
4.95k
    DCHECK(disable == true);
619
4.95k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
620
4.95k
    _disable_period_report.compare_exchange_strong(disable, false);
621
4.95k
}
622
623
7.01M
void PipelineFragmentContext::trigger_report_if_necessary() {
624
7.01M
    if (!_is_report_success) {
625
6.50M
        return;
626
6.50M
    }
627
506k
    auto disable = _disable_period_report.load(std::memory_order_acquire);
628
506k
    if (disable) {
629
8.71k
        return;
630
8.71k
    }
631
497k
    int32_t interval_s = config::pipeline_status_report_interval;
632
497k
    if (interval_s <= 0) {
633
0
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
634
0
                        "trigger "
635
0
                        "report.";
636
0
    }
637
497k
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
638
497k
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
639
497k
    if (MonotonicNanos() > next_report_time) {
640
4.96k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
641
4.96k
                                                            std::memory_order_acq_rel)) {
642
3
            return;
643
3
        }
644
4.95k
        if (VLOG_FILE_IS_ON) {
645
0
            VLOG_FILE << "Reporting "
646
0
                      << "profile for query_id " << print_id(_query_id)
647
0
                      << ", fragment id: " << _fragment_id;
648
649
0
            std::stringstream ss;
650
0
            _runtime_state->runtime_profile()->compute_time_in_profile();
651
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
652
0
            if (_runtime_state->load_channel_profile()) {
653
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
654
0
            }
655
656
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
657
0
                      << " profile:\n"
658
0
                      << ss.str();
659
0
        }
660
4.95k
        auto st = send_report(false);
661
4.95k
        if (!st.ok()) {
662
0
            disable = true;
663
0
            _disable_period_report.compare_exchange_strong(disable, false,
664
0
                                                           std::memory_order_acq_rel);
665
0
        }
666
4.95k
    }
667
497k
}
668
669
// Builds the operator tree (and pipelines) for this fragment from the thrift
// plan. `*root` receives the root operator; `cur_pipe` is the pipeline the
// tree construction starts appending operators to.
//
// Throws on an empty plan; returns InternalError if the preorder walk did not
// consume exactly all thrift nodes (i.e. the plan tree is malformed).
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    // Index of the last node consumed by the preorder reconstruction.
    int consumed_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &consumed_idx, root,
                                        cur_pipe, 0, false, false));

    if (consumed_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
686
687
// Recursively reconstructs the operator tree from `tnodes`, which lists the
// thrift plan nodes in preorder. `*node_idx` always points at the node being
// materialized and is advanced past every node consumed by this subtree.
//
// The two boolean flags propagate data-distribution requirements down the
// tree: `followed_by_shuffled_operator` marks that a downstream operator
// expects hash-shuffled input, and `require_bucket_distribution` marks that a
// downstream operator is bucket-colocated. Both are recomputed here and
// passed on to the children so later local-exchange planning picks the right
// exchange type.
//
// @param parent    Parent operator, or nullptr when building the root.
// @param root      Out: receives the root operator iff parent == nullptr.
// @param cur_pipe  Pipeline the created operators are attached to.
// @param child_idx Index of this node among its parent's children.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    OperatorPtr cache_op = nullptr;
    // `_create_operator` may also split/extend `cur_pipe` and may produce a
    // cache operator wrapping `op` (see cache_op handling below).
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // When `cur_pipe` has no operators yet, the requirement comes from the
    // pipeline's sink; otherwise from the operator just created.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    if (num_children == 0) {
        // NOTE(review): every leaf overwrites this member, so the last leaf
        // visited in preorder wins — confirm that is the intended semantics.
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
772
773
void PipelineFragmentContext::_inherit_pipeline_properties(
774
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
775
126k
        PipelinePtr pipe_with_sink) {
776
126k
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
777
126k
    pipe_with_source->set_num_tasks(_num_instances);
778
126k
    pipe_with_source->set_data_distribution(data_distribution);
779
126k
}
780
781
// Splits `cur_pipe` at operator index `idx` by inserting a local exchange:
// `new_pip` receives operators [0, idx) plus a LocalExchangeSinkOperatorX,
// while `cur_pipe` keeps operators [idx, ...) with a
// LocalExchangeSourceOperatorX prepended. The two halves communicate through
// a shared exchanger whose concrete type depends on `data_distribution`.
// Children and the pipeline DAG are rewired accordingly, and pipeline
// properties are inherited at the end.
//
// @param idx                         Split point inside `cur_pipe`'s operator list.
// @param pool                        Object pool owning the new source operator.
// @param new_pip                     Freshly created pipeline taking the upstream half.
// @param do_local_exchange           Set by the caller before invoking; not read here.
// @param num_buckets                 Bucket count for BUCKET_HASH_SHUFFLE exchangers.
// @param bucket_seq_to_instance_idx  Bucket-seq -> instance mapping (empty when the
//                                    fragment has no scan operator).
// @param shuffle_idx_to_instance_idx Shuffle-index -> instance mapping.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    // Every exchanger below takes the same free-block budget; compute it once
    // instead of repeating the ternary in each switch branch (DRY).
    const int free_block_limit =
            _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                    ? cast_set<int>(
                              _runtime_state->query_options().local_exchange_free_blocks_limit)
                    : 0;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        // No scan => no bucket mapping: fall back to plain hash shuffle.
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances, free_block_limit);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                free_block_limit);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances, free_block_limit);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances, free_block_limit);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, free_block_limit);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    // NOTE(review): assumes idx >= 1 so new_pip has at least one operator for
    // `back()` below — confirm callers guarantee this.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    if (!operators.empty()) {
        // Clear the stale child link before pointing the old head at the new source.
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately. Children feeding an
    // operator that moved into new_pip follow it; the rest stay on cur_pipe.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (const auto& child : cur_pipe->children()) {
        bool found = false;
        for (const auto& op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            }
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (const auto& child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
945
946
// Inserts a local exchange into `cur_pipe` before operator `idx` if the
// required `data_distribution` calls for one. Sets `*do_local_exchange` to
// true when a split actually happened so the caller can re-plan the rest of
// the pipeline. `node_id` identifies the requesting operator/sink but is not
// used by this function's body.
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A local exchange is pointless with a single instance or a single
    // upstream task — nothing to redistribute.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
        return Status::OK();
    }

    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    auto& operators = cur_pipe->operators();
    auto total_op_num = operators.size();
    auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // Invariant: the split adds exactly one operator (the local exchange
    // source) across the two pipelines.
    CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size())
            << "total_op_num: " << total_op_num
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << new_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(new_pip->operators().size()), pool, new_pip,
                add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
987
988
// Plans local exchanges for every pipeline of this fragment. The pipeline
// list is walked from back to front; splitting a pipeline inserts the new
// pipeline after the current position (see `_add_local_exchange`), so the
// backward walk stays valid while `_pipelines` grows.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        // Copy the shared_ptr so later vector growth cannot invalidate it.
        auto pipeline = _pipelines[pip_idx];
        pipeline->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child.
        if (!pipeline->children().empty()) {
            for (auto& child : pipeline->children()) {
                if (child->sink()->node_id() == pipeline->operators().front()->node_id()) {
                    pipeline->set_data_distribution(child->data_distribution());
                }
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent dividing 0
        // still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pipeline,
                                             bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1012
1013
// Plans local exchanges inside a single pipeline `pip`: walks its operators
// (starting at 1 — index 0 is the pipeline's source) and inserts a local
// exchange wherever an operator's required data distribution demands one.
// Each insertion splits `pip`, so `ops` is re-fetched every round and the
// scan restarts at index 2 until a full pass makes no change; finally the
// sink's own distribution requirement is handled.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    int idx = 1;
    bool do_local_exchange = false;
    do {
        // Re-take the reference each iteration: a split mutates the list.
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also demand a specific distribution (e.g. agg/join sinks).
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1050
1051
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1052
                                                  const std::vector<TExpr>& output_exprs,
1053
                                                  const TPipelineFragmentParams& params,
1054
                                                  const RowDescriptor& row_desc,
1055
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1056
431k
                                                  PipelineId cur_pipeline_id) {
1057
431k
    switch (thrift_sink.type) {
1058
146k
    case TDataSinkType::DATA_STREAM_SINK: {
1059
146k
        if (!thrift_sink.__isset.stream_sink) {
1060
0
            return Status::InternalError("Missing data stream sink.");
1061
0
        }
1062
146k
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1063
146k
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1064
146k
                params.destinations, _fragment_instance_ids);
1065
146k
        break;
1066
146k
    }
1067
247k
    case TDataSinkType::RESULT_SINK: {
1068
247k
        if (!thrift_sink.__isset.result_sink) {
1069
0
            return Status::InternalError("Missing data buffer sink.");
1070
0
        }
1071
1072
247k
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1073
247k
                                                      output_exprs, thrift_sink.result_sink);
1074
247k
        break;
1075
247k
    }
1076
104
    case TDataSinkType::DICTIONARY_SINK: {
1077
104
        if (!thrift_sink.__isset.dictionary_sink) {
1078
0
            return Status::InternalError("Missing dict sink.");
1079
0
        }
1080
1081
104
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1082
104
                                                    thrift_sink.dictionary_sink);
1083
104
        break;
1084
104
    }
1085
0
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1086
30.4k
    case TDataSinkType::OLAP_TABLE_SINK: {
1087
30.4k
        if (state->query_options().enable_memtable_on_sink_node &&
1088
30.4k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1089
30.4k
            !config::is_cloud_mode()) {
1090
2.12k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1091
2.12k
                                                               row_desc, output_exprs);
1092
28.2k
        } else {
1093
28.2k
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1094
28.2k
                                                             row_desc, output_exprs);
1095
28.2k
        }
1096
30.4k
        break;
1097
0
    }
1098
165
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1099
165
        DCHECK(thrift_sink.__isset.olap_table_sink);
1100
165
        DCHECK(state->get_query_ctx() != nullptr);
1101
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1102
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1103
165
                                                                output_exprs);
1104
165
        break;
1105
0
    }
1106
1.46k
    case TDataSinkType::HIVE_TABLE_SINK: {
1107
1.46k
        if (!thrift_sink.__isset.hive_table_sink) {
1108
0
            return Status::InternalError("Missing hive table sink.");
1109
0
        }
1110
1.46k
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1111
1.46k
                                                         output_exprs);
1112
1.46k
        break;
1113
1.46k
    }
1114
1.72k
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1115
1.72k
        if (!thrift_sink.__isset.iceberg_table_sink) {
1116
0
            return Status::InternalError("Missing iceberg table sink.");
1117
0
        }
1118
1.72k
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1119
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1120
0
                                                                     row_desc, output_exprs);
1121
1.72k
        } else {
1122
1.72k
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1123
1.72k
                                                                row_desc, output_exprs);
1124
1.72k
        }
1125
1.72k
        break;
1126
1.72k
    }
1127
20
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1128
20
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1129
0
            return Status::InternalError("Missing iceberg delete sink.");
1130
0
        }
1131
20
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1132
20
                                                             row_desc, output_exprs);
1133
20
        break;
1134
20
    }
1135
80
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1136
80
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1137
0
            return Status::InternalError("Missing iceberg merge sink.");
1138
0
        }
1139
80
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1140
80
                                                            output_exprs);
1141
80
        break;
1142
80
    }
1143
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1144
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1145
0
            return Status::InternalError("Missing max compute table sink.");
1146
0
        }
1147
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1148
0
                                                       output_exprs);
1149
0
        break;
1150
0
    }
1151
80
    case TDataSinkType::JDBC_TABLE_SINK: {
1152
80
        if (!thrift_sink.__isset.jdbc_table_sink) {
1153
0
            return Status::InternalError("Missing data jdbc sink.");
1154
0
        }
1155
80
        if (config::enable_java_support) {
1156
80
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1157
80
                                                             output_exprs);
1158
80
        } else {
1159
0
            return Status::InternalError(
1160
0
                    "Jdbc table sink is not enabled, you can change be config "
1161
0
                    "enable_java_support to true and restart be.");
1162
0
        }
1163
80
        break;
1164
80
    }
1165
80
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1166
3
        if (!thrift_sink.__isset.memory_scratch_sink) {
1167
0
            return Status::InternalError("Missing data buffer sink.");
1168
0
        }
1169
1170
3
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1171
3
                                                             output_exprs);
1172
3
        break;
1173
3
    }
1174
502
    case TDataSinkType::RESULT_FILE_SINK: {
1175
502
        if (!thrift_sink.__isset.result_file_sink) {
1176
0
            return Status::InternalError("Missing result file sink.");
1177
0
        }
1178
1179
        // Result file sink is not the top sink
1180
502
        if (params.__isset.destinations && !params.destinations.empty()) {
1181
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1182
0
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1183
0
                    params.destinations, output_exprs, desc_tbl);
1184
502
        } else {
1185
502
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1186
502
                                                              output_exprs);
1187
502
        }
1188
502
        break;
1189
502
    }
1190
2.27k
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1191
2.27k
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1192
2.27k
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1193
2.27k
        auto sink_id = next_sink_operator_id();
1194
2.27k
        const int multi_cast_node_id = sink_id;
1195
2.27k
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1196
        // one sink has multiple sources.
1197
2.27k
        std::vector<int> sources;
1198
8.88k
        for (int i = 0; i < sender_size; ++i) {
1199
6.60k
            auto source_id = next_operator_id();
1200
6.60k
            sources.push_back(source_id);
1201
6.60k
        }
1202
1203
2.27k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1204
2.27k
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1205
8.88k
        for (int i = 0; i < sender_size; ++i) {
1206
6.60k
            auto new_pipeline = add_pipeline();
1207
            // use to exchange sink
1208
6.60k
            RowDescriptor* exchange_row_desc = nullptr;
1209
6.60k
            {
1210
6.60k
                const auto& tmp_row_desc =
1211
6.60k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1212
6.61k
                                ? RowDescriptor(state->desc_tbl(),
1213
6.61k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1214
6.61k
                                                         .output_tuple_id})
1215
18.4E
                                : row_desc;
1216
6.60k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1217
6.60k
            }
1218
6.60k
            auto source_id = sources[i];
1219
6.60k
            OperatorPtr source_op;
1220
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1221
6.60k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1222
6.60k
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1223
6.60k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1224
6.60k
                    /*operator_id=*/source_id);
1225
6.60k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1226
6.60k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1227
            // 2. create and set sink operator of data stream sender for new pipeline
1228
1229
6.60k
            DataSinkOperatorPtr sink_op;
1230
6.60k
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1231
6.60k
                    state, *exchange_row_desc, next_sink_operator_id(),
1232
6.60k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1233
6.60k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1234
1235
6.60k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1236
6.60k
            {
1237
6.60k
                TDataSink* t = pool->add(new TDataSink());
1238
6.60k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1239
6.60k
                RETURN_IF_ERROR(sink_op->init(*t));
1240
6.60k
            }
1241
1242
            // 3. set dependency dag
1243
6.60k
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1244
6.60k
        }
1245
2.27k
        if (sources.empty()) {
1246
0
            return Status::InternalError("size of sources must be greater than 0");
1247
0
        }
1248
2.27k
        break;
1249
2.27k
    }
1250
2.27k
    case TDataSinkType::BLACKHOLE_SINK: {
1251
13
        if (!thrift_sink.__isset.blackhole_sink) {
1252
0
            return Status::InternalError("Missing blackhole sink.");
1253
0
        }
1254
1255
13
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1256
13
        break;
1257
13
    }
1258
156
    case TDataSinkType::TVF_TABLE_SINK: {
1259
156
        if (!thrift_sink.__isset.tvf_table_sink) {
1260
0
            return Status::InternalError("Missing TVF table sink.");
1261
0
        }
1262
156
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1263
156
                                                        output_exprs);
1264
156
        break;
1265
156
    }
1266
0
    default:
1267
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1268
431k
    }
1269
430k
    return Status::OK();
1270
431k
}
1271
1272
// NOLINTBEGIN(readability-function-size)
1273
// NOLINTBEGIN(readability-function-cognitive-complexity)
1274
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1275
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1276
                                                 PipelinePtr& cur_pipe, int parent_idx,
1277
                                                 int child_idx,
1278
                                                 const bool followed_by_shuffled_operator,
1279
                                                 const bool require_bucket_distribution,
1280
673k
                                                 OperatorPtr& cache_op) {
1281
673k
    std::vector<DataSinkOperatorPtr> sink_ops;
1282
673k
    Defer defer = Defer([&]() {
1283
672k
        if (op) {
1284
672k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1285
672k
        }
1286
672k
        for (auto& s : sink_ops) {
1287
127k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1288
127k
        }
1289
672k
    });
1290
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1291
    // Therefore, here we need to use a stack-like structure.
1292
673k
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1293
673k
    std::stringstream error_msg;
1294
673k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1295
1296
673k
    bool fe_with_old_version = false;
1297
673k
    switch (tnode.node_type) {
1298
205k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1299
205k
        op = std::make_shared<OlapScanOperatorX>(
1300
205k
                pool, tnode, next_operator_id(), descs, _num_instances,
1301
205k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1302
205k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1303
205k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1304
205k
        break;
1305
205k
    }
1306
77
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1307
77
        DCHECK(_query_ctx != nullptr);
1308
77
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1309
77
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1310
77
                                                    _num_instances);
1311
77
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1312
77
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1313
77
        break;
1314
77
    }
1315
0
    case TPlanNodeType::JDBC_SCAN_NODE: {
1316
0
        if (config::enable_java_support) {
1317
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1318
0
                                                     _num_instances);
1319
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1320
0
        } else {
1321
0
            return Status::InternalError(
1322
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1323
0
                    "to true and restart be.");
1324
0
        }
1325
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1326
0
        break;
1327
0
    }
1328
22.2k
    case TPlanNodeType::FILE_SCAN_NODE: {
1329
22.2k
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1330
22.2k
                                                 _num_instances);
1331
22.2k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1332
22.2k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1333
22.2k
        break;
1334
22.2k
    }
1335
0
    case TPlanNodeType::ES_SCAN_NODE:
1336
592
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1337
592
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1338
592
                                               _num_instances);
1339
592
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1340
592
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1341
592
        break;
1342
592
    }
1343
150k
    case TPlanNodeType::EXCHANGE_NODE: {
1344
150k
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1345
151k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1346
18.4E
                                  : 0;
1347
150k
        DCHECK_GT(num_senders, 0);
1348
150k
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1349
150k
                                                       num_senders);
1350
150k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1351
150k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1352
150k
        break;
1353
150k
    }
1354
160k
    case TPlanNodeType::AGGREGATION_NODE: {
1355
160k
        if (tnode.agg_node.grouping_exprs.empty() &&
1356
160k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1357
0
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1358
0
                                         ": group by and output is empty");
1359
0
        }
1360
160k
        bool need_create_cache_op =
1361
160k
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1362
160k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1363
24
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1364
24
            auto cache_source_id = next_operator_id();
1365
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1366
24
                                                        _params.fragment.query_cache_param);
1367
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1368
1369
24
            const auto downstream_pipeline_id = cur_pipe->id();
1370
24
            if (!_dag.contains(downstream_pipeline_id)) {
1371
24
                _dag.insert({downstream_pipeline_id, {}});
1372
24
            }
1373
24
            new_pipe = add_pipeline(cur_pipe);
1374
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1375
1376
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1377
24
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1378
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1379
24
            return Status::OK();
1380
24
        };
1381
160k
        const bool group_by_limit_opt =
1382
160k
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1383
1384
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1385
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1386
160k
        const bool enable_spill = _runtime_state->enable_spill() &&
1387
160k
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1388
160k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1389
160k
                                      tnode.agg_node.use_streaming_preaggregation &&
1390
160k
                                      !tnode.agg_node.grouping_exprs.empty();
1391
        // TODO: distinct streaming agg does not support spill.
1392
160k
        const bool can_use_distinct_streaming_agg =
1393
160k
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1394
160k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1395
160k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1396
160k
                _params.query_options.enable_distinct_streaming_aggregation;
1397
1398
160k
        if (can_use_distinct_streaming_agg) {
1399
93.2k
            if (need_create_cache_op) {
1400
8
                PipelinePtr new_pipe;
1401
8
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1402
1403
8
                cache_op = op;
1404
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1405
8
                                                                     tnode, descs);
1406
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1407
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1408
8
                cur_pipe = new_pipe;
1409
93.2k
            } else {
1410
93.2k
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1411
93.2k
                                                                     tnode, descs);
1412
93.2k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1413
93.2k
            }
1414
93.2k
        } else if (is_streaming_agg) {
1415
3.10k
            if (need_create_cache_op) {
1416
0
                PipelinePtr new_pipe;
1417
0
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1418
0
                cache_op = op;
1419
0
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1420
0
                                                             descs);
1421
0
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1422
0
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1423
0
                cur_pipe = new_pipe;
1424
3.10k
            } else {
1425
3.10k
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1426
3.10k
                                                             descs);
1427
3.10k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1428
3.10k
            }
1429
64.1k
        } else {
1430
            // create new pipeline to add query cache operator
1431
64.1k
            PipelinePtr new_pipe;
1432
64.1k
            if (need_create_cache_op) {
1433
16
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1434
16
                cache_op = op;
1435
16
            }
1436
1437
64.1k
            if (enable_spill) {
1438
604
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1439
604
                                                                     next_operator_id(), descs);
1440
63.5k
            } else {
1441
63.5k
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1442
63.5k
            }
1443
64.1k
            if (need_create_cache_op) {
1444
16
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1445
16
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1446
16
                cur_pipe = new_pipe;
1447
64.1k
            } else {
1448
64.1k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1449
64.1k
            }
1450
1451
64.1k
            const auto downstream_pipeline_id = cur_pipe->id();
1452
64.1k
            if (!_dag.contains(downstream_pipeline_id)) {
1453
61.7k
                _dag.insert({downstream_pipeline_id, {}});
1454
61.7k
            }
1455
64.1k
            cur_pipe = add_pipeline(cur_pipe);
1456
64.1k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1457
1458
64.1k
            if (enable_spill) {
1459
604
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1460
604
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1461
63.5k
            } else {
1462
63.5k
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1463
63.5k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1464
63.5k
            }
1465
64.1k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1466
64.1k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1467
64.1k
        }
1468
160k
        break;
1469
160k
    }
1470
160k
    case TPlanNodeType::HASH_JOIN_NODE: {
1471
7.85k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1472
7.85k
                                       tnode.hash_join_node.is_broadcast_join;
1473
7.85k
        const auto enable_spill = _runtime_state->enable_spill();
1474
7.85k
        if (enable_spill && !is_broadcast_join) {
1475
0
            auto tnode_ = tnode;
1476
0
            tnode_.runtime_filters.clear();
1477
0
            auto inner_probe_operator =
1478
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1479
1480
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1481
            // So here use `tnode_` which has no runtime filters.
1482
0
            auto probe_side_inner_sink_operator =
1483
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1484
1485
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1486
0
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1487
1488
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1489
0
                    pool, tnode_, next_operator_id(), descs);
1490
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1491
0
                                                inner_probe_operator);
1492
0
            op = std::move(probe_operator);
1493
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1494
1495
0
            const auto downstream_pipeline_id = cur_pipe->id();
1496
0
            if (!_dag.contains(downstream_pipeline_id)) {
1497
0
                _dag.insert({downstream_pipeline_id, {}});
1498
0
            }
1499
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1500
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1501
1502
0
            auto inner_sink_operator =
1503
0
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1504
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1505
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1506
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1507
1508
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1509
0
            sink_ops.push_back(std::move(sink_operator));
1510
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1511
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1512
1513
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1514
0
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1515
7.85k
        } else {
1516
7.85k
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1517
7.85k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1518
1519
7.85k
            const auto downstream_pipeline_id = cur_pipe->id();
1520
7.85k
            if (!_dag.contains(downstream_pipeline_id)) {
1521
7.04k
                _dag.insert({downstream_pipeline_id, {}});
1522
7.04k
            }
1523
7.85k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1524
7.85k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1525
1526
7.85k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1527
7.85k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1528
7.85k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1529
7.85k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1530
1531
7.85k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1532
7.85k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1533
7.85k
        }
1534
7.85k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1535
2.56k
            std::shared_ptr<HashJoinSharedState> shared_state =
1536
2.56k
                    HashJoinSharedState::create_shared(_num_instances);
1537
17.1k
            for (int i = 0; i < _num_instances; i++) {
1538
14.5k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1539
14.5k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1540
14.5k
                sink_dep->set_shared_state(shared_state.get());
1541
14.5k
                shared_state->sink_deps.push_back(sink_dep);
1542
14.5k
            }
1543
2.56k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1544
2.56k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1545
2.56k
            _op_id_to_shared_state.insert(
1546
2.56k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1547
2.56k
        }
1548
7.85k
        break;
1549
7.85k
    }
1550
5.27k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1551
5.27k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1552
5.27k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1553
1554
5.27k
        const auto downstream_pipeline_id = cur_pipe->id();
1555
5.27k
        if (!_dag.contains(downstream_pipeline_id)) {
1556
5.02k
            _dag.insert({downstream_pipeline_id, {}});
1557
5.02k
        }
1558
5.27k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1559
5.27k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1560
1561
5.27k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1562
5.27k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1563
5.27k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1564
5.27k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1565
5.27k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1566
5.27k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1567
5.27k
        break;
1568
5.27k
    }
1569
53.1k
    case TPlanNodeType::UNION_NODE: {
1570
53.1k
        int child_count = tnode.num_children;
1571
53.1k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1572
53.1k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1573
1574
53.1k
        const auto downstream_pipeline_id = cur_pipe->id();
1575
53.1k
        if (!_dag.contains(downstream_pipeline_id)) {
1576
52.9k
            _dag.insert({downstream_pipeline_id, {}});
1577
52.9k
        }
1578
54.9k
        for (int i = 0; i < child_count; i++) {
1579
1.80k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1580
1.80k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1581
1.80k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1582
1.80k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1583
1.80k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1584
1.80k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1585
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1586
1.80k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1587
1.80k
        }
1588
53.1k
        break;
1589
53.1k
    }
1590
53.1k
    case TPlanNodeType::SORT_NODE: {
1591
45.9k
        const auto should_spill = _runtime_state->enable_spill() &&
1592
45.9k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1593
45.9k
        const bool use_local_merge =
1594
45.9k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1595
45.9k
        if (should_spill) {
1596
9
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1597
45.9k
        } else if (use_local_merge) {
1598
43.3k
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1599
43.3k
                                                                 descs);
1600
43.3k
        } else {
1601
2.61k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1602
2.61k
        }
1603
45.9k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1604
1605
45.9k
        const auto downstream_pipeline_id = cur_pipe->id();
1606
45.9k
        if (!_dag.contains(downstream_pipeline_id)) {
1607
45.9k
            _dag.insert({downstream_pipeline_id, {}});
1608
45.9k
        }
1609
45.9k
        cur_pipe = add_pipeline(cur_pipe);
1610
45.9k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1611
1612
45.9k
        if (should_spill) {
1613
9
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1614
9
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1615
45.9k
        } else {
1616
45.9k
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1617
45.9k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1618
45.9k
        }
1619
45.9k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1620
45.9k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1621
45.9k
        break;
1622
45.9k
    }
1623
45.9k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1624
64
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1625
64
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1626
1627
64
        const auto downstream_pipeline_id = cur_pipe->id();
1628
64
        if (!_dag.contains(downstream_pipeline_id)) {
1629
64
            _dag.insert({downstream_pipeline_id, {}});
1630
64
        }
1631
64
        cur_pipe = add_pipeline(cur_pipe);
1632
64
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1633
1634
64
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1635
64
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1636
64
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1637
64
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1638
64
        break;
1639
64
    }
1640
1.67k
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1641
1.67k
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1642
1.67k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1643
1644
1.67k
        const auto downstream_pipeline_id = cur_pipe->id();
1645
1.67k
        if (!_dag.contains(downstream_pipeline_id)) {
1646
1.66k
            _dag.insert({downstream_pipeline_id, {}});
1647
1.66k
        }
1648
1.67k
        cur_pipe = add_pipeline(cur_pipe);
1649
1.67k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1650
1651
1.67k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1652
1.67k
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1653
1.67k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1654
1.67k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1655
1.67k
        break;
1656
1.67k
    }
1657
1.67k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1658
1.61k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1659
1.61k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1660
1.61k
        break;
1661
1.61k
    }
1662
1.61k
    case TPlanNodeType::INTERSECT_NODE: {
1663
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1664
119
                                                                      cur_pipe, sink_ops));
1665
119
        break;
1666
119
    }
1667
129
    case TPlanNodeType::EXCEPT_NODE: {
1668
129
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1669
129
                                                                       cur_pipe, sink_ops));
1670
129
        break;
1671
129
    }
1672
313
    case TPlanNodeType::REPEAT_NODE: {
1673
313
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1674
313
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1675
313
        break;
1676
313
    }
1677
927
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1678
927
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1679
927
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1680
927
        break;
1681
927
    }
1682
927
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1683
218
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1684
218
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1685
218
        break;
1686
218
    }
1687
1.59k
    case TPlanNodeType::EMPTY_SET_NODE: {
1688
1.59k
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1689
1.59k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1690
1.59k
        break;
1691
1.59k
    }
1692
1.59k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1693
459
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1694
459
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1695
459
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1696
459
        break;
1697
459
    }
1698
2.31k
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1699
2.31k
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1700
2.31k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1701
2.31k
        break;
1702
2.31k
    }
1703
7.59k
    case TPlanNodeType::META_SCAN_NODE: {
1704
7.59k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1705
7.59k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1706
7.59k
        break;
1707
7.59k
    }
1708
7.59k
    case TPlanNodeType::SELECT_NODE: {
1709
2.03k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1710
2.03k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1711
2.03k
        break;
1712
2.03k
    }
1713
2.03k
    case TPlanNodeType::REC_CTE_NODE: {
1714
151
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1715
151
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1716
1717
151
        const auto downstream_pipeline_id = cur_pipe->id();
1718
151
        if (!_dag.contains(downstream_pipeline_id)) {
1719
148
            _dag.insert({downstream_pipeline_id, {}});
1720
148
        }
1721
1722
151
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1723
151
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1724
1725
151
        DataSinkOperatorPtr anchor_sink;
1726
151
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1727
151
                                                                  op->operator_id(), tnode, descs);
1728
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1729
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1730
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1731
1732
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1733
151
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1734
1735
151
        DataSinkOperatorPtr rec_sink;
1736
151
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1737
151
                                                         tnode, descs);
1738
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1739
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1740
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1741
1742
151
        break;
1743
151
    }
1744
1.95k
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1745
1.95k
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1746
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1747
1.95k
        break;
1748
1.95k
    }
1749
1.95k
    default:
1750
0
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1751
0
                                     print_plan_node_type(tnode.node_type));
1752
673k
    }
1753
672k
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1754
0
        cur_pipe->set_num_tasks(_params.parallel_instances);
1755
0
        op->set_serial_operator();
1756
0
    }
1757
1758
672k
    return Status::OK();
1759
673k
}
1760
// NOLINTEND(readability-function-cognitive-complexity)
1761
// NOLINTEND(readability-function-size)
1762
1763
template <bool is_intersect>
1764
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1765
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1766
248
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
248
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
248
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
248
    const auto downstream_pipeline_id = cur_pipe->id();
1771
248
    if (!_dag.contains(downstream_pipeline_id)) {
1772
231
        _dag.insert({downstream_pipeline_id, {}});
1773
231
    }
1774
1775
837
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
589
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
589
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
589
        if (child_id == 0) {
1780
248
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
248
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
341
        } else {
1783
341
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
341
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
341
        }
1786
589
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
589
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
589
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
589
    }
1791
1792
248
    return Status::OK();
1793
248
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
119
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
119
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
119
    const auto downstream_pipeline_id = cur_pipe->id();
1771
119
    if (!_dag.contains(downstream_pipeline_id)) {
1772
110
        _dag.insert({downstream_pipeline_id, {}});
1773
110
    }
1774
1775
435
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
316
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
316
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
316
        if (child_id == 0) {
1780
119
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
119
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
197
        } else {
1783
197
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
197
        }
1786
316
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
316
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
316
    }
1791
1792
119
    return Status::OK();
1793
119
}
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
129
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1767
129
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1768
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1769
1770
129
    const auto downstream_pipeline_id = cur_pipe->id();
1771
129
    if (!_dag.contains(downstream_pipeline_id)) {
1772
121
        _dag.insert({downstream_pipeline_id, {}});
1773
121
    }
1774
1775
402
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1776
273
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1777
273
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1778
1779
273
        if (child_id == 0) {
1780
129
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1781
129
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1782
144
        } else {
1783
144
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1784
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1785
144
        }
1786
273
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1787
273
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1788
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1789
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1790
273
    }
1791
1792
129
    return Status::OK();
1793
129
}
1794
1795
429k
// Submits every pipeline task of this fragment to the query's pipeline
// executor scheduler. May only be called once per fragment context.
// On a submit failure the fragment is cancelled and an error status is
// returned; remaining tasks are not submitted.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        // Bug fix: the inner `break` above only exits one instance's task list.
        // Previously the outer loop kept submitting tasks of a fragment that was
        // already cancelled, and a later successful submit overwrote the failed
        // `st`, masking the error from the `!st.ok()` check below.
        if (!st.ok()) {
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1836
1837
54
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1838
54
    if (_runtime_state->enable_profile()) {
1839
0
        std::stringstream ss;
1840
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1841
0
            runtime_profile_ptr->pretty_print(&ss);
1842
0
        }
1843
1844
0
        if (_runtime_state->load_channel_profile()) {
1845
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1846
0
        }
1847
1848
0
        auto profile_str =
1849
0
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1850
0
                            this->_fragment_id, extra_info, ss.str());
1851
0
        LOG_LONG_STRING(INFO, profile_str);
1852
0
    }
1853
54
}
1854
// If all pipeline tasks binded to the fragment instance are finished, then we could
1855
// close the fragment instance.
1856
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1857
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1858
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1859
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1860
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1861
// (via debug_string()).
1862
430k
bool PipelineFragmentContext::_close_fragment_instance() {
1863
430k
    if (_is_fragment_instance_closed) {
1864
0
        return false;
1865
0
    }
1866
430k
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1867
430k
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1868
430k
    if (!_need_notify_close) {
1869
427k
        auto st = send_report(true);
1870
427k
        if (!st) {
1871
0
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1872
0
                                        print_id(_query_id), _fragment_id, st.to_string());
1873
0
        }
1874
427k
    }
1875
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1876
    // Since stream load does not have someting like coordinator on FE, so
1877
    // backend can not report profile to FE, ant its profile can not be shown
1878
    // in the same way with other query. So we print the profile content to info log.
1879
1880
430k
    if (_runtime_state->enable_profile() &&
1881
430k
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1882
3.07k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1883
3.07k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1884
0
        std::stringstream ss;
1885
        // Compute the _local_time_percent before pretty_print the runtime_profile
1886
        // Before add this operation, the print out like that:
1887
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1888
        // After add the operation, the print out like that:
1889
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1890
        // We can easily know the exec node execute time without child time consumed.
1891
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1892
0
            runtime_profile_ptr->pretty_print(&ss);
1893
0
        }
1894
1895
0
        if (_runtime_state->load_channel_profile()) {
1896
0
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1897
0
        }
1898
1899
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1900
0
    }
1901
1902
430k
    if (_query_ctx->enable_profile()) {
1903
3.07k
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1904
3.07k
                                         collect_realtime_load_channel_profile());
1905
3.07k
    }
1906
1907
    // Return whether the caller needs to remove from the pipeline map.
1908
    // The caller must do this after releasing _task_mutex.
1909
430k
    return !_need_notify_close;
1910
430k
}
1911
1912
2.02M
// Called when one task of `pipeline_id` closes. When that was the last task
// of the pipeline, wakes up the pipelines that depend on it; when it was the
// last task of the whole fragment, closes the fragment instance.
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
    //
    // Use find() once instead of repeated operator[] lookups: besides the
    // extra hashing, operator[] would silently default-insert a null
    // PipelinePtr for an unknown id in release builds (DCHECK is debug-only).
    auto pip_it = _pip_id_to_pipeline.find(pipeline_id);
    DCHECK(pip_it != _pip_id_to_pipeline.end());
    if (pip_it != _pip_id_to_pipeline.end() && pip_it->second->close_task()) {
        if (auto dag_it = _dag.find(pipeline_id); dag_it != _dag.end()) {
            for (auto dep : dag_it->second) {
                _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
            }
        }
    }
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> l(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1935
1936
53.4k
std::string PipelineFragmentContext::get_load_error_url() {
1937
53.4k
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1938
0
        return to_load_error_http_path(str);
1939
0
    }
1940
141k
    for (auto& tasks : _tasks) {
1941
225k
        for (auto& task : tasks) {
1942
225k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1943
164
                return to_load_error_http_path(str);
1944
164
            }
1945
225k
        }
1946
141k
    }
1947
53.3k
    return "";
1948
53.4k
}
1949
1950
53.4k
std::string PipelineFragmentContext::get_first_error_msg() {
1951
53.4k
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1952
0
        return str;
1953
0
    }
1954
141k
    for (auto& tasks : _tasks) {
1955
225k
        for (auto& task : tasks) {
1956
225k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1957
164
                return str;
1958
164
            }
1959
225k
        }
1960
141k
    }
1961
53.3k
    return "";
1962
53.4k
}
1963
1964
432k
// Reports this fragment's status (and per-task runtime states) to the
// coordinator via _report_status_cb. `done` marks the final report.
// Returns OK when no report is needed; NeedSendAgain when an intermediate
// report is suppressed but a later one is still expected.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Prefer the query-level error url / message, falling back to a scan of
    // this fragment's tasks. Each query-context getter is evaluated once
    // (previously it was called twice: once for the empty check and once for
    // the value). Also fixes the `load_eror_url` local-variable typo.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2019
2020
40
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2021
40
    size_t res = 0;
2022
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2023
    // here to traverse the vector.
2024
40
    for (const auto& task_instances : _tasks) {
2025
106
        for (const auto& task : task_instances) {
2026
106
            if (task.first->is_running()) {
2027
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2028
0
                                      << " is running, task: " << (void*)task.first.get()
2029
0
                                      << ", is_running: " << task.first->is_running();
2030
0
                *has_running_task = true;
2031
0
                return 0;
2032
0
            }
2033
2034
106
            size_t revocable_size = task.first->get_revocable_size();
2035
106
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2036
10
                res += revocable_size;
2037
10
            }
2038
106
        }
2039
40
    }
2040
40
    return res;
2041
40
}
2042
2043
80
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2044
80
    std::vector<PipelineTask*> revocable_tasks;
2045
80
    for (const auto& task_instances : _tasks) {
2046
212
        for (const auto& task : task_instances) {
2047
212
            size_t revocable_size_ = task.first->get_revocable_size();
2048
2049
212
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2050
20
                revocable_tasks.emplace_back(task.first.get());
2051
20
            }
2052
212
        }
2053
80
    }
2054
80
    return revocable_tasks;
2055
80
}
2056
2057
102
std::string PipelineFragmentContext::debug_string() {
2058
102
    std::lock_guard<std::mutex> l(_task_mutex);
2059
102
    fmt::memory_buffer debug_string_buffer;
2060
102
    fmt::format_to(debug_string_buffer,
2061
102
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2062
102
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2063
102
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2064
614
    for (size_t j = 0; j < _tasks.size(); j++) {
2065
512
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2066
2.13k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2067
1.62k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2068
1.62k
                           _tasks[j][i].first->debug_string());
2069
1.62k
        }
2070
512
    }
2071
2072
102
    return fmt::to_string(debug_string_buffer);
2073
102
}
2074
2075
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2076
3.07k
PipelineFragmentContext::collect_realtime_profile() const {
2077
3.07k
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2078
2079
    // we do not have mutex to protect pipeline_id_to_profile
2080
    // so we need to make sure this funciton is invoked after fragment context
2081
    // has already been prepared.
2082
3.07k
    if (!_prepared) {
2083
0
        std::string msg =
2084
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2085
0
        DCHECK(false) << msg;
2086
0
        LOG_ERROR(msg);
2087
0
        return res;
2088
0
    }
2089
2090
    // Make sure first profile is fragment level profile
2091
3.07k
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2092
3.07k
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2093
3.07k
    res.push_back(fragment_profile);
2094
2095
    // pipeline_id_to_profile is initialized in prepare stage
2096
5.64k
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2097
5.64k
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2098
5.64k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2099
5.64k
        res.push_back(profile_ptr);
2100
5.64k
    }
2101
2102
3.07k
    return res;
2103
3.07k
}
2104
2105
std::shared_ptr<TRuntimeProfileTree>
2106
3.06k
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2107
    // we do not have mutex to protect pipeline_id_to_profile
2108
    // so we need to make sure this funciton is invoked after fragment context
2109
    // has already been prepared.
2110
3.06k
    if (!_prepared) {
2111
0
        std::string msg =
2112
0
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2113
0
        DCHECK(false) << msg;
2114
0
        LOG_ERROR(msg);
2115
0
        return nullptr;
2116
0
    }
2117
2118
7.51k
    for (const auto& tasks : _tasks) {
2119
15.0k
        for (const auto& task : tasks) {
2120
15.0k
            if (task.second->load_channel_profile() == nullptr) {
2121
0
                continue;
2122
0
            }
2123
2124
15.0k
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2125
2126
15.0k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2127
15.0k
                                                           _runtime_state->profile_level());
2128
15.0k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2129
15.0k
        }
2130
7.51k
    }
2131
2132
3.06k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2133
3.06k
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2134
3.06k
                                                      _runtime_state->profile_level());
2135
3.06k
    return load_channel_profile;
2136
3.06k
}
2137
2138
// Collect runtime filter IDs registered by all tasks in this PFC.
2139
// Used during recursive CTE stage transitions to know which filters to deregister
2140
// before creating the new PFC for the next recursion round.
2141
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2142
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2143
// the vector sizes do not change, and individual RuntimeState filter sets are
2144
// written only during open() which has completed by the time we reach rerun.
2145
3.28k
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2146
3.28k
    std::set<int> result;
2147
8.14k
    for (const auto& _task : _tasks) {
2148
16.4k
        for (const auto& task : _task) {
2149
16.4k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2150
16.4k
            result.merge(set);
2151
16.4k
        }
2152
8.14k
    }
2153
3.28k
    if (_runtime_state) {
2154
3.28k
        auto set = _runtime_state->get_deregister_runtime_filter();
2155
3.28k
        result.merge(set);
2156
3.28k
    }
2157
3.28k
    return result;
2158
3.28k
}
2159
2160
431k
void PipelineFragmentContext::_release_resource() {
2161
431k
    std::lock_guard<std::mutex> l(_task_mutex);
2162
    // The memory released by the query end is recorded in the query mem tracker.
2163
431k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2164
431k
    auto st = _query_ctx->exec_status();
2165
1.14M
    for (auto& _task : _tasks) {
2166
1.14M
        if (!_task.empty()) {
2167
1.14M
            _call_back(_task.front().first->runtime_state(), &st);
2168
1.14M
        }
2169
1.14M
    }
2170
431k
    _tasks.clear();
2171
431k
    _dag.clear();
2172
431k
    _pip_id_to_pipeline.clear();
2173
431k
    _pipelines.clear();
2174
431k
    _sink.reset();
2175
431k
    _root_op.reset();
2176
431k
    _runtime_filter_mgr_map.clear();
2177
431k
    _op_id_to_shared_state.clear();
2178
431k
}
2179
2180
#include "common/compile_check_end.h"
2181
} // namespace doris