Coverage Report

Created: 2026-04-01 07:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/pipeline/pipeline_fragment_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/pipeline/pipeline_fragment_context.h"
19
20
#include <gen_cpp/DataSinks_types.h>
21
#include <gen_cpp/PaloInternalService_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <pthread.h>
24
25
#include <algorithm>
26
#include <cstdlib>
27
// IWYU pragma: no_include <bits/chrono.h>
28
#include <fmt/format.h>
29
30
#include <chrono> // IWYU pragma: keep
31
#include <map>
32
#include <memory>
33
#include <ostream>
34
#include <utility>
35
36
#include "cloud/config.h"
37
#include "common/cast_set.h"
38
#include "common/config.h"
39
#include "common/exception.h"
40
#include "common/logging.h"
41
#include "common/status.h"
42
#include "exec/exchange/local_exchange_sink_operator.h"
43
#include "exec/exchange/local_exchange_source_operator.h"
44
#include "exec/exchange/local_exchanger.h"
45
#include "exec/exchange/vdata_stream_mgr.h"
46
#include "exec/operator/aggregation_sink_operator.h"
47
#include "exec/operator/aggregation_source_operator.h"
48
#include "exec/operator/analytic_sink_operator.h"
49
#include "exec/operator/analytic_source_operator.h"
50
#include "exec/operator/assert_num_rows_operator.h"
51
#include "exec/operator/blackhole_sink_operator.h"
52
#include "exec/operator/cache_sink_operator.h"
53
#include "exec/operator/cache_source_operator.h"
54
#include "exec/operator/datagen_operator.h"
55
#include "exec/operator/dict_sink_operator.h"
56
#include "exec/operator/distinct_streaming_aggregation_operator.h"
57
#include "exec/operator/empty_set_operator.h"
58
#include "exec/operator/es_scan_operator.h"
59
#include "exec/operator/exchange_sink_operator.h"
60
#include "exec/operator/exchange_source_operator.h"
61
#include "exec/operator/file_scan_operator.h"
62
#include "exec/operator/group_commit_block_sink_operator.h"
63
#include "exec/operator/group_commit_scan_operator.h"
64
#include "exec/operator/hashjoin_build_sink.h"
65
#include "exec/operator/hashjoin_probe_operator.h"
66
#include "exec/operator/hive_table_sink_operator.h"
67
#include "exec/operator/iceberg_delete_sink_operator.h"
68
#include "exec/operator/iceberg_merge_sink_operator.h"
69
#include "exec/operator/iceberg_table_sink_operator.h"
70
#include "exec/operator/jdbc_scan_operator.h"
71
#include "exec/operator/jdbc_table_sink_operator.h"
72
#include "exec/operator/local_merge_sort_source_operator.h"
73
#include "exec/operator/materialization_opertor.h"
74
#include "exec/operator/maxcompute_table_sink_operator.h"
75
#include "exec/operator/memory_scratch_sink_operator.h"
76
#include "exec/operator/meta_scan_operator.h"
77
#include "exec/operator/multi_cast_data_stream_sink.h"
78
#include "exec/operator/multi_cast_data_stream_source.h"
79
#include "exec/operator/nested_loop_join_build_operator.h"
80
#include "exec/operator/nested_loop_join_probe_operator.h"
81
#include "exec/operator/olap_scan_operator.h"
82
#include "exec/operator/olap_table_sink_operator.h"
83
#include "exec/operator/olap_table_sink_v2_operator.h"
84
#include "exec/operator/partition_sort_sink_operator.h"
85
#include "exec/operator/partition_sort_source_operator.h"
86
#include "exec/operator/partitioned_aggregation_sink_operator.h"
87
#include "exec/operator/partitioned_aggregation_source_operator.h"
88
#include "exec/operator/partitioned_hash_join_probe_operator.h"
89
#include "exec/operator/partitioned_hash_join_sink_operator.h"
90
#include "exec/operator/rec_cte_anchor_sink_operator.h"
91
#include "exec/operator/rec_cte_scan_operator.h"
92
#include "exec/operator/rec_cte_sink_operator.h"
93
#include "exec/operator/rec_cte_source_operator.h"
94
#include "exec/operator/repeat_operator.h"
95
#include "exec/operator/result_file_sink_operator.h"
96
#include "exec/operator/result_sink_operator.h"
97
#include "exec/operator/schema_scan_operator.h"
98
#include "exec/operator/select_operator.h"
99
#include "exec/operator/set_probe_sink_operator.h"
100
#include "exec/operator/set_sink_operator.h"
101
#include "exec/operator/set_source_operator.h"
102
#include "exec/operator/sort_sink_operator.h"
103
#include "exec/operator/sort_source_operator.h"
104
#include "exec/operator/spill_iceberg_table_sink_operator.h"
105
#include "exec/operator/spill_sort_sink_operator.h"
106
#include "exec/operator/spill_sort_source_operator.h"
107
#include "exec/operator/streaming_aggregation_operator.h"
108
#include "exec/operator/table_function_operator.h"
109
#include "exec/operator/tvf_table_sink_operator.h"
110
#include "exec/operator/union_sink_operator.h"
111
#include "exec/operator/union_source_operator.h"
112
#include "exec/pipeline/dependency.h"
113
#include "exec/pipeline/pipeline_task.h"
114
#include "exec/pipeline/task_scheduler.h"
115
#include "exec/runtime_filter/runtime_filter_mgr.h"
116
#include "exec/sort/topn_sorter.h"
117
#include "exec/spill/spill_file.h"
118
#include "io/fs/stream_load_pipe.h"
119
#include "load/stream_load/new_load_stream_mgr.h"
120
#include "runtime/exec_env.h"
121
#include "runtime/fragment_mgr.h"
122
#include "runtime/result_buffer_mgr.h"
123
#include "runtime/runtime_state.h"
124
#include "runtime/thread_context.h"
125
#include "util/countdown_latch.h"
126
#include "util/debug_util.h"
127
#include "util/uid_util.h"
128
129
namespace doris {
130
#include "common/compile_check_begin.h"
131
PipelineFragmentContext::PipelineFragmentContext(
132
        TUniqueId query_id, const TPipelineFragmentParams& request,
133
        std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
134
        const std::function<void(RuntimeState*, Status*)>& call_back,
135
        report_status_callback report_status_cb)
136
        : _query_id(std::move(query_id)),
137
          _fragment_id(request.fragment_id),
138
333k
          _exec_env(exec_env),
139
333k
          _query_ctx(std::move(query_ctx)),
140
333k
          _call_back(call_back),
141
333k
          _is_report_on_cancel(true),
142
333k
          _report_status_cb(std::move(report_status_cb)),
143
333k
          _params(request),
144
333k
          _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
145
333k
          _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
146
333k
                                                               : false) {
147
333k
    _fragment_watcher.start();
148
333k
}
149
333k
150
333k
PipelineFragmentContext::~PipelineFragmentContext() {
151
    LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext")
152
333k
            .tag("query_id", print_id(_query_id))
153
333k
            .tag("fragment_id", _fragment_id);
154
333k
    _release_resource();
155
333k
    {
156
333k
        // The memory released by the query end is recorded in the query mem tracker.
157
333k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
158
        _runtime_state.reset();
159
333k
        _query_ctx.reset();
160
333k
    }
161
333k
}
162
333k
163
333k
bool PipelineFragmentContext::is_timeout(timespec now) const {
164
    if (_timeout <= 0) {
165
169
        return false;
166
169
    }
167
0
    return _fragment_watcher.elapsed_time_seconds(now) > _timeout;
168
0
}
169
169
170
169
// notify_close() transitions the PFC from "waiting for external close notification" to
171
// "self-managed close". For recursive CTE fragments, the old PFC is kept alive until
172
// the rerun_fragment(wait_for_destroy) RPC calls this to trigger shutdown.
173
// Returns true if all tasks have already closed (i.e., the PFC can be safely destroyed).
174
bool PipelineFragmentContext::notify_close() {
175
    bool all_closed = false;
176
8.94k
    bool need_remove = false;
177
8.94k
    {
178
8.94k
        std::lock_guard<std::mutex> l(_task_mutex);
179
8.94k
        if (_closed_tasks >= _total_tasks) {
180
8.94k
            if (_need_notify_close) {
181
8.94k
                // Fragment was cancelled and waiting for notify to close.
182
3.49k
                // Record that we need to remove from fragment mgr, but do it
183
                // after releasing _task_mutex to avoid ABBA deadlock with
184
                // dump_pipeline_tasks() (which acquires _pipeline_map lock
185
                // first, then _task_mutex via debug_string()).
186
                need_remove = true;
187
            }
188
3.42k
            all_closed = true;
189
3.42k
        }
190
3.49k
        // make fragment release by self after cancel
191
3.49k
        _need_notify_close = false;
192
    }
193
8.94k
    if (need_remove) {
194
8.94k
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
195
8.94k
    }
196
3.42k
    return all_closed;
197
3.42k
}
198
8.94k
199
8.94k
// Must not add lock in this method. Because it will call query ctx cancel. And
200
// QueryCtx cancel will call fragment ctx cancel. And Also Fragment ctx's running
201
// Method like exchange sink buffer will call query ctx cancel. If we add lock here
202
// There maybe dead lock.
203
void PipelineFragmentContext::cancel(const Status reason) {
204
    LOG_INFO("PipelineFragmentContext::cancel")
205
5.46k
            .tag("query_id", print_id(_query_id))
206
5.46k
            .tag("fragment_id", _fragment_id)
207
5.46k
            .tag("reason", reason.to_string());
208
5.46k
    if (notify_close()) {
209
5.46k
        return;
210
5.46k
    }
211
94
    // Timeout is a special error code, we need print current stack to debug timeout issue.
212
94
    if (reason.is<ErrorCode::TIMEOUT>()) {
213
        auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}",
214
5.36k
                                   debug_string());
215
1
        LOG_LONG_STRING(WARNING, dbg_str);
216
1
    }
217
1
218
1
    // `ILLEGAL_STATE` means queries this fragment belongs to was not found in FE (maybe finished)
219
    if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
220
        LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}",
221
5.36k
                    debug_string());
222
0
    }
223
0
224
0
    if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) {
225
        print_profile("cancel pipeline, reason: " + reason.to_string());
226
5.36k
    }
227
0
228
0
    if (auto error_url = get_load_error_url(); !error_url.empty()) {
229
        _query_ctx->set_load_error_url(error_url);
230
5.36k
    }
231
23
232
23
    if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) {
233
        _query_ctx->set_first_error_msg(first_error_msg);
234
5.36k
    }
235
23
236
23
    _query_ctx->cancel(reason, _fragment_id);
237
    if (reason.is<ErrorCode::LIMIT_REACH>()) {
238
5.36k
        _is_report_on_cancel = false;
239
5.36k
    } else {
240
308
        for (auto& id : _fragment_instance_ids) {
241
5.06k
            LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
242
20.4k
        }
243
20.4k
    }
244
20.4k
    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
245
5.06k
    // For stream load the fragment's query_id == load id, it is set in FE.
246
    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
247
    if (stream_load_ctx != nullptr) {
248
5.36k
        stream_load_ctx->pipe->cancel(reason.to_string());
249
5.36k
        // Set error URL here because after pipe is cancelled, stream load execution may return early.
250
30
        // We need to set the error URL at this point to ensure error information is properly
251
        // propagated to the client.
252
        stream_load_ctx->error_url = get_load_error_url();
253
        stream_load_ctx->first_error_msg = get_first_error_msg();
254
30
    }
255
30
256
30
    for (auto& tasks : _tasks) {
257
        for (auto& task : tasks) {
258
22.2k
            task.first->unblock_all_dependencies();
259
48.6k
        }
260
48.6k
    }
261
48.6k
}
262
22.2k
263
5.36k
PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
264
    PipelineId id = _next_pipeline_id++;
265
525k
    auto pipeline = std::make_shared<Pipeline>(
266
525k
            id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances,
267
525k
            parent ? parent->num_tasks() : _num_instances);
268
525k
    if (idx >= 0) {
269
525k
        _pipelines.insert(_pipelines.begin() + idx, pipeline);
270
525k
    } else {
271
99.1k
        _pipelines.emplace_back(pipeline);
272
426k
    }
273
426k
    if (parent) {
274
426k
        parent->set_children(pipeline);
275
525k
    }
276
191k
    return pipeline;
277
191k
}
278
525k
279
525k
Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) {
280
    {
281
332k
        SCOPED_TIMER(_build_pipelines_timer);
282
332k
        // 2. Build pipelines with operators in this fragment.
283
332k
        auto root_pipeline = add_pipeline();
284
        RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl,
285
332k
                                         &_root_op, root_pipeline));
286
332k
287
332k
        // 3. Create sink operator
288
        if (!_params.fragment.__isset.output_sink) {
289
            return Status::InternalError("No output sink in this fragment!");
290
332k
        }
291
0
        RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink,
292
0
                                          _params.fragment.output_exprs, _params,
293
332k
                                          root_pipeline->output_row_desc(), _runtime_state.get(),
294
332k
                                          *_desc_tbl, root_pipeline->id()));
295
332k
        RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink));
296
332k
        RETURN_IF_ERROR(root_pipeline->set_sink(_sink));
297
332k
298
332k
        for (PipelinePtr& pipeline : _pipelines) {
299
            DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size();
300
425k
            RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
301
18.4E
        }
302
425k
    }
303
425k
    // 4. Build local exchanger
304
332k
    if (_runtime_state->enable_local_shuffle()) {
305
        SCOPED_TIMER(_plan_local_exchanger_timer);
306
332k
        RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets,
307
330k
                                             _params.bucket_seq_to_instance_idx,
308
330k
                                             _params.shuffle_idx_to_instance_idx));
309
330k
    }
310
330k
311
330k
    // 5. Initialize global states in pipelines.
312
    for (PipelinePtr& pipeline : _pipelines) {
313
        SCOPED_TIMER(_prepare_all_pipelines_timer);
314
526k
        pipeline->children().clear();
315
526k
        RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
316
526k
    }
317
526k
318
526k
    {
319
        SCOPED_TIMER(_build_tasks_timer);
320
331k
        // 6. Build pipeline tasks and initialize local state.
321
331k
        RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool));
322
    }
323
331k
324
331k
    return Status::OK();
325
}
326
331k
327
331k
Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) {
328
    if (_prepared) {
329
332k
        return Status::InternalError("Already prepared");
330
332k
    }
331
0
    if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) {
332
0
        _timeout = _params.query_options.execution_timeout;
333
332k
    }
334
332k
335
332k
    _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext");
336
    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
337
332k
    SCOPED_TIMER(_prepare_timer);
338
332k
    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime");
339
332k
    _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime");
340
332k
    _plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalLocalExchangerTime");
341
332k
    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
342
332k
    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime");
343
332k
    {
344
332k
        SCOPED_TIMER(_init_context_timer);
345
332k
        cast_set(_num_instances, _params.local_params.size());
346
332k
        _total_instances =
347
332k
                _params.__isset.total_instances ? _params.total_instances : _num_instances;
348
332k
349
332k
        auto* fragment_context = this;
350
351
332k
        if (_params.query_options.__isset.is_report_success) {
352
            fragment_context->set_is_report_success(_params.query_options.is_report_success);
353
332k
        }
354
331k
355
331k
        // 1. Set up the global runtime state.
356
        _runtime_state = RuntimeState::create_unique(
357
                _params.query_id, _params.fragment_id, _params.query_options,
358
332k
                _query_ctx->query_globals, _exec_env, _query_ctx.get());
359
332k
        _runtime_state->set_task_execution_context(shared_from_this());
360
332k
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
361
332k
        if (_params.__isset.backend_id) {
362
332k
            _runtime_state->set_backend_id(_params.backend_id);
363
332k
        }
364
330k
        if (_params.__isset.import_label) {
365
330k
            _runtime_state->set_import_label(_params.import_label);
366
332k
        }
367
242
        if (_params.__isset.db_name) {
368
242
            _runtime_state->set_db_name(_params.db_name);
369
332k
        }
370
194
        if (_params.__isset.load_job_id) {
371
194
            _runtime_state->set_load_job_id(_params.load_job_id);
372
332k
        }
373
0
374
0
        if (_params.is_simplified_param) {
375
            _desc_tbl = _query_ctx->desc_tbl;
376
332k
        } else {
377
117k
            DCHECK(_params.__isset.desc_tbl);
378
215k
            RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl,
379
215k
                                                  &_desc_tbl));
380
215k
        }
381
215k
        _runtime_state->set_desc_tbl(_desc_tbl);
382
215k
        _runtime_state->set_num_per_fragment_instances(_params.num_senders);
383
332k
        _runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
384
332k
        _runtime_state->set_total_load_streams(_params.total_load_streams);
385
332k
        _runtime_state->set_num_local_sink(_params.num_local_sink);
386
332k
387
332k
        // init fragment_instance_ids
388
        const auto target_size = _params.local_params.size();
389
        _fragment_instance_ids.resize(target_size);
390
332k
        for (size_t i = 0; i < _params.local_params.size(); i++) {
391
332k
            auto fragment_instance_id = _params.local_params[i].fragment_instance_id;
392
1.30M
            _fragment_instance_ids[i] = fragment_instance_id;
393
977k
        }
394
977k
    }
395
977k
396
332k
    RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool));
397
398
332k
    _init_next_report_time();
399
400
331k
    _prepared = true;
401
    return Status::OK();
402
331k
}
403
331k
404
332k
Status PipelineFragmentContext::_build_pipeline_tasks_for_instance(
405
        int instance_idx,
406
        const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) {
407
    const auto& local_params = _params.local_params[instance_idx];
408
976k
    auto fragment_instance_id = local_params.fragment_instance_id;
409
976k
    auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false);
410
976k
    std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
411
976k
    auto get_shared_state = [&](PipelinePtr pipeline)
412
976k
            -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
413
976k
                                       std::vector<std::shared_ptr<Dependency>>>> {
414
976k
        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
415
1.72M
                                std::vector<std::shared_ptr<Dependency>>>>
416
1.72M
                shared_state_map;
417
1.72M
        for (auto& op : pipeline->operators()) {
418
1.72M
            auto source_id = op->operator_id();
419
2.24M
            if (auto iter = _op_id_to_shared_state.find(source_id);
420
2.24M
                iter != _op_id_to_shared_state.end()) {
421
2.24M
                shared_state_map.insert({source_id, iter->second});
422
2.24M
            }
423
691k
        }
424
691k
        for (auto sink_to_source_id : pipeline->sink()->dests_id()) {
425
2.24M
            if (auto iter = _op_id_to_shared_state.find(sink_to_source_id);
426
1.72M
                iter != _op_id_to_shared_state.end()) {
427
1.72M
                shared_state_map.insert({sink_to_source_id, iter->second});
428
1.72M
            }
429
300k
        }
430
300k
        return shared_state_map;
431
1.72M
    };
432
1.72M
433
1.72M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
434
        auto& pipeline = _pipelines[pip_idx];
435
3.08M
        if (pipeline->num_tasks() > 1 || instance_idx == 0) {
436
2.10M
            auto task_runtime_state = RuntimeState::create_unique(
437
2.10M
                    local_params.fragment_instance_id, _params.query_id, _params.fragment_id,
438
1.71M
                    _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get());
439
1.71M
            {
440
1.71M
                // Initialize runtime state for this task
441
1.71M
                task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker());
442
443
1.71M
                task_runtime_state->set_task_execution_context(shared_from_this());
444
                task_runtime_state->set_be_number(local_params.backend_num);
445
1.71M
446
1.71M
                if (_params.__isset.backend_id) {
447
                    task_runtime_state->set_backend_id(_params.backend_id);
448
1.71M
                }
449
1.71M
                if (_params.__isset.import_label) {
450
1.71M
                    task_runtime_state->set_import_label(_params.import_label);
451
1.71M
                }
452
243
                if (_params.__isset.db_name) {
453
243
                    task_runtime_state->set_db_name(_params.db_name);
454
1.71M
                }
455
195
                if (_params.__isset.load_job_id) {
456
195
                    task_runtime_state->set_load_job_id(_params.load_job_id);
457
1.71M
                }
458
0
                if (_params.__isset.wal_id) {
459
0
                    task_runtime_state->set_wal_id(_params.wal_id);
460
1.71M
                }
461
114
                if (_params.__isset.content_length) {
462
114
                    task_runtime_state->set_content_length(_params.content_length);
463
1.71M
                }
464
31
465
31
                task_runtime_state->set_desc_tbl(_desc_tbl);
466
                task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
467
1.71M
                task_runtime_state->set_num_per_fragment_instances(_params.num_senders);
468
1.71M
                task_runtime_state->resize_op_id_to_local_state(max_operator_id());
469
1.71M
                task_runtime_state->set_max_operator_id(max_operator_id());
470
1.71M
                task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node);
471
1.71M
                task_runtime_state->set_total_load_streams(_params.total_load_streams);
472
1.71M
                task_runtime_state->set_num_local_sink(_params.num_local_sink);
473
1.71M
474
1.71M
                task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
475
            }
476
1.71M
            auto cur_task_id = _total_tasks++;
477
1.71M
            task_runtime_state->set_task_id(cur_task_id);
478
1.71M
            task_runtime_state->set_task_num(pipeline->num_tasks());
479
1.71M
            auto task = std::make_shared<PipelineTask>(
480
1.71M
                    pipeline, cur_task_id, task_runtime_state.get(),
481
1.71M
                    std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
482
1.71M
                    pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline),
483
1.71M
                    instance_idx);
484
1.71M
            pipeline->incr_created_tasks(instance_idx, task.get());
485
1.71M
            pipeline_id_to_task.insert({pipeline->id(), task.get()});
486
1.71M
            _tasks[instance_idx].emplace_back(
487
1.71M
                    std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> {
488
1.71M
                            std::move(task), std::move(task_runtime_state)});
489
1.71M
        }
490
1.71M
    }
491
1.71M
492
2.10M
    /**
493
         * Build DAG for pipeline tasks.
494
         * For example, we have
495
         *
496
         *   ExchangeSink (Pipeline1)     JoinBuildSink (Pipeline2)
497
         *            \                      /
498
         *          JoinProbeOperator1 (Pipeline1)    JoinBuildSink (Pipeline3)
499
         *                 \                          /
500
         *               JoinProbeOperator2 (Pipeline1)
501
         *
502
         * In this fragment, we have three pipelines and pipeline 1 depends on pipeline 2 and pipeline 3.
503
         * To build this DAG, `_dag` manage dependencies between pipelines by pipeline ID and
504
         * `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
505
         *
506
         * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1
507
         * and JoinProbeOperator2.
508
         */
509
    for (auto& _pipeline : _pipelines) {
510
        if (pipeline_id_to_task.contains(_pipeline->id())) {
511
2.11M
            auto* task = pipeline_id_to_task[_pipeline->id()];
512
2.11M
            DCHECK(task != nullptr);
513
1.71M
514
1.71M
            // If this task has upstream dependency, then inject it into this task.
515
            if (_dag.contains(_pipeline->id())) {
516
                auto& deps = _dag[_pipeline->id()];
517
1.71M
                for (auto& dep : deps) {
518
1.13M
                    if (pipeline_id_to_task.contains(dep)) {
519
1.81M
                        auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
520
1.81M
                        if (ss) {
521
1.03M
                            task->inject_shared_state(ss);
522
1.03M
                        } else {
523
433k
                            pipeline_id_to_task[dep]->inject_shared_state(
524
597k
                                    task->get_source_shared_state());
525
597k
                        }
526
597k
                    }
527
597k
                }
528
1.03M
            }
529
1.81M
        }
530
1.13M
    }
531
1.71M
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
532
2.11M
        if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
533
3.08M
            auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
534
2.10M
            DCHECK(pipeline_id_to_profile[pip_idx]);
535
1.71M
            std::vector<TScanRangeParams> scan_ranges;
536
1.71M
            auto node_id = _pipelines[pip_idx]->operators().front()->node_id();
537
1.71M
            if (local_params.per_node_scan_ranges.contains(node_id)) {
538
1.71M
                scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second;
539
1.71M
            }
540
266k
            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id,
541
266k
                                                             _params.fragment.output_sink));
542
1.71M
        }
543
1.71M
    }
544
1.71M
    {
545
2.10M
        std::lock_guard<std::mutex> l(_state_map_lock);
546
974k
        _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr);
547
974k
    }
548
974k
    return Status::OK();
549
974k
}
550
974k
551
976k
Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) {
552
    _total_tasks = 0;
553
331k
    _closed_tasks = 0;
554
331k
    const auto target_size = _params.local_params.size();
555
331k
    _tasks.resize(target_size);
556
331k
    _runtime_filter_mgr_map.resize(target_size);
557
331k
    for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
558
331k
        _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get();
559
856k
    }
560
524k
    auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size());
561
524k
562
331k
    if (target_size > 1 &&
563
        (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
564
331k
         target_size > _runtime_state->query_options().parallel_prepare_threshold)) {
565
331k
        // If instances parallelism is big enough ( > parallel_prepare_threshold), we will prepare all tasks by multi-threads
566
119k
        std::vector<Status> prepare_status(target_size);
567
        int submitted_tasks = 0;
568
22.0k
        Status submit_status;
569
22.0k
        CountDownLatch latch((int)target_size);
570
22.0k
        for (int i = 0; i < target_size; i++) {
571
22.0k
            submit_status = thread_pool->submit_func([&, i]() {
572
261k
                SCOPED_ATTACH_TASK(_query_ctx.get());
573
239k
                prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile);
574
239k
                latch.count_down();
575
239k
            });
576
239k
            if (LIKELY(submit_status.ok())) {
577
239k
                submitted_tasks++;
578
239k
            } else {
579
239k
                break;
580
18.4E
            }
581
18.4E
        }
582
18.4E
        latch.arrive_and_wait(target_size - submitted_tasks);
583
239k
        if (UNLIKELY(!submit_status.ok())) {
584
22.0k
            return submit_status;
585
22.0k
        }
586
0
        for (int i = 0; i < submitted_tasks; i++) {
587
0
            if (!prepare_status[i].ok()) {
588
261k
                return prepare_status[i];
589
239k
            }
590
0
        }
591
0
    } else {
592
239k
        for (int i = 0; i < target_size; i++) {
593
309k
            RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile));
594
1.04M
        }
595
737k
    }
596
737k
    _pipeline_parent_map.clear();
597
309k
    _op_id_to_shared_state.clear();
598
331k
599
331k
    return Status::OK();
600
}
601
331k
602
331k
void PipelineFragmentContext::_init_next_report_time() {
603
    auto interval_s = config::pipeline_status_report_interval;
604
331k
    if (_is_report_success && interval_s > 0 && _timeout > interval_s) {
605
331k
        VLOG_FILE << "enable period report: fragment id=" << _fragment_id;
606
331k
        uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC;
607
31.3k
        // We don't want to wait longer than it takes to run the entire fragment.
608
31.3k
        _previous_report_time =
609
                MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC;
610
31.3k
        _disable_period_report = false;
611
31.3k
    }
612
31.3k
}
613
31.3k
614
331k
void PipelineFragmentContext::refresh_next_report_time() {
615
    auto disable = _disable_period_report.load(std::memory_order_acquire);
616
3.95k
    DCHECK(disable == true);
617
3.95k
    _previous_report_time.store(MonotonicNanos(), std::memory_order_release);
618
3.95k
    _disable_period_report.compare_exchange_strong(disable, false);
619
3.95k
}
620
3.95k
621
3.95k
void PipelineFragmentContext::trigger_report_if_necessary() {
622
    if (!_is_report_success) {
623
5.92M
        return;
624
5.92M
    }
625
5.53M
    auto disable = _disable_period_report.load(std::memory_order_acquire);
626
5.53M
    if (disable) {
627
398k
        return;
628
398k
    }
629
6.33k
    int32_t interval_s = config::pipeline_status_report_interval;
630
6.33k
    if (interval_s <= 0) {
631
391k
        LOG(WARNING) << "config::status_report_interval is equal to or less than zero, do not "
632
391k
                        "trigger "
633
0
                        "report.";
634
0
    }
635
0
    uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) +
636
0
                                (uint64_t)(interval_s)*NANOS_PER_SEC;
637
391k
    if (MonotonicNanos() > next_report_time) {
638
391k
        if (!_disable_period_report.compare_exchange_strong(disable, true,
639
391k
                                                            std::memory_order_acq_rel)) {
640
3.95k
            return;
641
3.95k
        }
642
3
        if (VLOG_FILE_IS_ON) {
643
3
            VLOG_FILE << "Reporting "
644
3.95k
                      << "profile for query_id " << print_id(_query_id)
645
0
                      << ", fragment id: " << _fragment_id;
646
0
647
0
            std::stringstream ss;
648
            _runtime_state->runtime_profile()->compute_time_in_profile();
649
0
            _runtime_state->runtime_profile()->pretty_print(&ss);
650
0
            if (_runtime_state->load_channel_profile()) {
651
0
                _runtime_state->load_channel_profile()->pretty_print(&ss);
652
0
            }
653
0
654
0
            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id()
655
                      << " profile:\n"
656
0
                      << ss.str();
657
0
        }
658
0
        auto st = send_report(false);
659
0
        if (!st.ok()) {
660
3.95k
            disable = true;
661
3.95k
            _disable_period_report.compare_exchange_strong(disable, false,
662
0
                                                           std::memory_order_acq_rel);
663
0
        }
664
0
    }
665
0
}
666
3.95k
667
391k
Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs,
                                                 OperatorPtr* root, PipelinePtr cur_pipe) {
    // Rebuild the operator tree from the flattened (preorder) thrift plan.
    const auto& plan_nodes = _params.fragment.plan.nodes;
    if (plan_nodes.empty()) {
        throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!");
    }

    int consumed_idx = 0;
    RETURN_IF_ERROR(_create_tree_helper(pool, plan_nodes, descs, nullptr, &consumed_idx, root,
                                        cur_pipe, 0, false, false));

    // Every thrift node must have been consumed exactly once, otherwise the
    // serialized plan was malformed.
    if (consumed_idx + 1 != plan_nodes.size()) {
        return Status::InternalError(
                "Plan tree only partially reconstructed. Not all thrift nodes were used.");
    }
    return Status::OK();
}
684
331k
685
331k
// Materializes one operator from the preorder-flattened thrift plan `tnodes`
// at position `*node_idx`, then recurses into its children, advancing
// `*node_idx` as nodes are consumed.
//   parent    - operator the new node attaches to; nullptr for the root, in
//               which case the new operator is returned through `root`.
//   child_idx - this node's position among its parent's children.
//   followed_by_shuffled_operator / require_bucket_distribution - data
//               distribution requirements propagated down so local-exchange
//               planning can later pick a compatible exchange type.
// Returns an error if the node list is exhausted early or operator
// creation/initialization fails.
Status PipelineFragmentContext::_create_tree_helper(
        ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs,
        OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx,
        const bool followed_by_shuffled_operator, const bool require_bucket_distribution) {
    // propagate error case
    if (*node_idx >= tnodes.size()) {
        return Status::InternalError(
                "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}",
                *node_idx, tnodes.size());
    }
    const TPlanNode& tnode = tnodes[*node_idx];

    int num_children = tnodes[*node_idx].num_children;
    bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
    bool current_require_bucket_distribution = require_bucket_distribution;
    // TODO: Create CacheOperator is confused now
    OperatorPtr op = nullptr;
    // `_create_operator` may additionally produce `cache_op`; when present it
    // is what gets attached to the parent instead of `op` (see set_child
    // below) — presumably a caching wrapper, TODO confirm in _create_operator.
    OperatorPtr cache_op = nullptr;
    RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe,
                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
                                     followed_by_shuffled_operator,
                                     current_require_bucket_distribution, cache_op));
    // Initialization must be done here. For example, group by expressions in agg will be used to
    // decide if a local shuffle should be planed, so it must be initialized here.
    RETURN_IF_ERROR(op->init(tnode, _runtime_state.get()));
    // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
    if (parent != nullptr) {
        // add to parent's child(s)
        RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op));
    } else {
        *root = op;
    }
    /**
     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
     *
     * For plan:
     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
     *                           Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
     * We must ensure data distribution of `LocalExchange(id=0)` is same as Exchange(id=3).
     *
     * If an operator's is followed by a local exchange without shuffle (e.g. passthrough), a
     * shuffled local exchanger will be used before join so it is not followed by shuffle join.
     */
    // If this pipeline has no operators yet, the requirement comes from its
    // sink; otherwise from the operator just created.
    auto required_data_distribution =
            cur_pipe->operators().empty()
                    ? cur_pipe->sink()->required_data_distribution(_runtime_state.get())
                    : op->required_data_distribution(_runtime_state.get());
    // The "shuffled" property survives downward only while the chain stays
    // hash-exchanged, or passes through a NOOP exchange unchanged.
    current_followed_by_shuffled_operator =
            ((followed_by_shuffled_operator ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator()
                                             : op->is_shuffled_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (followed_by_shuffled_operator &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Same propagation rule for the bucket/colocation requirement.
    current_require_bucket_distribution =
            ((require_bucket_distribution ||
              (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator()
                                             : op->is_colocated_operator())) &&
             Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) ||
            (require_bucket_distribution &&
             required_data_distribution.distribution_type == ExchangeType::NOOP);

    // Leaf nodes decide whether this fragment reads from a serial source.
    if (num_children == 0) {
        _use_serial_source = op->is_serial_operator();
    }
    // rely on that tnodes is preorder of the plan
    for (int i = 0; i < num_children; i++) {
        ++*node_idx;
        RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i,
                                            current_followed_by_shuffled_operator,
                                            current_require_bucket_distribution));

        // we are expecting a child, but have used all nodes
        // this means we have been given a bad tree and must fail
        if (*node_idx >= tnodes.size()) {
            return Status::InternalError(
                    "Failed to reconstruct plan tree from thrift. Node id: {}, number of "
                    "nodes: {}",
                    *node_idx, tnodes.size());
        }
    }

    return Status::OK();
}
770
532k
771
532k
void PipelineFragmentContext::_inherit_pipeline_properties(
772
        const DataDistribution& data_distribution, PipelinePtr pipe_with_source,
773
        PipelinePtr pipe_with_sink) {
774
    pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks());
775
99.1k
    pipe_with_source->set_num_tasks(_num_instances);
776
99.1k
    pipe_with_source->set_data_distribution(data_distribution);
777
99.1k
}
778
99.1k
779
99.1k
// Splits `cur_pipe` at operator position `idx` by inserting a local exchange:
// the new upstream pipeline `new_pip` takes operators [0, idx) and ends in a
// LocalExchangeSinkOperatorX, while `cur_pipe` keeps operators [idx, end)
// headed by a new LocalExchangeSourceOperatorX. `data_distribution` selects
// the exchanger implementation; `num_buckets` and the two instance-index maps
// parameterize bucketed/global shuffles. The pipeline DAG (`_dag`) and the
// pipelines' child lists are rewired accordingly.
Status PipelineFragmentContext::_add_local_exchange_impl(
        int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    auto& operators = cur_pipe->operators();
    const auto downstream_pipeline_id = cur_pipe->id();
    auto local_exchange_id = next_operator_id();
    // 1. Create a new pipeline with local exchange sink.
    DataSinkOperatorPtr sink;
    auto sink_id = next_sink_operator_id();

    /**
     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
     * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
     */
    // When splitting at the very end (idx beyond the operator list), the
    // property is taken from the sink instead of an operator.
    const bool followed_by_shuffled_operator =
            operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
                                   : cur_pipe->sink()->followed_by_shuffled_operator();
    const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() &&
                                         !shuffle_idx_to_instance_idx.contains(-1) &&
                                         followed_by_shuffled_operator && !_use_serial_source;
    sink = std::make_shared<LocalExchangeSinkOperatorX>(
            sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances,
            data_distribution.partition_exprs, bucket_seq_to_instance_idx);
    // No bucket mapping -> downgrade BUCKET_HASH_SHUFFLE to a plain
    // HASH_SHUFFLE (see the comment block above).
    if (bucket_seq_to_instance_idx.empty() &&
        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
    }
    RETURN_IF_ERROR(new_pip->set_sink(sink));
    RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type,
                                          num_buckets, use_global_hash_shuffle,
                                          shuffle_idx_to_instance_idx));

    // 2. Create and initialize LocalExchangeSharedState.
    // Every exchanger below receives the session's
    // local_exchange_free_blocks_limit when set, and 0 otherwise.
    std::shared_ptr<LocalExchangeSharedState> shared_state =
            LocalExchangeSharedState::create_shared(_num_instances);
    switch (data_distribution.distribution_type) {
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                use_global_hash_shuffle ? _total_instances : _num_instances,
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                        ? cast_set<int>(
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
                        : 0);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                        ? cast_set<int>(
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
                        : 0);
        break;
    case ExchangeType::PASSTHROUGH:
        shared_state->exchanger = PassthroughExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances,
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                        ? cast_set<int>(
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
                        : 0);
        break;
    case ExchangeType::BROADCAST:
        shared_state->exchanger = BroadcastExchanger::create_unique(
                cur_pipe->num_tasks(), _num_instances,
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                        ? cast_set<int>(
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
                        : 0);
        break;
    case ExchangeType::PASS_TO_ONE:
        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
            // If shared hash table is enabled for BJ, hash table will be built by only one task
            shared_state->exchanger = PassToOneExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances,
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                            ? cast_set<int>(_runtime_state->query_options()
                                                    .local_exchange_free_blocks_limit)
                            : 0);
        } else {
            shared_state->exchanger = BroadcastExchanger::create_unique(
                    cur_pipe->num_tasks(), _num_instances,
                    _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                            ? cast_set<int>(_runtime_state->query_options()
                                                    .local_exchange_free_blocks_limit)
                            : 0);
        }
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances,
                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                        ? cast_set<int>(
                                  _runtime_state->query_options().local_exchange_free_blocks_limit)
                        : 0);
        break;
    default:
        return Status::InternalError("Unsupported local exchange type : " +
                                     std::to_string((int)data_distribution.distribution_type));
    }
    shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id,
                                             "LOCAL_EXCHANGE_OPERATOR");
    shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK");
    _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}});

    // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
    // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink].

    // 3.1 Initialize new pipeline's operator list.
    std::copy(operators.begin(), operators.begin() + idx,
              std::inserter(new_pip->operators(), new_pip->operators().end()));

    // 3.2 Erase unused operators in previous pipeline.
    operators.erase(operators.begin(), operators.begin() + idx);

    // 4. Initialize LocalExchangeSource and insert it into this pipeline.
    OperatorPtr source_op;
    source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id);
    RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type));
    // Rewire the remaining head operator to read from the new source. The
    // nullptr call first appears to reset the child link before re-binding —
    // TODO confirm set_child(nullptr) semantics.
    if (!operators.empty()) {
        RETURN_IF_ERROR(operators.front()->set_child(nullptr));
        RETURN_IF_ERROR(operators.front()->set_child(source_op));
    }
    operators.insert(operators.begin(), source_op);

    // 5. Set children for two pipelines separately.
    // A child pipeline whose sink matches an operator now living in `new_pip`
    // migrates to `new_pip`; all others stay children of `cur_pipe`.
    std::vector<std::shared_ptr<Pipeline>> new_children;
    std::vector<PipelineId> edges_with_source;
    for (auto child : cur_pipe->children()) {
        bool found = false;
        for (auto op : new_pip->operators()) {
            if (child->sink()->node_id() == op->node_id()) {
                new_pip->set_children(child);
                found = true;
            };
        }
        if (!found) {
            new_children.push_back(child);
            edges_with_source.push_back(child->id());
        }
    }
    new_children.push_back(new_pip);
    edges_with_source.push_back(new_pip->id());

    // 6. Set DAG for new pipelines.
    if (!new_pip->children().empty()) {
        std::vector<PipelineId> edges_with_sink;
        for (auto child : new_pip->children()) {
            edges_with_sink.push_back(child->id());
        }
        _dag.insert({new_pip->id(), edges_with_sink});
    }
    cur_pipe->set_children(new_children);
    _dag[downstream_pipeline_id] = edges_with_source;
    RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back()));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr));
    RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back()));

    // 7. Inherit properties from current pipeline.
    _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip);
    return Status::OK();
}
943
99.1k
944
99.1k
Status PipelineFragmentContext::_add_local_exchange(
        int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe,
        DataDistribution data_distribution, bool* do_local_exchange, int num_buckets,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // A local exchange is only worth inserting with real parallelism and when
    // this pipeline actually needs a redistribution at position `idx`.
    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1 ||
        !cur_pipe->need_to_local_exchange(data_distribution, idx)) {
        return Status::OK();
    }
    *do_local_exchange = true;

    const auto op_count_before_split = cur_pipe->operators().size();
    auto upstream_pip = add_pipeline(cur_pipe, pip_idx + 1);
    RETURN_IF_ERROR(_add_local_exchange_impl(
            idx, pool, cur_pipe, upstream_pip, data_distribution, do_local_exchange, num_buckets,
            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));

    // Splitting adds exactly one operator (the local exchange source), so the
    // two pipelines together must hold one more operator than before.
    CHECK(op_count_before_split + 1 ==
          cur_pipe->operators().size() + upstream_pip->operators().size())
            << "total_op_num: " << op_count_before_split
            << " cur_pipe->operators().size(): " << cur_pipe->operators().size()
            << " new_pip->operators().size(): " << upstream_pip->operators().size();

    // There are some local shuffles with relatively heavy operations on the sink.
    // If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
    // Therefore, local passthrough is used to increase the concurrency of the sink.
    // op -> local sink(1) -> local source (n)
    // op -> local passthrough(1) -> local passthrough(n) ->  local sink(n) -> local source (n)
    if (cur_pipe->num_tasks() > 1 && upstream_pip->num_tasks() == 1 &&
        Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) {
        RETURN_IF_ERROR(_add_local_exchange_impl(
                cast_set<int>(upstream_pip->operators().size()), pool, upstream_pip,
                add_pipeline(upstream_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH),
                do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
985
82.4k
986
82.4k
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Process pipelines in reverse order and plan the local exchanges of each.
    for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) {
        auto& pip = _pipelines[pip_idx];
        pip->init_data_distribution(_runtime_state.get());
        // Set property if child pipeline is not join operator's child: a child
        // whose sink feeds this pipeline's first operator hands its data
        // distribution upward. (Iterating an empty child list is a no-op, so
        // no separate emptiness check is needed.)
        for (auto& child : pip->children()) {
            if (child->sink()->node_id() == pip->operators().front()->node_id()) {
                pip->set_data_distribution(child->data_distribution());
            }
        }

        // if 'num_buckets == 0' means the fragment is colocated by exchange node not the
        // scan node. so here use `_num_instance` to replace the `num_buckets` to prevent
        // dividing 0, still keep colocate plan after local shuffle
        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, pip, bucket_seq_to_instance_idx,
                                             shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1010
329k
1011
329k
// Plans local exchanges within a single pipeline `pip` (position `pip_idx` in
// `_pipelines`). Scans the pipeline's operators; whenever one requires a local
// redistribution, the pipeline is split via `_add_local_exchange` and the scan
// restarts on the shrunken pipeline. Finally the sink's own requirement is
// checked the same way.
Status PipelineFragmentContext::_plan_local_exchange(
        int num_buckets, int pip_idx, PipelinePtr pip,
        const std::map<int, int>& bucket_seq_to_instance_idx,
        const std::map<int, int>& shuffle_idx_to_instance_idx) {
    // Start at 1: position 0 is the pipeline's source operator, which never
    // needs a local exchange placed in front of it.
    int idx = 1;
    bool do_local_exchange = false;
    do {
        auto& ops = pip->operators();
        do_local_exchange = false;
        // Plan local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
                RETURN_IF_ERROR(_add_local_exchange(
                        pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
                        ops[idx]->required_data_distribution(_runtime_state.get()),
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx,
                        shuffle_idx_to_instance_idx));
            }
            if (do_local_exchange) {
                // If local exchange is needed for current operator, we will split this pipeline to
                // two pipelines by local exchange sink/source. And then we need to process remaining
                // operators in this pipeline so we set idx to 2 (0 is local exchange source and 1
                // is current operator was already processed) and continue to plan local exchange.
                idx = 2;
                break;
            }
            idx++;
        }
    } while (do_local_exchange);
    // The sink may also demand a redistribution of its input; `idx` is now one
    // past the last operator, i.e. the split point just before the sink.
    if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) {
        RETURN_IF_ERROR(_add_local_exchange(
                pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip,
                pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange,
                num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
    }
    return Status::OK();
}
1048
423k
1049
423k
Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
1050
                                                  const std::vector<TExpr>& output_exprs,
1051
                                                  const TPipelineFragmentParams& params,
1052
                                                  const RowDescriptor& row_desc,
1053
                                                  RuntimeState* state, DescriptorTbl& desc_tbl,
1054
                                                  PipelineId cur_pipeline_id) {
1055
    switch (thrift_sink.type) {
1056
332k
    case TDataSinkType::DATA_STREAM_SINK: {
1057
332k
        if (!thrift_sink.__isset.stream_sink) {
1058
116k
            return Status::InternalError("Missing data stream sink.");
1059
116k
        }
1060
0
        _sink = std::make_shared<ExchangeSinkOperatorX>(
1061
0
                state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink,
1062
116k
                params.destinations, _fragment_instance_ids);
1063
116k
        break;
1064
116k
    }
1065
116k
    case TDataSinkType::RESULT_SINK: {
1066
116k
        if (!thrift_sink.__isset.result_sink) {
1067
186k
            return Status::InternalError("Missing data buffer sink.");
1068
186k
        }
1069
0
1070
0
        _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc,
1071
                                                      output_exprs, thrift_sink.result_sink);
1072
186k
        break;
1073
186k
    }
1074
186k
    case TDataSinkType::DICTIONARY_SINK: {
1075
186k
        if (!thrift_sink.__isset.dictionary_sink) {
1076
105
            return Status::InternalError("Missing dict sink.");
1077
105
        }
1078
0
1079
0
        _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs,
1080
                                                    thrift_sink.dictionary_sink);
1081
105
        break;
1082
105
    }
1083
105
    case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
1084
105
    case TDataSinkType::OLAP_TABLE_SINK: {
1085
0
        if (state->query_options().enable_memtable_on_sink_node &&
1086
28.0k
            !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) &&
1087
28.0k
            !config::is_cloud_mode()) {
1088
28.0k
            _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(),
1089
28.0k
                                                               row_desc, output_exprs);
1090
56
        } else {
1091
56
            _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(),
1092
28.0k
                                                             row_desc, output_exprs);
1093
28.0k
        }
1094
28.0k
        break;
1095
28.0k
    }
1096
28.0k
    case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
1097
0
        DCHECK(thrift_sink.__isset.olap_table_sink);
1098
165
        DCHECK(state->get_query_ctx() != nullptr);
1099
165
        state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true;
1100
165
        _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc,
1101
165
                                                                output_exprs);
1102
165
        break;
1103
165
    }
1104
165
    case TDataSinkType::HIVE_TABLE_SINK: {
1105
0
        if (!thrift_sink.__isset.hive_table_sink) {
1106
0
            return Status::InternalError("Missing hive table sink.");
1107
0
        }
1108
0
        _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1109
0
                                                         output_exprs);
1110
0
        break;
1111
0
    }
1112
0
    case TDataSinkType::ICEBERG_TABLE_SINK: {
1113
0
        if (!thrift_sink.__isset.iceberg_table_sink) {
1114
0
            return Status::InternalError("Missing iceberg table sink.");
1115
0
        }
1116
0
        if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
1117
0
            _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1118
0
                                                                     row_desc, output_exprs);
1119
0
        } else {
1120
0
            _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(),
1121
0
                                                                row_desc, output_exprs);
1122
0
        }
1123
0
        break;
1124
0
    }
1125
0
    case TDataSinkType::ICEBERG_DELETE_SINK: {
1126
0
        if (!thrift_sink.__isset.iceberg_delete_sink) {
1127
0
            return Status::InternalError("Missing iceberg delete sink.");
1128
0
        }
1129
0
        _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(),
1130
0
                                                             row_desc, output_exprs);
1131
0
        break;
1132
0
    }
1133
0
    case TDataSinkType::ICEBERG_MERGE_SINK: {
1134
0
        if (!thrift_sink.__isset.iceberg_merge_sink) {
1135
0
            return Status::InternalError("Missing iceberg merge sink.");
1136
0
        }
1137
0
        _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1138
0
                                                            output_exprs);
1139
0
        break;
1140
0
    }
1141
0
    case TDataSinkType::MAXCOMPUTE_TABLE_SINK: {
1142
0
        if (!thrift_sink.__isset.max_compute_table_sink) {
1143
0
            return Status::InternalError("Missing max compute table sink.");
1144
0
        }
1145
0
        _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1146
0
                                                       output_exprs);
1147
0
        break;
1148
0
    }
1149
0
    case TDataSinkType::JDBC_TABLE_SINK: {
1150
0
        if (!thrift_sink.__isset.jdbc_table_sink) {
1151
0
            return Status::InternalError("Missing data jdbc sink.");
1152
0
        }
1153
0
        if (config::enable_java_support) {
1154
0
            _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(),
1155
0
                                                             output_exprs);
1156
0
        } else {
1157
0
            return Status::InternalError(
1158
0
                    "Jdbc table sink is not enabled, you can change be config "
1159
0
                    "enable_java_support to true and restart be.");
1160
0
        }
1161
0
        break;
1162
0
    }
1163
0
    case TDataSinkType::MEMORY_SCRATCH_SINK: {
1164
0
        if (!thrift_sink.__isset.memory_scratch_sink) {
1165
3
            return Status::InternalError("Missing data buffer sink.");
1166
3
        }
1167
0
1168
0
        _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(),
1169
                                                             output_exprs);
1170
3
        break;
1171
3
    }
1172
3
    case TDataSinkType::RESULT_FILE_SINK: {
1173
3
        if (!thrift_sink.__isset.result_file_sink) {
1174
338
            return Status::InternalError("Missing result file sink.");
1175
338
        }
1176
0
1177
0
        // Result file sink is not the top sink
1178
        if (params.__isset.destinations && !params.destinations.empty()) {
1179
            _sink = std::make_shared<ResultFileSinkOperatorX>(
1180
338
                    next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
1181
0
                    params.destinations, output_exprs, desc_tbl);
1182
0
        } else {
1183
0
            _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc,
1184
338
                                                              output_exprs);
1185
338
        }
1186
338
        break;
1187
338
    }
1188
338
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
1189
338
        DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
1190
645
        DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
1191
645
        auto sink_id = next_sink_operator_id();
1192
645
        const int multi_cast_node_id = sink_id;
1193
645
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
1194
645
        // one sink has multiple sources.
1195
645
        std::vector<int> sources;
1196
        for (int i = 0; i < sender_size; ++i) {
1197
645
            auto source_id = next_operator_id();
1198
2.36k
            sources.push_back(source_id);
1199
1.71k
        }
1200
1.71k
1201
1.71k
        _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>(
1202
                sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink);
1203
645
        for (int i = 0; i < sender_size; ++i) {
1204
645
            auto new_pipeline = add_pipeline();
1205
2.36k
            // use to exchange sink
1206
1.71k
            RowDescriptor* exchange_row_desc = nullptr;
1207
            {
1208
1.71k
                const auto& tmp_row_desc =
1209
1.71k
                        !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
1210
1.71k
                                ? RowDescriptor(state->desc_tbl(),
1211
1.71k
                                                {thrift_sink.multi_cast_stream_sink.sinks[i]
1212
1.71k
                                                         .output_tuple_id})
1213
1.71k
                                : row_desc;
1214
1.71k
                exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc));
1215
1.71k
            }
1216
1.71k
            auto source_id = sources[i];
1217
1.71k
            OperatorPtr source_op;
1218
1.71k
            // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline
1219
1.71k
            source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>(
1220
                    /*node_id*/ source_id, /*consumer_id*/ i, pool,
1221
1.71k
                    thrift_sink.multi_cast_stream_sink.sinks[i], row_desc,
1222
1.71k
                    /*operator_id=*/source_id);
1223
1.71k
            RETURN_IF_ERROR(new_pipeline->add_operator(
1224
1.71k
                    source_op, params.__isset.parallel_instances ? params.parallel_instances : 0));
1225
1.71k
            // 2. create and set sink operator of data stream sender for new pipeline
1226
1.71k
1227
            DataSinkOperatorPtr sink_op;
1228
            sink_op = std::make_shared<ExchangeSinkOperatorX>(
1229
1.71k
                    state, *exchange_row_desc, next_sink_operator_id(),
1230
1.71k
                    thrift_sink.multi_cast_stream_sink.sinks[i],
1231
1.71k
                    thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids);
1232
1.71k
1233
1.71k
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op));
1234
            {
1235
1.71k
                TDataSink* t = pool->add(new TDataSink());
1236
1.71k
                t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
1237
1.71k
                RETURN_IF_ERROR(sink_op->init(*t));
1238
1.71k
            }
1239
1.71k
1240
1.71k
            // 3. set dependency dag
1241
            _dag[new_pipeline->id()].push_back(cur_pipeline_id);
1242
        }
1243
1.71k
        if (sources.empty()) {
1244
1.71k
            return Status::InternalError("size of sources must be greater than 0");
1245
645
        }
1246
0
        break;
1247
0
    }
1248
645
    case TDataSinkType::BLACKHOLE_SINK: {
1249
645
        if (!thrift_sink.__isset.blackhole_sink) {
1250
645
            return Status::InternalError("Missing blackhole sink.");
1251
3
        }
1252
0
1253
0
        _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id()));
1254
        break;
1255
3
    }
1256
3
    case TDataSinkType::TVF_TABLE_SINK: {
1257
3
        if (!thrift_sink.__isset.tvf_table_sink) {
1258
0
            return Status::InternalError("Missing TVF table sink.");
1259
0
        }
1260
0
        _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc,
1261
0
                                                        output_exprs);
1262
0
        break;
1263
0
    }
1264
0
    default:
1265
0
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
1266
0
    }
1267
0
    return Status::OK();
1268
332k
}
1269
332k
1270
332k
// NOLINTBEGIN(readability-function-size)
1271
// NOLINTBEGIN(readability-function-cognitive-complexity)
1272
Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode,
1273
                                                 const DescriptorTbl& descs, OperatorPtr& op,
1274
                                                 PipelinePtr& cur_pipe, int parent_idx,
1275
                                                 int child_idx,
1276
                                                 const bool followed_by_shuffled_operator,
1277
                                                 const bool require_bucket_distribution,
1278
                                                 OperatorPtr& cache_op) {
1279
    std::vector<DataSinkOperatorPtr> sink_ops;
1280
533k
    Defer defer = Defer([&]() {
1281
533k
        if (op) {
1282
533k
            op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1283
532k
        }
1284
532k
        for (auto& s : sink_ops) {
1285
532k
            s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution);
1286
532k
        }
1287
91.7k
    });
1288
91.7k
    // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
1289
532k
    // Therefore, here we need to use a stack-like structure.
1290
    _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
1291
    std::stringstream error_msg;
1292
533k
    bool enable_query_cache = _params.fragment.__isset.query_cache_param;
1293
533k
1294
533k
    bool fe_with_old_version = false;
1295
    switch (tnode.node_type) {
1296
533k
    case TPlanNodeType::OLAP_SCAN_NODE: {
1297
533k
        op = std::make_shared<OlapScanOperatorX>(
1298
167k
                pool, tnode, next_operator_id(), descs, _num_instances,
1299
167k
                enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {});
1300
167k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1301
167k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1302
167k
        break;
1303
167k
    }
1304
167k
    case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
1305
167k
        DCHECK(_query_ctx != nullptr);
1306
81
        _query_ctx->query_mem_tracker()->is_group_commit_load = true;
1307
81
        op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs,
1308
81
                                                    _num_instances);
1309
81
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1310
81
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1311
81
        break;
1312
81
    }
1313
81
    case TPlanNodeType::JDBC_SCAN_NODE: {
1314
81
        if (config::enable_java_support) {
1315
0
            op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs,
1316
0
                                                     _num_instances);
1317
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1318
0
        } else {
1319
0
            return Status::InternalError(
1320
0
                    "Jdbc scan node is disabled, you can change be config enable_java_support "
1321
0
                    "to true and restart be.");
1322
0
        }
1323
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1324
0
        break;
1325
0
    }
1326
0
    case TPlanNodeType::FILE_SCAN_NODE: {
1327
0
        op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs,
1328
2.68k
                                                 _num_instances);
1329
2.68k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1330
2.68k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1331
2.68k
        break;
1332
2.68k
    }
1333
2.68k
    case TPlanNodeType::ES_SCAN_NODE:
1334
2.68k
    case TPlanNodeType::ES_HTTP_SCAN_NODE: {
1335
0
        op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs,
1336
0
                                               _num_instances);
1337
0
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1338
0
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1339
0
        break;
1340
0
    }
1341
0
    case TPlanNodeType::EXCHANGE_NODE: {
1342
0
        int num_senders = _params.per_exch_num_senders.contains(tnode.node_id)
1343
116k
                                  ? _params.per_exch_num_senders.find(tnode.node_id)->second
1344
116k
                                  : 0;
1345
116k
        DCHECK_GT(num_senders, 0);
1346
18.4E
        op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs,
1347
116k
                                                       num_senders);
1348
116k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1349
116k
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1350
116k
        break;
1351
116k
    }
1352
116k
    case TPlanNodeType::AGGREGATION_NODE: {
1353
116k
        if (tnode.agg_node.grouping_exprs.empty() &&
1354
137k
            descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) {
1355
137k
            return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
1356
137k
                                         ": group by and output is empty");
1357
0
        }
1358
0
        bool need_create_cache_op =
1359
0
                enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id;
1360
137k
        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) {
1361
137k
            auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first;
1362
137k
            auto cache_source_id = next_operator_id();
1363
24
            op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id,
1364
24
                                                        _params.fragment.query_cache_param);
1365
24
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1366
24
1367
24
            const auto downstream_pipeline_id = cur_pipe->id();
1368
            if (!_dag.contains(downstream_pipeline_id)) {
1369
24
                _dag.insert({downstream_pipeline_id, {}});
1370
24
            }
1371
24
            new_pipe = add_pipeline(cur_pipe);
1372
24
            _dag[downstream_pipeline_id].push_back(new_pipe->id());
1373
24
1374
24
            DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX(
1375
                    next_sink_operator_id(), op->node_id(), op->operator_id()));
1376
24
            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink));
1377
24
            return Status::OK();
1378
24
        };
1379
24
        const bool group_by_limit_opt =
1380
24
                tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0;
1381
137k
1382
137k
        /// PartitionedAggSourceOperatorX does not support "group by limit opt(#29641)" yet.
1383
        /// If `group_by_limit_opt` is true, then it might not need to spill at all.
1384
        const bool enable_spill = _runtime_state->enable_spill() &&
1385
                                  !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt;
1386
137k
        const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation &&
1387
137k
                                      tnode.agg_node.use_streaming_preaggregation &&
1388
137k
                                      !tnode.agg_node.grouping_exprs.empty();
1389
137k
        // TODO: distinct streaming agg does not support spill.
1390
137k
        const bool can_use_distinct_streaming_agg =
1391
                (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() &&
1392
137k
                !tnode.agg_node.__isset.agg_sort_info_by_group_key &&
1393
137k
                _params.query_options.__isset.enable_distinct_streaming_aggregation &&
1394
137k
                _params.query_options.enable_distinct_streaming_aggregation;
1395
137k
1396
137k
        if (can_use_distinct_streaming_agg) {
1397
            if (need_create_cache_op) {
1398
137k
                PipelinePtr new_pipe;
1399
93.1k
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1400
8
1401
8
                cache_op = op;
1402
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1403
8
                                                                     tnode, descs);
1404
8
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1405
8
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1406
8
                cur_pipe = new_pipe;
1407
8
            } else {
1408
8
                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(),
1409
93.1k
                                                                     tnode, descs);
1410
93.1k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1411
93.1k
            }
1412
93.1k
        } else if (is_streaming_agg) {
1413
93.1k
            if (need_create_cache_op) {
1414
93.1k
                PipelinePtr new_pipe;
1415
1.75k
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1416
5
                cache_op = op;
1417
5
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1418
5
                                                             descs);
1419
5
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1420
5
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1421
5
                cur_pipe = new_pipe;
1422
5
            } else {
1423
5
                op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode,
1424
1.75k
                                                             descs);
1425
1.75k
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1426
1.75k
            }
1427
1.75k
        } else {
1428
1.75k
            // create new pipeline to add query cache operator
1429
42.7k
            PipelinePtr new_pipe;
1430
            if (need_create_cache_op) {
1431
42.7k
                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
1432
42.7k
                cache_op = op;
1433
11
            }
1434
11
1435
11
            if (enable_spill) {
1436
                op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode,
1437
42.7k
                                                                     next_operator_id(), descs);
1438
163
            } else {
1439
163
                op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1440
42.6k
            }
1441
42.6k
            if (need_create_cache_op) {
1442
42.6k
                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
1443
42.7k
                RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances));
1444
11
                cur_pipe = new_pipe;
1445
11
            } else {
1446
11
                RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1447
42.7k
            }
1448
42.7k
1449
42.7k
            const auto downstream_pipeline_id = cur_pipe->id();
1450
            if (!_dag.contains(downstream_pipeline_id)) {
1451
42.7k
                _dag.insert({downstream_pipeline_id, {}});
1452
42.7k
            }
1453
42.1k
            cur_pipe = add_pipeline(cur_pipe);
1454
42.1k
            _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1455
42.7k
1456
42.7k
            if (enable_spill) {
1457
                sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
1458
42.7k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1459
163
            } else {
1460
163
                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
1461
42.6k
                        pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1462
42.6k
            }
1463
42.6k
            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1464
42.6k
            RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1465
42.7k
        }
1466
42.7k
        break;
1467
42.7k
    }
1468
137k
    case TPlanNodeType::HASH_JOIN_NODE: {
1469
137k
        const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join &&
1470
137k
                                       tnode.hash_join_node.is_broadcast_join;
1471
9.20k
        const auto enable_spill = _runtime_state->enable_spill();
1472
9.20k
        if (enable_spill && !is_broadcast_join) {
1473
9.20k
            auto tnode_ = tnode;
1474
9.20k
            tnode_.runtime_filters.clear();
1475
0
            auto inner_probe_operator =
1476
0
                    std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs);
1477
0
1478
0
            // probe side inner sink operator is used to build hash table on probe side when data is spilled.
1479
            // So here use `tnode_` which has no runtime filters.
1480
            auto probe_side_inner_sink_operator =
1481
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs);
1482
0
1483
0
            RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get()));
1484
            RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get()));
1485
0
1486
0
            auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>(
1487
                    pool, tnode_, next_operator_id(), descs);
1488
0
            probe_operator->set_inner_operators(probe_side_inner_sink_operator,
1489
0
                                                inner_probe_operator);
1490
0
            op = std::move(probe_operator);
1491
0
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1492
0
1493
0
            const auto downstream_pipeline_id = cur_pipe->id();
1494
            if (!_dag.contains(downstream_pipeline_id)) {
1495
0
                _dag.insert({downstream_pipeline_id, {}});
1496
0
            }
1497
0
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1498
0
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1499
0
1500
0
            auto inner_sink_operator =
1501
                    std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs);
1502
0
            auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>(
1503
0
                    pool, next_sink_operator_id(), op->operator_id(), tnode_, descs);
1504
0
            RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get()));
1505
0
1506
0
            sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator);
1507
            sink_ops.push_back(std::move(sink_operator));
1508
0
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1509
0
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get()));
1510
0
1511
0
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1512
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1513
0
        } else {
1514
0
            op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1515
9.20k
            RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1516
9.20k
1517
9.20k
            const auto downstream_pipeline_id = cur_pipe->id();
1518
            if (!_dag.contains(downstream_pipeline_id)) {
1519
9.20k
                _dag.insert({downstream_pipeline_id, {}});
1520
9.20k
            }
1521
7.46k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1522
7.46k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1523
9.20k
1524
9.20k
            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
1525
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1526
9.20k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1527
9.20k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1528
9.20k
1529
9.20k
            _pipeline_parent_map.push(op->node_id(), cur_pipe);
1530
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1531
9.20k
        }
1532
9.20k
        if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) {
1533
9.20k
            std::shared_ptr<HashJoinSharedState> shared_state =
1534
9.20k
                    HashJoinSharedState::create_shared(_num_instances);
1535
2.47k
            for (int i = 0; i < _num_instances; i++) {
1536
2.47k
                auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(),
1537
15.2k
                                                             "HASH_JOIN_BUILD_DEPENDENCY");
1538
12.8k
                sink_dep->set_shared_state(shared_state.get());
1539
12.8k
                shared_state->sink_deps.push_back(sink_dep);
1540
12.8k
            }
1541
12.8k
            shared_state->create_source_dependencies(_num_instances, op->operator_id(),
1542
12.8k
                                                     op->node_id(), "HASH_JOIN_PROBE");
1543
2.47k
            _op_id_to_shared_state.insert(
1544
2.47k
                    {op->operator_id(), {shared_state, shared_state->sink_deps}});
1545
2.47k
        }
1546
2.47k
        break;
1547
2.47k
    }
1548
9.20k
    case TPlanNodeType::CROSS_JOIN_NODE: {
1549
9.20k
        op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs);
1550
2.18k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1551
2.18k
1552
2.18k
        const auto downstream_pipeline_id = cur_pipe->id();
1553
        if (!_dag.contains(downstream_pipeline_id)) {
1554
2.18k
            _dag.insert({downstream_pipeline_id, {}});
1555
2.18k
        }
1556
1.93k
        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1557
1.93k
        _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1558
2.18k
1559
2.18k
        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
1560
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1561
2.18k
        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1562
2.18k
        RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1563
2.18k
        _pipeline_parent_map.push(op->node_id(), cur_pipe);
1564
2.18k
        _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1565
2.18k
        break;
1566
2.18k
    }
1567
2.18k
    case TPlanNodeType::UNION_NODE: {
1568
2.18k
        int child_count = tnode.num_children;
1569
49.0k
        op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1570
49.0k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1571
49.0k
1572
49.0k
        const auto downstream_pipeline_id = cur_pipe->id();
1573
        if (!_dag.contains(downstream_pipeline_id)) {
1574
49.0k
            _dag.insert({downstream_pipeline_id, {}});
1575
49.0k
        }
1576
48.8k
        for (int i = 0; i < child_count; i++) {
1577
48.8k
            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
1578
50.1k
            _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
1579
1.12k
            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
1580
1.12k
                    i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1581
1.12k
            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
1582
1.12k
            RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
1583
1.12k
            // preset children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1584
1.12k
            _pipeline_parent_map.push(op->node_id(), build_side_pipe);
1585
        }
1586
1.12k
        break;
1587
1.12k
    }
1588
49.0k
    case TPlanNodeType::SORT_NODE: {
1589
49.0k
        const auto should_spill = _runtime_state->enable_spill() &&
1590
49.0k
                                  tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT;
1591
33.9k
        const bool use_local_merge =
1592
33.9k
                tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge;
1593
33.9k
        if (should_spill) {
1594
33.9k
            op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1595
33.9k
        } else if (use_local_merge) {
1596
9
            op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(),
1597
33.9k
                                                                 descs);
1598
31.9k
        } else {
1599
31.9k
            op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1600
31.9k
        }
1601
2.02k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1602
2.02k
1603
33.9k
        const auto downstream_pipeline_id = cur_pipe->id();
1604
        if (!_dag.contains(downstream_pipeline_id)) {
1605
33.9k
            _dag.insert({downstream_pipeline_id, {}});
1606
33.9k
        }
1607
33.9k
        cur_pipe = add_pipeline(cur_pipe);
1608
33.9k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1609
33.9k
1610
33.9k
        if (should_spill) {
1611
            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
1612
33.9k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1613
9
        } else {
1614
9
            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
1615
33.9k
                    pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1616
33.9k
        }
1617
33.9k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1618
33.9k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1619
33.9k
        break;
1620
33.9k
    }
1621
33.9k
    case TPlanNodeType::PARTITION_SORT_NODE: {
1622
33.9k
        op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1623
33.9k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1624
65
1625
65
        const auto downstream_pipeline_id = cur_pipe->id();
1626
        if (!_dag.contains(downstream_pipeline_id)) {
1627
65
            _dag.insert({downstream_pipeline_id, {}});
1628
65
        }
1629
65
        cur_pipe = add_pipeline(cur_pipe);
1630
65
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1631
65
1632
65
        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
1633
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1634
65
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1635
65
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1636
65
        break;
1637
65
    }
1638
65
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
1639
65
        op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1640
1.76k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1641
1.76k
1642
1.76k
        const auto downstream_pipeline_id = cur_pipe->id();
1643
        if (!_dag.contains(downstream_pipeline_id)) {
1644
1.76k
            _dag.insert({downstream_pipeline_id, {}});
1645
1.76k
        }
1646
1.75k
        cur_pipe = add_pipeline(cur_pipe);
1647
1.75k
        _dag[downstream_pipeline_id].push_back(cur_pipe->id());
1648
1.76k
1649
1.76k
        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
1650
                pool, next_sink_operator_id(), op->operator_id(), tnode, descs));
1651
1.76k
        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
1652
1.76k
        RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
1653
1.76k
        break;
1654
1.76k
    }
1655
1.76k
    case TPlanNodeType::MATERIALIZATION_NODE: {
1656
1.76k
        op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs);
1657
1.76k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1658
684
        break;
1659
684
    }
1660
684
    case TPlanNodeType::INTERSECT_NODE: {
1661
684
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op,
1662
684
                                                                      cur_pipe, sink_ops));
1663
119
        break;
1664
119
    }
1665
119
    case TPlanNodeType::EXCEPT_NODE: {
1666
119
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op,
1667
129
                                                                       cur_pipe, sink_ops));
1668
129
        break;
1669
129
    }
1670
129
    case TPlanNodeType::REPEAT_NODE: {
1671
129
        op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs);
1672
328
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1673
328
        break;
1674
328
    }
1675
328
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
1676
328
        op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs);
1677
923
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1678
923
        break;
1679
923
    }
1680
923
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
1681
923
        op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs);
1682
923
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1683
18
        break;
1684
18
    }
1685
18
    case TPlanNodeType::EMPTY_SET_NODE: {
1686
18
        op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1687
1.48k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1688
1.48k
        break;
1689
1.48k
    }
1690
1.48k
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
1691
1.48k
        op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs);
1692
1.48k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1693
265
        fe_with_old_version = !tnode.__isset.is_serial_operator;
1694
265
        break;
1695
265
    }
1696
265
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
1697
265
        op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1698
1.54k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1699
1.54k
        break;
1700
1.54k
    }
1701
1.54k
    case TPlanNodeType::META_SCAN_NODE: {
1702
1.54k
        op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs);
1703
5.16k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1704
5.16k
        break;
1705
5.16k
    }
1706
5.16k
    case TPlanNodeType::SELECT_NODE: {
1707
5.16k
        op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs);
1708
5.16k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1709
585
        break;
1710
585
    }
1711
585
    case TPlanNodeType::REC_CTE_NODE: {
1712
585
        op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs);
1713
585
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1714
151
1715
151
        const auto downstream_pipeline_id = cur_pipe->id();
1716
        if (!_dag.contains(downstream_pipeline_id)) {
1717
151
            _dag.insert({downstream_pipeline_id, {}});
1718
151
        }
1719
148
1720
148
        PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe);
1721
        _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id());
1722
151
1723
151
        DataSinkOperatorPtr anchor_sink;
1724
        anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(),
1725
151
                                                                  op->operator_id(), tnode, descs);
1726
151
        RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink));
1727
151
        RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get()));
1728
151
        _pipeline_parent_map.push(op->node_id(), anchor_side_pipe);
1729
151
1730
151
        PipelinePtr rec_side_pipe = add_pipeline(cur_pipe);
1731
        _dag[downstream_pipeline_id].push_back(rec_side_pipe->id());
1732
151
1733
151
        DataSinkOperatorPtr rec_sink;
1734
        rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(),
1735
151
                                                         tnode, descs);
1736
151
        RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink));
1737
151
        RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get()));
1738
151
        _pipeline_parent_map.push(op->node_id(), rec_side_pipe);
1739
151
1740
151
        break;
1741
    }
1742
151
    case TPlanNodeType::REC_CTE_SCAN_NODE: {
1743
151
        op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs);
1744
1.95k
        RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1745
1.95k
        break;
1746
1.95k
    }
1747
1.95k
    default:
1748
1.95k
        return Status::InternalError("Unsupported exec type in pipeline: {}",
1749
1.95k
                                     print_plan_node_type(tnode.node_type));
1750
0
    }
1751
0
    if (_params.__isset.parallel_instances && fe_with_old_version) {
1752
533k
        cur_pipe->set_num_tasks(_params.parallel_instances);
1753
532k
        op->set_serial_operator();
1754
0
    }
1755
0
1756
0
    return Status::OK();
1757
}
1758
532k
// NOLINTEND(readability-function-cognitive-complexity)
1759
533k
// NOLINTEND(readability-function-size)
1760
1761
template <bool is_intersect>
1762
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
1763
        ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op,
1764
        PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) {
1765
    op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs));
1766
248
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1767
248
1768
248
    const auto downstream_pipeline_id = cur_pipe->id();
1769
    if (!_dag.contains(downstream_pipeline_id)) {
1770
248
        _dag.insert({downstream_pipeline_id, {}});
1771
248
    }
1772
231
1773
231
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1774
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1775
837
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1776
589
1777
589
        if (child_id == 0) {
1778
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1779
589
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1780
248
        } else {
1781
248
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1782
341
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1783
341
        }
1784
341
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1785
341
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1786
589
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1787
589
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1788
    }
1789
589
1790
589
    return Status::OK();
1791
}
1792
248
1793
248
Status PipelineFragmentContext::submit() {
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb1EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
119
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1767
119
1768
119
    const auto downstream_pipeline_id = cur_pipe->id();
1769
    if (!_dag.contains(downstream_pipeline_id)) {
1770
119
        _dag.insert({downstream_pipeline_id, {}});
1771
119
    }
1772
110
1773
110
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1774
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1775
435
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1776
316
1777
316
        if (child_id == 0) {
1778
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1779
316
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1780
119
        } else {
1781
119
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1782
197
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1783
197
        }
1784
197
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1785
197
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1786
316
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1787
316
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1788
    }
1789
316
1790
316
    return Status::OK();
1791
}
1792
119
1793
119
Status PipelineFragmentContext::submit() {
_ZN5doris23PipelineFragmentContext39_build_operators_for_set_operation_nodeILb0EEENS_6StatusEPNS_10ObjectPoolERKNS_9TPlanNodeERKNS_13DescriptorTblERSt10shared_ptrINS_13OperatorXBaseEERSB_INS_8PipelineEERSt6vectorISB_INS_21DataSinkOperatorXBaseEESaISK_EE
Line
Count
Source
1766
129
    RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
1767
129
1768
129
    const auto downstream_pipeline_id = cur_pipe->id();
1769
    if (!_dag.contains(downstream_pipeline_id)) {
1770
129
        _dag.insert({downstream_pipeline_id, {}});
1771
129
    }
1772
121
1773
121
    for (int child_id = 0; child_id < tnode.num_children; child_id++) {
1774
        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
1775
402
        _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
1776
273
1777
273
        if (child_id == 0) {
1778
            sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
1779
273
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1780
129
        } else {
1781
129
            sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
1782
144
                    child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs));
1783
144
        }
1784
144
        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
1785
144
        RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get()));
1786
273
        // prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
1787
273
        _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
1788
    }
1789
273
1790
273
    return Status::OK();
1791
}
1792
129
1793
129
// Submits every pipeline task of this fragment to the query's task scheduler.
// Must be called at most once; a second call returns InternalError.
//
// Fix: on a failed submission the original code only broke out of the INNER
// loop, so subsequent task instances were still submitted after cancel() had
// run and _total_tasks had been frozen, and a later successful submit
// overwrote `st`, masking the failure. We now stop submitting entirely on the
// first failure.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _query_ctx->get_pipe_exec_scheduler();
    for (auto& task : _tasks) {
        for (auto& t : task) {
            st = scheduler->submit(t.first);
            DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed",
                            { st = Status::Aborted("PipelineFragmentContext.submit.failed"); });
            if (!st) {
                cancel(Status::InternalError("submit context to executor fail"));
                std::lock_guard<std::mutex> l(_task_mutex);
                // Only the tasks handed to the scheduler so far will ever call
                // decrement_running_task(), so freeze the total here.
                _total_tasks = submit_tasks;
                break;
            }
            submit_tasks++;
        }
        if (!st.ok()) {
            // Propagate the inner-loop failure: do not submit further instances.
            break;
        }
    }
    if (!st.ok()) {
        bool need_remove = false;
        {
            std::lock_guard<std::mutex> l(_task_mutex);
            if (_closed_tasks >= _total_tasks) {
                need_remove = _close_fragment_instance();
            }
        }
        // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
        if (need_remove) {
            _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    } else {
        return st;
    }
}
1834
331k
1835
331k
void PipelineFragmentContext::print_profile(const std::string& extra_info) {
1836
    if (_runtime_state->enable_profile()) {
1837
0
        std::stringstream ss;
1838
0
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1839
0
            runtime_profile_ptr->pretty_print(&ss);
1840
0
        }
1841
0
1842
0
        if (_runtime_state->load_channel_profile()) {
1843
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1844
0
        }
1845
0
1846
0
        auto profile_str =
1847
                fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id),
1848
0
                            this->_fragment_id, extra_info, ss.str());
1849
0
        LOG_LONG_STRING(INFO, profile_str);
1850
0
    }
1851
0
}
1852
0
// If all pipeline tasks binded to the fragment instance are finished, then we could
1853
0
// close the fragment instance.
1854
// Returns true if the caller should call remove_pipeline_context() **after** releasing
1855
// _task_mutex. We must not call remove_pipeline_context() here because it acquires
1856
// _pipeline_map's shard lock, and this function is called while _task_mutex is held.
1857
// Acquiring _pipeline_map while holding _task_mutex creates an ABBA deadlock with
1858
// dump_pipeline_tasks(), which acquires _pipeline_map first and then _task_mutex
1859
// (via debug_string()).
1860
bool PipelineFragmentContext::_close_fragment_instance() {
1861
    if (_is_fragment_instance_closed) {
1862
331k
        return false;
1863
331k
    }
1864
0
    Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
1865
0
    _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
1866
331k
    if (!_need_notify_close) {
1867
331k
        auto st = send_report(true);
1868
331k
        if (!st) {
1869
328k
            LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}",
1870
328k
                                        print_id(_query_id), _fragment_id, st.to_string());
1871
0
        }
1872
0
    }
1873
0
    // Print profile content in info log is a tempoeray solution for stream load and external_connector.
1874
328k
    // Since stream load does not have someting like coordinator on FE, so
1875
    // backend can not report profile to FE, ant its profile can not be shown
1876
    // in the same way with other query. So we print the profile content to info log.
1877
1878
    if (_runtime_state->enable_profile() &&
1879
        (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD ||
1880
331k
         _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR ||
1881
331k
         _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) {
1882
2.63k
        std::stringstream ss;
1883
2.63k
        // Compute the _local_time_percent before pretty_print the runtime_profile
1884
0
        // Before add this operation, the print out like that:
1885
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%)
1886
        // After add the operation, the print out like that:
1887
        // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%)
1888
        // We can easily know the exec node execute time without child time consumed.
1889
        for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) {
1890
            runtime_profile_ptr->pretty_print(&ss);
1891
0
        }
1892
0
1893
0
        if (_runtime_state->load_channel_profile()) {
1894
            _runtime_state->load_channel_profile()->pretty_print(&ss);
1895
0
        }
1896
0
1897
0
        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str());
1898
    }
1899
0
1900
0
    if (_query_ctx->enable_profile()) {
1901
        _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(),
1902
331k
                                         collect_realtime_load_channel_profile());
1903
2.63k
    }
1904
2.63k
1905
2.63k
    // Return whether the caller needs to remove from the pipeline map.
1906
    // The caller must do this after releasing _task_mutex.
1907
    return !_need_notify_close;
1908
}
1909
331k
1910
331k
// Called by each finishing task. Closes the task on its pipeline; once the
// whole pipeline is closed, its upstream pipelines' output is never needed,
// so we make their tasks runnable right away to let them drain. The last task
// overall triggers _close_fragment_instance() and, if required, removal from
// the fragment manager's pipeline map (done outside _task_mutex to avoid an
// ABBA deadlock).
void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
    DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
    const bool pipeline_closed = _pip_id_to_pipeline[pipeline_id]->close_task();
    if (pipeline_closed && _dag.contains(pipeline_id)) {
        for (auto upstream_id : _dag[pipeline_id]) {
            _pip_id_to_pipeline[upstream_id]->make_all_runnable(pipeline_id);
        }
    }
    bool need_remove = false;
    {
        std::lock_guard<std::mutex> guard(_task_mutex);
        ++_closed_tasks;
        if (_closed_tasks >= _total_tasks) {
            need_remove = _close_fragment_instance();
        }
    }
    // Call remove_pipeline_context() outside _task_mutex to avoid ABBA deadlock.
    if (need_remove) {
        _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id});
    }
}
1933
328k
1934
1.71M
std::string PipelineFragmentContext::get_load_error_url() {
1935
    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
1936
42.0k
        return to_load_error_http_path(str);
1937
42.0k
    }
1938
0
    for (auto& tasks : _tasks) {
1939
0
        for (auto& task : tasks) {
1940
108k
            if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) {
1941
186k
                return to_load_error_http_path(str);
1942
186k
            }
1943
151
        }
1944
151
    }
1945
186k
    return "";
1946
108k
}
1947
41.8k
1948
42.0k
std::string PipelineFragmentContext::get_first_error_msg() {
1949
    if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) {
1950
42.0k
        return str;
1951
42.0k
    }
1952
0
    for (auto& tasks : _tasks) {
1953
0
        for (auto& task : tasks) {
1954
107k
            if (const auto& str = task.second->get_first_error_msg(); !str.empty()) {
1955
186k
                return str;
1956
186k
            }
1957
151
        }
1958
151
    }
1959
186k
    return "";
1960
107k
}
1961
41.8k
1962
42.0k
// Reports this fragment's execution status (and, for loads, its progress) to
// the coordinator through _report_status_cb.
//
// @param done true when the fragment has finished (successfully or not)
// @return OK when no report is needed or the report was handed off;
//         NeedSendAgain when reporting is suppressed but the caller may retry.
Status PipelineFragmentContext::send_report(bool done) {
    Status exec_status = _query_ctx->exec_status();
    // If plan is done successfully, but _is_report_success is false,
    // no need to send report.
    // Load will set _is_report_success to true because load wants to know
    // the process.
    if (!_is_report_success && done && exec_status.ok()) {
        return Status::OK();
    }

    // If both _is_report_success and _is_report_on_cancel are false,
    // which means no matter query is success or failed, no report is needed.
    // This may happen when the query limit reached and
    // a internal cancellation being processed
    // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
    // be set to false, to avoid sending fault report to FE.
    if (!_is_report_success && !_is_report_on_cancel) {
        if (done) {
            // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
            return Status::OK();
        }
        return Status::NeedSendAgain("");
    }

    std::vector<RuntimeState*> runtime_states;

    for (auto& tasks : _tasks) {
        for (auto& task : tasks) {
            runtime_states.push_back(task.second.get());
        }
    }

    // Hoist the query-level getters into locals: the original evaluated each
    // getter twice (once for .empty(), once for the value), materializing the
    // returned string twice. This also fixes the `load_eror_url` local typo.
    std::string load_error_url = _query_ctx->get_load_error_url();
    if (load_error_url.empty()) {
        load_error_url = get_load_error_url();
    }
    std::string first_error_msg = _query_ctx->get_first_error_msg();
    if (first_error_msg.empty()) {
        first_error_msg = get_first_error_msg();
    }

    ReportStatusRequest req {.status = exec_status,
                             .runtime_states = runtime_states,
                             .done = done || !exec_status.ok(),
                             .coord_addr = _query_ctx->coord_addr,
                             .query_id = _query_id,
                             .fragment_id = _fragment_id,
                             .fragment_instance_id = TUniqueId(),
                             .backend_num = -1,
                             .runtime_state = _runtime_state.get(),
                             .load_error_url = load_error_url,
                             .first_error_msg = first_error_msg,
                             .cancel_fn = [this](const Status& reason) { cancel(reason); }};

    return _report_status_cb(
            req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
2017
36.6k
2018
36.8k
size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
2019
    size_t res = 0;
2020
0
    // _tasks will be cleared during ~PipelineFragmentContext, so that it's safe
2021
0
    // here to traverse the vector.
2022
    for (const auto& task_instances : _tasks) {
2023
        for (const auto& task : task_instances) {
2024
0
            if (task.first->is_running()) {
2025
0
                LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
2026
0
                                      << " is running, task: " << (void*)task.first.get()
2027
0
                                      << ", is_running: " << task.first->is_running();
2028
0
                *has_running_task = true;
2029
0
                return 0;
2030
0
            }
2031
0
2032
0
            size_t revocable_size = task.first->get_revocable_size();
2033
            if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2034
0
                res += revocable_size;
2035
0
            }
2036
0
        }
2037
0
    }
2038
0
    return res;
2039
0
}
2040
0
2041
0
std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const {
2042
    std::vector<PipelineTask*> revocable_tasks;
2043
0
    for (const auto& task_instances : _tasks) {
2044
0
        for (const auto& task : task_instances) {
2045
0
            size_t revocable_size_ = task.first->get_revocable_size();
2046
0
2047
0
            if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
2048
                revocable_tasks.emplace_back(task.first.get());
2049
0
            }
2050
0
        }
2051
0
    }
2052
0
    return revocable_tasks;
2053
0
}
2054
0
2055
0
std::string PipelineFragmentContext::debug_string() {
2056
    std::lock_guard<std::mutex> l(_task_mutex);
2057
170
    fmt::memory_buffer debug_string_buffer;
2058
170
    fmt::format_to(debug_string_buffer,
2059
170
                   "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, "
2060
170
                   "need_notify_close={}, fragment_id={}, _rec_cte_stage={}\n",
2061
170
                   _closed_tasks, _total_tasks, _need_notify_close, _fragment_id, _rec_cte_stage);
2062
170
    for (size_t j = 0; j < _tasks.size(); j++) {
2063
170
        fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j);
2064
1.24k
        for (size_t i = 0; i < _tasks[j].size(); i++) {
2065
1.07k
            fmt::format_to(debug_string_buffer, "Task {}: {}\n", i,
2066
3.10k
                           _tasks[j][i].first->debug_string());
2067
2.03k
        }
2068
2.03k
    }
2069
2.03k
2070
1.07k
    return fmt::to_string(debug_string_buffer);
2071
}
2072
170
2073
170
std::vector<std::shared_ptr<TRuntimeProfileTree>>
2074
PipelineFragmentContext::collect_realtime_profile() const {
2075
    std::vector<std::shared_ptr<TRuntimeProfileTree>> res;
2076
2.63k
2077
2.63k
    // we do not have mutex to protect pipeline_id_to_profile
2078
    // so we need to make sure this funciton is invoked after fragment context
2079
    // has already been prepared.
2080
    if (!_prepared) {
2081
        std::string msg =
2082
2.63k
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2083
0
        DCHECK(false) << msg;
2084
0
        LOG_ERROR(msg);
2085
0
        return res;
2086
0
    }
2087
0
2088
0
    // Make sure first profile is fragment level profile
2089
    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
2090
    _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level());
2091
2.63k
    res.push_back(fragment_profile);
2092
2.63k
2093
2.63k
    // pipeline_id_to_profile is initialized in prepare stage
2094
    for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
2095
        auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
2096
4.88k
        pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level());
2097
4.88k
        res.push_back(profile_ptr);
2098
4.88k
    }
2099
4.88k
2100
4.88k
    return res;
2101
}
2102
2.63k
2103
2.63k
std::shared_ptr<TRuntimeProfileTree>
2104
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
2105
    // we do not have mutex to protect pipeline_id_to_profile
2106
2.63k
    // so we need to make sure this funciton is invoked after fragment context
2107
    // has already been prepared.
2108
    if (!_prepared) {
2109
        std::string msg =
2110
2.63k
                "Query " + print_id(_query_id) + " collecting profile, but its not prepared";
2111
0
        DCHECK(false) << msg;
2112
0
        LOG_ERROR(msg);
2113
0
        return nullptr;
2114
0
    }
2115
0
2116
0
    for (const auto& tasks : _tasks) {
2117
        for (const auto& task : tasks) {
2118
5.97k
            if (task.second->load_channel_profile() == nullptr) {
2119
11.8k
                continue;
2120
11.8k
            }
2121
0
2122
0
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2123
2124
11.8k
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
2125
                                                           _runtime_state->profile_level());
2126
11.8k
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
2127
11.8k
        }
2128
11.8k
    }
2129
11.8k
2130
5.97k
    auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
2131
    _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(),
2132
2.63k
                                                      _runtime_state->profile_level());
2133
2.63k
    return load_channel_profile;
2134
2.63k
}
2135
2.63k
2136
2.63k
// Collect runtime filter IDs registered by all tasks in this PFC.
2137
// Used during recursive CTE stage transitions to know which filters to deregister
2138
// before creating the new PFC for the next recursion round.
2139
// Called from rerun_fragment(wait_for_destroy) while tasks are still closing.
2140
// Thread safety: safe because _tasks is structurally immutable after prepare() —
2141
// the vector sizes do not change, and individual RuntimeState filter sets are
2142
// written only during open() which has completed by the time we reach rerun.
2143
std::set<int> PipelineFragmentContext::get_deregister_runtime_filter() const {
2144
    std::set<int> result;
2145
3.28k
    for (const auto& _task : _tasks) {
2146
3.28k
        for (const auto& task : _task) {
2147
7.80k
            auto set = task.first->runtime_state()->get_deregister_runtime_filter();
2148
13.1k
            result.merge(set);
2149
13.1k
        }
2150
13.1k
    }
2151
13.1k
    if (_runtime_state) {
2152
7.80k
        auto set = _runtime_state->get_deregister_runtime_filter();
2153
3.28k
        result.merge(set);
2154
3.28k
    }
2155
3.28k
    return result;
2156
3.28k
}
2157
3.28k
2158
3.28k
void PipelineFragmentContext::_release_resource() {
2159
    std::lock_guard<std::mutex> l(_task_mutex);
2160
333k
    // The memory released by the query end is recorded in the query mem tracker.
2161
333k
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker());
2162
    auto st = _query_ctx->exec_status();
2163
333k
    for (auto& _task : _tasks) {
2164
333k
        if (!_task.empty()) {
2165
977k
            _call_back(_task.front().first->runtime_state(), &st);
2166
977k
        }
2167
977k
    }
2168
977k
    _tasks.clear();
2169
977k
    _dag.clear();
2170
333k
    _pip_id_to_pipeline.clear();
2171
333k
    _pipelines.clear();
2172
333k
    _sink.reset();
2173
333k
    _root_op.reset();
2174
333k
    _runtime_filter_mgr_map.clear();
2175
333k
    _op_id_to_shared_state.clear();
2176
333k
}
2177
333k
2178
333k
#include "common/compile_check_end.h"
2179
} // namespace doris